You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:37:46 UTC

[airflow] 20/28: Support google-cloud-logging` >=2.0.0 (#13801)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 25f2db1c5dd043ee8bf9b9c7b2f09a505ce9f560
Author: Kamil BreguĊ‚a <mi...@users.noreply.github.com>
AuthorDate: Wed Feb 3 04:16:50 2021 +0100

    Support google-cloud-logging` >=2.0.0 (#13801)
    
    (cherry picked from commit 0e8c77b93a5ca5ecfdcd1c4bd91f54846fc15d57)
---
 airflow/providers/google/ADDITIONAL_INFO.md        |   1 +
 .../google/cloud/log/stackdriver_task_handler.py   |  72 +++++--
 setup.py                                           |   2 +-
 .../cloud/log/test_stackdriver_task_handler.py     | 225 +++++++++++++--------
 4 files changed, 200 insertions(+), 100 deletions(-)

diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md
index 9cf9853..a363051 100644
--- a/airflow/providers/google/ADDITIONAL_INFO.md
+++ b/airflow/providers/google/ADDITIONAL_INFO.md
@@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
 | [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
 | [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
 | [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
+| [``google-cloud-logging``](https://pypi.org/project/google-cloud-logging/) | ``>=1.14.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-logging/blob/master/UPGRADING.md) |
 | [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
 | [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
 | [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0``  | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index be75fcd..5479185 100644
--- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -21,9 +21,12 @@ from urllib.parse import urlencode
 
 from cached_property import cached_property
 from google.api_core.gapic_v1.client_info import ClientInfo
+from google.auth.credentials import Credentials
 from google.cloud import logging as gcp_logging
+from google.cloud.logging import Resource
 from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport
-from google.cloud.logging.resource import Resource
+from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client
+from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse
 
 from airflow import version
 from airflow.models import TaskInstance
@@ -99,13 +102,19 @@ class StackdriverTaskHandler(logging.Handler):
         self.resource: Resource = resource
         self.labels: Optional[Dict[str, str]] = labels
         self.task_instance_labels: Optional[Dict[str, str]] = {}
+        self.task_instance_hostname = 'default-hostname'
 
     @cached_property
-    def _client(self) -> gcp_logging.Client:
-        """Google Cloud Library API client"""
+    def _credentials_and_project(self) -> Tuple[Credentials, str]:
         credentials, project = get_credentials_and_project_id(
             key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
         )
+        return credentials, project
+
+    @property
+    def _client(self) -> gcp_logging.Client:
+        """The Cloud Library API client"""
+        credentials, project = self._credentials_and_project
         client = gcp_logging.Client(
             credentials=credentials,
             project=project,
@@ -113,6 +122,16 @@ class StackdriverTaskHandler(logging.Handler):
         )
         return client
 
+    @property
+    def _logging_service_client(self) -> LoggingServiceV2Client:
+        """The Cloud logging service v2 client."""
+        credentials, _ = self._credentials_and_project
+        client = LoggingServiceV2Client(
+            credentials=credentials,
+            client_info=ClientInfo(client_library_version='airflow_v' + version.version),
+        )
+        return client
+
     @cached_property
     def _transport(self) -> Transport:
         """Object responsible for sending data to Stackdriver"""
@@ -146,10 +165,11 @@ class StackdriverTaskHandler(logging.Handler):
         :type task_instance:  :class:`airflow.models.TaskInstance`
         """
         self.task_instance_labels = self._task_instance_to_labels(task_instance)
+        self.task_instance_hostname = task_instance.hostname
 
     def read(
         self, task_instance: TaskInstance, try_number: Optional[int] = None, metadata: Optional[Dict] = None
-    ) -> Tuple[List[str], List[Dict]]:
+    ) -> Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]:
         """
         Read logs of given task instance from Stackdriver logging.
 
@@ -160,12 +180,14 @@ class StackdriverTaskHandler(logging.Handler):
         :type try_number: Optional[int]
         :param metadata: log metadata. It is used for steaming log reading and auto-tailing.
         :type metadata: Dict
-        :return: a tuple of list of logs and list of metadata
-        :rtype: Tuple[List[str], List[Dict]]
+        :return: a tuple of (
+            list of (one element tuple with two element tuple - hostname and logs)
+            and list of metadata)
+        :rtype: Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]
         """
         if try_number is not None and try_number < 1:
-            logs = [f"Error fetching the logs. Try number {try_number} is invalid."]
-            return logs, [{"end_of_log": "true"}]
+            logs = f"Error fetching the logs. Try number {try_number} is invalid."
+            return [((self.task_instance_hostname, logs),)], [{"end_of_log": "true"}]
 
         if not metadata:
             metadata = {}
@@ -188,7 +210,7 @@ class StackdriverTaskHandler(logging.Handler):
         if next_page_token:
             new_metadata['next_page_token'] = next_page_token
 
-        return [messages], [new_metadata]
+        return [((self.task_instance_hostname, messages),)], [new_metadata]
 
     def _prepare_log_filter(self, ti_labels: Dict[str, str]) -> str:
         """
@@ -210,9 +232,10 @@ class StackdriverTaskHandler(logging.Handler):
             escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
             return f'"{escaped_value}"'
 
+        _, project = self._credentials_and_project
         log_filters = [
             f'resource.type={escale_label_value(self.resource.type)}',
-            f'logName="projects/{self._client.project}/logs/{self.name}"',
+            f'logName="projects/{project}/logs/{self.name}"',
         ]
 
         for key, value in self.resource.labels.items():
@@ -252,6 +275,8 @@ class StackdriverTaskHandler(logging.Handler):
                     log_filter=log_filter, page_token=next_page_token
                 )
                 messages.append(new_messages)
+                if not messages:
+                    break
 
             end_of_log = True
             next_page_token = None
@@ -271,15 +296,21 @@ class StackdriverTaskHandler(logging.Handler):
         :return: Downloaded logs and next page token
         :rtype: Tuple[str, str]
         """
-        entries = self._client.list_entries(filter_=log_filter, page_token=page_token)
-        page = next(entries.pages)
-        next_page_token = entries.next_page_token
+        _, project = self._credentials_and_project
+        request = ListLogEntriesRequest(
+            resource_names=[f'projects/{project}'],
+            filter=log_filter,
+            page_token=page_token,
+            order_by='timestamp asc',
+            page_size=1000,
+        )
+        response = self._logging_service_client.list_log_entries(request=request)
+        page: ListLogEntriesResponse = next(response.pages)
         messages = []
-        for entry in page:
-            if "message" in entry.payload:
-                messages.append(entry.payload["message"])
-
-        return "\n".join(messages), next_page_token
+        for entry in page.entries:
+            if "message" in entry.json_payload:
+                messages.append(entry.json_payload["message"])
+        return "\n".join(messages), page.next_page_token
 
     @classmethod
     def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
@@ -315,7 +346,7 @@ class StackdriverTaskHandler(logging.Handler):
         :return: URL to the external log collection service
         :rtype: str
         """
-        project_id = self._client.project
+        _, project_id = self._credentials_and_project
 
         ti_labels = self._task_instance_to_labels(task_instance)
         ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
@@ -331,3 +362,6 @@ class StackdriverTaskHandler(logging.Handler):
 
         url = f"{self.LOG_VIEWER_BASE_URL}?{urlencode(url_query_string)}"
         return url
+
+    def close(self) -> None:
+        self._transport.flush()
diff --git a/setup.py b/setup.py
index fa1e73a..7beb684 100644
--- a/setup.py
+++ b/setup.py
@@ -292,7 +292,7 @@ google = [
     'google-cloud-dlp>=0.11.0,<2.0.0',
     'google-cloud-kms>=2.0.0,<3.0.0',
     'google-cloud-language>=1.1.1,<2.0.0',
-    'google-cloud-logging>=1.14.0,<2.0.0',
+    'google-cloud-logging>=2.1.1,<3.0.0',
     'google-cloud-memcache>=0.2.0',
     'google-cloud-monitoring>=2.0.0,<3.0.0',
     'google-cloud-os-login>=2.0.0,<3.0.0',
diff --git a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
index 4159e9e..b4dbf69 100644
--- a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
+++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
@@ -21,7 +21,8 @@ from datetime import datetime
 from unittest import mock
 from urllib.parse import parse_qs, urlparse
 
-from google.cloud.logging.resource import Resource
+from google.cloud.logging import Resource
+from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse, LogEntry
 
 from airflow.models import TaskInstance
 from airflow.models.dag import DAG
@@ -30,15 +31,27 @@ from airflow.providers.google.cloud.log.stackdriver_task_handler import Stackdri
 from airflow.utils.state import State
 
 
-def _create_list_response(messages, token):
-    page = [mock.MagicMock(payload={"message": message}) for message in messages]
-    return mock.MagicMock(pages=(n for n in [page]), next_page_token=token)
+def _create_list_log_entries_response_mock(messages, token):
+    return ListLogEntriesResponse(
+        entries=[LogEntry(json_payload={"message": message}) for message in messages], next_page_token=token
+    )
+
+
+def _remove_stackdriver_handlers():
+    for handler_ref in reversed(logging._handlerList[:]):
+        handler = handler_ref()
+        if not isinstance(handler, StackdriverTaskHandler):
+            continue
+        logging._removeHandlerRef(handler_ref)
+        del handler
 
 
 class TestStackdriverLoggingHandlerStandalone(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
     def test_should_pass_message_to_client(self, mock_client, mock_get_creds_and_project_id):
+        self.addCleanup(_remove_stackdriver_handlers)
+
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 
         transport_type = mock.MagicMock()
@@ -69,6 +82,7 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         self.ti.try_number = 1
         self.ti.state = State.RUNNING
         self.addCleanup(self.dag.clear)
+        self.addCleanup(_remove_stackdriver_handlers)
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
@@ -118,107 +132,153 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         )
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch(
-        'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
-        **{'return_value.project': 'asf-project'},  # type: ignore
-    )
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_project_id):
-        mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None)
+        mock_client.return_value.list_log_entries.return_value.pages = iter(
+            [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)]
+        )
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 
         logs, metadata = self.stackdriver_task_handler.read(self.ti)
-        mock_client.return_value.list_entries.assert_called_once_with(
-            filter_='resource.type="global"\n'
-            'logName="projects/asf-project/logs/airflow"\n'
-            'labels.task_id="task_for_testing_file_log_handler"\n'
-            'labels.dag_id="dag_for_testing_file_task_handler"\n'
-            'labels.execution_date="2016-01-01T00:00:00+00:00"',
-            page_token=None,
+        mock_client.return_value.list_log_entries.assert_called_once_with(
+            request=ListLogEntriesRequest(
+                resource_names=["projects/project_id"],
+                filter=(
+                    'resource.type="global"\n'
+                    'logName="projects/project_id/logs/airflow"\n'
+                    'labels.task_id="task_for_testing_file_log_handler"\n'
+                    'labels.dag_id="dag_for_testing_file_task_handler"\n'
+                    'labels.execution_date="2016-01-01T00:00:00+00:00"'
+                ),
+                order_by='timestamp asc',
+                page_size=1000,
+                page_token=None,
+            )
         )
-        assert ['MSG1\nMSG2'] == logs
+        assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
         assert [{'end_of_log': True}] == metadata
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch(
-        'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
-        **{'return_value.project': 'asf-project'},  # type: ignore
-    )
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_and_project_id):
-        mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None)
+        mock_client.return_value.list_log_entries.return_value.pages = iter(
+            [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)]
+        )
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
         self.ti.task_id = "K\"OT"
         logs, metadata = self.stackdriver_task_handler.read(self.ti)
-        mock_client.return_value.list_entries.assert_called_once_with(
-            filter_='resource.type="global"\n'
-            'logName="projects/asf-project/logs/airflow"\n'
-            'labels.task_id="K\\"OT"\n'
-            'labels.dag_id="dag_for_testing_file_task_handler"\n'
-            'labels.execution_date="2016-01-01T00:00:00+00:00"',
-            page_token=None,
+        mock_client.return_value.list_log_entries.assert_called_once_with(
+            request=ListLogEntriesRequest(
+                resource_names=["projects/project_id"],
+                filter=(
+                    'resource.type="global"\n'
+                    'logName="projects/project_id/logs/airflow"\n'
+                    'labels.task_id="K\\"OT"\n'
+                    'labels.dag_id="dag_for_testing_file_task_handler"\n'
+                    'labels.execution_date="2016-01-01T00:00:00+00:00"'
+                ),
+                order_by='timestamp asc',
+                page_size=1000,
+                page_token=None,
+            )
         )
-        assert ['MSG1\nMSG2'] == logs
+        assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
         assert [{'end_of_log': True}] == metadata
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch(
-        'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
-        **{'return_value.project': 'asf-project'},  # type: ignore
-    )
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_project_id):
-        mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None)
+        mock_client.return_value.list_log_entries.return_value.pages = iter(
+            [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)]
+        )
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 
         logs, metadata = self.stackdriver_task_handler.read(self.ti, 3)
-        mock_client.return_value.list_entries.assert_called_once_with(
-            filter_='resource.type="global"\n'
-            'logName="projects/asf-project/logs/airflow"\n'
-            'labels.task_id="task_for_testing_file_log_handler"\n'
-            'labels.dag_id="dag_for_testing_file_task_handler"\n'
-            'labels.execution_date="2016-01-01T00:00:00+00:00"\n'
-            'labels.try_number="3"',
-            page_token=None,
+        mock_client.return_value.list_log_entries.assert_called_once_with(
+            request=ListLogEntriesRequest(
+                resource_names=["projects/project_id"],
+                filter=(
+                    'resource.type="global"\n'
+                    'logName="projects/project_id/logs/airflow"\n'
+                    'labels.task_id="task_for_testing_file_log_handler"\n'
+                    'labels.dag_id="dag_for_testing_file_task_handler"\n'
+                    'labels.execution_date="2016-01-01T00:00:00+00:00"\n'
+                    'labels.try_number="3"'
+                ),
+                order_by='timestamp asc',
+                page_size=1000,
+                page_token=None,
+            )
         )
-        assert ['MSG1\nMSG2'] == logs
+        assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
         assert [{'end_of_log': True}] == metadata
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_project_id):
-        mock_client.return_value.list_entries.side_effect = [
-            _create_list_response(["MSG1", "MSG2"], "TOKEN1"),
-            _create_list_response(["MSG3", "MSG4"], None),
+        mock_client.return_value.list_log_entries.side_effect = [
+            mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG1", "MSG2"], "TOKEN1")])),
+            mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG3", "MSG4"], None)])),
         ]
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
         logs, metadata1 = self.stackdriver_task_handler.read(self.ti, 3)
-        mock_client.return_value.list_entries.assert_called_once_with(filter_=mock.ANY, page_token=None)
-        assert ['MSG1\nMSG2'] == logs
+        mock_client.return_value.list_log_entries.assert_called_once_with(
+            request=ListLogEntriesRequest(
+                resource_names=["projects/project_id"],
+                filter=(
+                    '''resource.type="global"
+logName="projects/project_id/logs/airflow"
+labels.task_id="task_for_testing_file_log_handler"
+labels.dag_id="dag_for_testing_file_task_handler"
+labels.execution_date="2016-01-01T00:00:00+00:00"
+labels.try_number="3"'''
+                ),
+                order_by='timestamp asc',
+                page_size=1000,
+                page_token=None,
+            )
+        )
+        assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
         assert [{'end_of_log': False, 'next_page_token': 'TOKEN1'}] == metadata1
 
-        mock_client.return_value.list_entries.return_value.next_page_token = None
+        mock_client.return_value.list_log_entries.return_value.next_page_token = None
         logs, metadata2 = self.stackdriver_task_handler.read(self.ti, 3, metadata1[0])
-        mock_client.return_value.list_entries.assert_called_with(filter_=mock.ANY, page_token="TOKEN1")
-        assert ['MSG3\nMSG4'] == logs
+
+        mock_client.return_value.list_log_entries.assert_called_with(
+            request=ListLogEntriesRequest(
+                resource_names=["projects/project_id"],
+                filter=(
+                    'resource.type="global"\n'
+                    'logName="projects/project_id/logs/airflow"\n'
+                    'labels.task_id="task_for_testing_file_log_handler"\n'
+                    'labels.dag_id="dag_for_testing_file_task_handler"\n'
+                    'labels.execution_date="2016-01-01T00:00:00+00:00"\n'
+                    'labels.try_number="3"'
+                ),
+                order_by='timestamp asc',
+                page_size=1000,
+                page_token="TOKEN1",
+            )
+        )
+        assert [(('default-hostname', 'MSG3\nMSG4'),)] == logs
         assert [{'end_of_log': True}] == metadata2
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_read_logs_with_download(self, mock_client, mock_get_creds_and_project_id):
-        mock_client.return_value.list_entries.side_effect = [
-            _create_list_response(["MSG1", "MSG2"], "TOKEN1"),
-            _create_list_response(["MSG3", "MSG4"], None),
+        mock_client.return_value.list_log_entries.side_effect = [
+            mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG1", "MSG2"], "TOKEN1")])),
+            mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG3", "MSG4"], None)])),
         ]
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
 
         logs, metadata1 = self.stackdriver_task_handler.read(self.ti, 3, {'download_logs': True})
 
-        assert ['MSG1\nMSG2\nMSG3\nMSG4'] == logs
+        assert [(('default-hostname', 'MSG1\nMSG2\nMSG3\nMSG4'),)] == logs
         assert [{'end_of_log': True}] == metadata1
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch(
-        'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
-        **{'return_value.project': 'asf-project'},  # type: ignore
-    )
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
         resource = Resource(
@@ -226,31 +286,37 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
             labels={
                 "environment.name": 'test-instancce',
                 "location": 'europpe-west-3',
-                "project_id": "asf-project",
+                "project_id": "project_id",
             },
         )
         self.stackdriver_task_handler = StackdriverTaskHandler(
             transport=self.transport_mock, resource=resource
         )
 
-        entry = mock.MagicMock(payload={"message": "TEXT"})
-        page = [entry, entry]
-        mock_client.return_value.list_entries.return_value.pages = (n for n in [page])
-        mock_client.return_value.list_entries.return_value.next_page_token = None
+        entry = mock.MagicMock(json_payload={"message": "TEXT"})
+        page = mock.MagicMock(entries=[entry, entry], next_page_token=None)
+        mock_client.return_value.list_log_entries.return_value.pages = (n for n in [page])
 
         logs, metadata = self.stackdriver_task_handler.read(self.ti)
-        mock_client.return_value.list_entries.assert_called_once_with(
-            filter_='resource.type="cloud_composer_environment"\n'
-            'logName="projects/asf-project/logs/airflow"\n'
-            'resource.labels."environment.name"="test-instancce"\n'
-            'resource.labels.location="europpe-west-3"\n'
-            'resource.labels.project_id="asf-project"\n'
-            'labels.task_id="task_for_testing_file_log_handler"\n'
-            'labels.dag_id="dag_for_testing_file_task_handler"\n'
-            'labels.execution_date="2016-01-01T00:00:00+00:00"',
-            page_token=None,
+        mock_client.return_value.list_log_entries.assert_called_once_with(
+            request=ListLogEntriesRequest(
+                resource_names=["projects/project_id"],
+                filter=(
+                    'resource.type="cloud_composer_environment"\n'
+                    'logName="projects/project_id/logs/airflow"\n'
+                    'resource.labels."environment.name"="test-instancce"\n'
+                    'resource.labels.location="europpe-west-3"\n'
+                    'resource.labels.project_id="project_id"\n'
+                    'labels.task_id="task_for_testing_file_log_handler"\n'
+                    'labels.dag_id="dag_for_testing_file_task_handler"\n'
+                    'labels.execution_date="2016-01-01T00:00:00+00:00"'
+                ),
+                order_by='timestamp asc',
+                page_size=1000,
+                page_token=None,
+            )
         )
-        assert ['TEXT\nTEXT'] == logs
+        assert [(('default-hostname', 'TEXT\nTEXT'),)] == logs
         assert [{'end_of_log': True}] == metadata
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@@ -278,10 +344,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
         assert mock_client.return_value == client
 
     @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
-    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
+    @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
     def test_should_return_valid_external_url(self, mock_client, mock_get_creds_and_project_id):
         mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
-        mock_client.return_value.project = 'project_id'
 
         stackdriver_task_handler = StackdriverTaskHandler(
             gcp_key_path="KEY_PATH",