You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:35:06 UTC
[airflow] 29/38: Support google-cloud-logging` >=2.0.0 (#13801)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 191d69ae63b43706f1823944c99366749221f4a7
Author: Kamil BreguĊa <mi...@users.noreply.github.com>
AuthorDate: Wed Feb 3 04:16:50 2021 +0100
Support google-cloud-logging` >=2.0.0 (#13801)
(cherry picked from commit 0e8c77b93a5ca5ecfdcd1c4bd91f54846fc15d57)
---
airflow/providers/google/ADDITIONAL_INFO.md | 1 +
.../google/cloud/log/stackdriver_task_handler.py | 72 +++++--
setup.py | 2 +-
.../cloud/log/test_stackdriver_task_handler.py | 225 +++++++++++++--------
4 files changed, 200 insertions(+), 100 deletions(-)
diff --git a/airflow/providers/google/ADDITIONAL_INFO.md b/airflow/providers/google/ADDITIONAL_INFO.md
index 9cf9853..a363051 100644
--- a/airflow/providers/google/ADDITIONAL_INFO.md
+++ b/airflow/providers/google/ADDITIONAL_INFO.md
@@ -34,6 +34,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-dataproc``](https://pypi.org/project/google-cloud-dataproc/) | ``>=1.0.1,<2.0.0`` | ``>=2.2.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-dataproc/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |
+| [``google-cloud-logging``](https://pypi.org/project/google-cloud-logging/) | ``>=1.14.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-logging/blob/master/UPGRADING.md) |
| [``google-cloud-monitoring``](https://pypi.org/project/google-cloud-monitoring/) | ``>=0.34.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-monitoring/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index be75fcd..5479185 100644
--- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -21,9 +21,12 @@ from urllib.parse import urlencode
from cached_property import cached_property
from google.api_core.gapic_v1.client_info import ClientInfo
+from google.auth.credentials import Credentials
from google.cloud import logging as gcp_logging
+from google.cloud.logging import Resource
from google.cloud.logging.handlers.transports import BackgroundThreadTransport, Transport
-from google.cloud.logging.resource import Resource
+from google.cloud.logging_v2.services.logging_service_v2 import LoggingServiceV2Client
+from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse
from airflow import version
from airflow.models import TaskInstance
@@ -99,13 +102,19 @@ class StackdriverTaskHandler(logging.Handler):
self.resource: Resource = resource
self.labels: Optional[Dict[str, str]] = labels
self.task_instance_labels: Optional[Dict[str, str]] = {}
+ self.task_instance_hostname = 'default-hostname'
@cached_property
- def _client(self) -> gcp_logging.Client:
- """Google Cloud Library API client"""
+ def _credentials_and_project(self) -> Tuple[Credentials, str]:
credentials, project = get_credentials_and_project_id(
key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
)
+ return credentials, project
+
+ @property
+ def _client(self) -> gcp_logging.Client:
+ """The Cloud Library API client"""
+ credentials, project = self._credentials_and_project
client = gcp_logging.Client(
credentials=credentials,
project=project,
@@ -113,6 +122,16 @@ class StackdriverTaskHandler(logging.Handler):
)
return client
+ @property
+ def _logging_service_client(self) -> LoggingServiceV2Client:
+ """The Cloud logging service v2 client."""
+ credentials, _ = self._credentials_and_project
+ client = LoggingServiceV2Client(
+ credentials=credentials,
+ client_info=ClientInfo(client_library_version='airflow_v' + version.version),
+ )
+ return client
+
@cached_property
def _transport(self) -> Transport:
"""Object responsible for sending data to Stackdriver"""
@@ -146,10 +165,11 @@ class StackdriverTaskHandler(logging.Handler):
:type task_instance: :class:`airflow.models.TaskInstance`
"""
self.task_instance_labels = self._task_instance_to_labels(task_instance)
+ self.task_instance_hostname = task_instance.hostname
def read(
self, task_instance: TaskInstance, try_number: Optional[int] = None, metadata: Optional[Dict] = None
- ) -> Tuple[List[str], List[Dict]]:
+ ) -> Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]:
"""
Read logs of given task instance from Stackdriver logging.
@@ -160,12 +180,14 @@ class StackdriverTaskHandler(logging.Handler):
:type try_number: Optional[int]
:param metadata: log metadata. It is used for steaming log reading and auto-tailing.
:type metadata: Dict
- :return: a tuple of list of logs and list of metadata
- :rtype: Tuple[List[str], List[Dict]]
+ :return: a tuple of (
+ list of (one element tuple with two element tuple - hostname and logs)
+ and list of metadata)
+ :rtype: Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]
"""
if try_number is not None and try_number < 1:
- logs = [f"Error fetching the logs. Try number {try_number} is invalid."]
- return logs, [{"end_of_log": "true"}]
+ logs = f"Error fetching the logs. Try number {try_number} is invalid."
+ return [((self.task_instance_hostname, logs),)], [{"end_of_log": "true"}]
if not metadata:
metadata = {}
@@ -188,7 +210,7 @@ class StackdriverTaskHandler(logging.Handler):
if next_page_token:
new_metadata['next_page_token'] = next_page_token
- return [messages], [new_metadata]
+ return [((self.task_instance_hostname, messages),)], [new_metadata]
def _prepare_log_filter(self, ti_labels: Dict[str, str]) -> str:
"""
@@ -210,9 +232,10 @@ class StackdriverTaskHandler(logging.Handler):
escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped_value}"'
+ _, project = self._credentials_and_project
log_filters = [
f'resource.type={escale_label_value(self.resource.type)}',
- f'logName="projects/{self._client.project}/logs/{self.name}"',
+ f'logName="projects/{project}/logs/{self.name}"',
]
for key, value in self.resource.labels.items():
@@ -252,6 +275,8 @@ class StackdriverTaskHandler(logging.Handler):
log_filter=log_filter, page_token=next_page_token
)
messages.append(new_messages)
+ if not messages:
+ break
end_of_log = True
next_page_token = None
@@ -271,15 +296,21 @@ class StackdriverTaskHandler(logging.Handler):
:return: Downloaded logs and next page token
:rtype: Tuple[str, str]
"""
- entries = self._client.list_entries(filter_=log_filter, page_token=page_token)
- page = next(entries.pages)
- next_page_token = entries.next_page_token
+ _, project = self._credentials_and_project
+ request = ListLogEntriesRequest(
+ resource_names=[f'projects/{project}'],
+ filter=log_filter,
+ page_token=page_token,
+ order_by='timestamp asc',
+ page_size=1000,
+ )
+ response = self._logging_service_client.list_log_entries(request=request)
+ page: ListLogEntriesResponse = next(response.pages)
messages = []
- for entry in page:
- if "message" in entry.payload:
- messages.append(entry.payload["message"])
-
- return "\n".join(messages), next_page_token
+ for entry in page.entries:
+ if "message" in entry.json_payload:
+ messages.append(entry.json_payload["message"])
+ return "\n".join(messages), page.next_page_token
@classmethod
def _task_instance_to_labels(cls, ti: TaskInstance) -> Dict[str, str]:
@@ -315,7 +346,7 @@ class StackdriverTaskHandler(logging.Handler):
:return: URL to the external log collection service
:rtype: str
"""
- project_id = self._client.project
+ _, project_id = self._credentials_and_project
ti_labels = self._task_instance_to_labels(task_instance)
ti_labels[self.LABEL_TRY_NUMBER] = str(try_number)
@@ -331,3 +362,6 @@ class StackdriverTaskHandler(logging.Handler):
url = f"{self.LOG_VIEWER_BASE_URL}?{urlencode(url_query_string)}"
return url
+
+ def close(self) -> None:
+ self._transport.flush()
diff --git a/setup.py b/setup.py
index fa1e73a..7beb684 100644
--- a/setup.py
+++ b/setup.py
@@ -292,7 +292,7 @@ google = [
'google-cloud-dlp>=0.11.0,<2.0.0',
'google-cloud-kms>=2.0.0,<3.0.0',
'google-cloud-language>=1.1.1,<2.0.0',
- 'google-cloud-logging>=1.14.0,<2.0.0',
+ 'google-cloud-logging>=2.1.1,<3.0.0',
'google-cloud-memcache>=0.2.0',
'google-cloud-monitoring>=2.0.0,<3.0.0',
'google-cloud-os-login>=2.0.0,<3.0.0',
diff --git a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
index 4159e9e..b4dbf69 100644
--- a/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
+++ b/tests/providers/google/cloud/log/test_stackdriver_task_handler.py
@@ -21,7 +21,8 @@ from datetime import datetime
from unittest import mock
from urllib.parse import parse_qs, urlparse
-from google.cloud.logging.resource import Resource
+from google.cloud.logging import Resource
+from google.cloud.logging_v2.types import ListLogEntriesRequest, ListLogEntriesResponse, LogEntry
from airflow.models import TaskInstance
from airflow.models.dag import DAG
@@ -30,15 +31,27 @@ from airflow.providers.google.cloud.log.stackdriver_task_handler import Stackdri
from airflow.utils.state import State
-def _create_list_response(messages, token):
- page = [mock.MagicMock(payload={"message": message}) for message in messages]
- return mock.MagicMock(pages=(n for n in [page]), next_page_token=token)
+def _create_list_log_entries_response_mock(messages, token):
+ return ListLogEntriesResponse(
+ entries=[LogEntry(json_payload={"message": message}) for message in messages], next_page_token=token
+ )
+
+
+def _remove_stackdriver_handlers():
+ for handler_ref in reversed(logging._handlerList[:]):
+ handler = handler_ref()
+ if not isinstance(handler, StackdriverTaskHandler):
+ continue
+ logging._removeHandlerRef(handler_ref)
+ del handler
class TestStackdriverLoggingHandlerStandalone(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
def test_should_pass_message_to_client(self, mock_client, mock_get_creds_and_project_id):
+ self.addCleanup(_remove_stackdriver_handlers)
+
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
transport_type = mock.MagicMock()
@@ -69,6 +82,7 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
self.ti.try_number = 1
self.ti.state = State.RUNNING
self.addCleanup(self.dag.clear)
+ self.addCleanup(_remove_stackdriver_handlers)
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
@@ -118,107 +132,153 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
)
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch(
- 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
- **{'return_value.project': 'asf-project'}, # type: ignore
- )
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_project_id):
- mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None)
+ mock_client.return_value.list_log_entries.return_value.pages = iter(
+ [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)]
+ )
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
logs, metadata = self.stackdriver_task_handler.read(self.ti)
- mock_client.return_value.list_entries.assert_called_once_with(
- filter_='resource.type="global"\n'
- 'logName="projects/asf-project/logs/airflow"\n'
- 'labels.task_id="task_for_testing_file_log_handler"\n'
- 'labels.dag_id="dag_for_testing_file_task_handler"\n'
- 'labels.execution_date="2016-01-01T00:00:00+00:00"',
- page_token=None,
+ mock_client.return_value.list_log_entries.assert_called_once_with(
+ request=ListLogEntriesRequest(
+ resource_names=["projects/project_id"],
+ filter=(
+ 'resource.type="global"\n'
+ 'logName="projects/project_id/logs/airflow"\n'
+ 'labels.task_id="task_for_testing_file_log_handler"\n'
+ 'labels.dag_id="dag_for_testing_file_task_handler"\n'
+ 'labels.execution_date="2016-01-01T00:00:00+00:00"'
+ ),
+ order_by='timestamp asc',
+ page_size=1000,
+ page_token=None,
+ )
)
- assert ['MSG1\nMSG2'] == logs
+ assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
assert [{'end_of_log': True}] == metadata
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch(
- 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
- **{'return_value.project': 'asf-project'}, # type: ignore
- )
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_and_project_id):
- mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None)
+ mock_client.return_value.list_log_entries.return_value.pages = iter(
+ [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)]
+ )
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
self.ti.task_id = "K\"OT"
logs, metadata = self.stackdriver_task_handler.read(self.ti)
- mock_client.return_value.list_entries.assert_called_once_with(
- filter_='resource.type="global"\n'
- 'logName="projects/asf-project/logs/airflow"\n'
- 'labels.task_id="K\\"OT"\n'
- 'labels.dag_id="dag_for_testing_file_task_handler"\n'
- 'labels.execution_date="2016-01-01T00:00:00+00:00"',
- page_token=None,
+ mock_client.return_value.list_log_entries.assert_called_once_with(
+ request=ListLogEntriesRequest(
+ resource_names=["projects/project_id"],
+ filter=(
+ 'resource.type="global"\n'
+ 'logName="projects/project_id/logs/airflow"\n'
+ 'labels.task_id="K\\"OT"\n'
+ 'labels.dag_id="dag_for_testing_file_task_handler"\n'
+ 'labels.execution_date="2016-01-01T00:00:00+00:00"'
+ ),
+ order_by='timestamp asc',
+ page_size=1000,
+ page_token=None,
+ )
)
- assert ['MSG1\nMSG2'] == logs
+ assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
assert [{'end_of_log': True}] == metadata
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch(
- 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
- **{'return_value.project': 'asf-project'}, # type: ignore
- )
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_project_id):
- mock_client.return_value.list_entries.return_value = _create_list_response(["MSG1", "MSG2"], None)
+ mock_client.return_value.list_log_entries.return_value.pages = iter(
+ [_create_list_log_entries_response_mock(["MSG1", "MSG2"], None)]
+ )
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
logs, metadata = self.stackdriver_task_handler.read(self.ti, 3)
- mock_client.return_value.list_entries.assert_called_once_with(
- filter_='resource.type="global"\n'
- 'logName="projects/asf-project/logs/airflow"\n'
- 'labels.task_id="task_for_testing_file_log_handler"\n'
- 'labels.dag_id="dag_for_testing_file_task_handler"\n'
- 'labels.execution_date="2016-01-01T00:00:00+00:00"\n'
- 'labels.try_number="3"',
- page_token=None,
+ mock_client.return_value.list_log_entries.assert_called_once_with(
+ request=ListLogEntriesRequest(
+ resource_names=["projects/project_id"],
+ filter=(
+ 'resource.type="global"\n'
+ 'logName="projects/project_id/logs/airflow"\n'
+ 'labels.task_id="task_for_testing_file_log_handler"\n'
+ 'labels.dag_id="dag_for_testing_file_task_handler"\n'
+ 'labels.execution_date="2016-01-01T00:00:00+00:00"\n'
+ 'labels.try_number="3"'
+ ),
+ order_by='timestamp asc',
+ page_size=1000,
+ page_token=None,
+ )
)
- assert ['MSG1\nMSG2'] == logs
+ assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
assert [{'end_of_log': True}] == metadata
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_project_id):
- mock_client.return_value.list_entries.side_effect = [
- _create_list_response(["MSG1", "MSG2"], "TOKEN1"),
- _create_list_response(["MSG3", "MSG4"], None),
+ mock_client.return_value.list_log_entries.side_effect = [
+ mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG1", "MSG2"], "TOKEN1")])),
+ mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG3", "MSG4"], None)])),
]
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
logs, metadata1 = self.stackdriver_task_handler.read(self.ti, 3)
- mock_client.return_value.list_entries.assert_called_once_with(filter_=mock.ANY, page_token=None)
- assert ['MSG1\nMSG2'] == logs
+ mock_client.return_value.list_log_entries.assert_called_once_with(
+ request=ListLogEntriesRequest(
+ resource_names=["projects/project_id"],
+ filter=(
+ '''resource.type="global"
+logName="projects/project_id/logs/airflow"
+labels.task_id="task_for_testing_file_log_handler"
+labels.dag_id="dag_for_testing_file_task_handler"
+labels.execution_date="2016-01-01T00:00:00+00:00"
+labels.try_number="3"'''
+ ),
+ order_by='timestamp asc',
+ page_size=1000,
+ page_token=None,
+ )
+ )
+ assert [(('default-hostname', 'MSG1\nMSG2'),)] == logs
assert [{'end_of_log': False, 'next_page_token': 'TOKEN1'}] == metadata1
- mock_client.return_value.list_entries.return_value.next_page_token = None
+ mock_client.return_value.list_log_entries.return_value.next_page_token = None
logs, metadata2 = self.stackdriver_task_handler.read(self.ti, 3, metadata1[0])
- mock_client.return_value.list_entries.assert_called_with(filter_=mock.ANY, page_token="TOKEN1")
- assert ['MSG3\nMSG4'] == logs
+
+ mock_client.return_value.list_log_entries.assert_called_with(
+ request=ListLogEntriesRequest(
+ resource_names=["projects/project_id"],
+ filter=(
+ 'resource.type="global"\n'
+ 'logName="projects/project_id/logs/airflow"\n'
+ 'labels.task_id="task_for_testing_file_log_handler"\n'
+ 'labels.dag_id="dag_for_testing_file_task_handler"\n'
+ 'labels.execution_date="2016-01-01T00:00:00+00:00"\n'
+ 'labels.try_number="3"'
+ ),
+ order_by='timestamp asc',
+ page_size=1000,
+ page_token="TOKEN1",
+ )
+ )
+ assert [(('default-hostname', 'MSG3\nMSG4'),)] == logs
assert [{'end_of_log': True}] == metadata2
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_read_logs_with_download(self, mock_client, mock_get_creds_and_project_id):
- mock_client.return_value.list_entries.side_effect = [
- _create_list_response(["MSG1", "MSG2"], "TOKEN1"),
- _create_list_response(["MSG3", "MSG4"], None),
+ mock_client.return_value.list_log_entries.side_effect = [
+ mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG1", "MSG2"], "TOKEN1")])),
+ mock.MagicMock(pages=iter([_create_list_log_entries_response_mock(["MSG3", "MSG4"], None)])),
]
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
logs, metadata1 = self.stackdriver_task_handler.read(self.ti, 3, {'download_logs': True})
- assert ['MSG1\nMSG2\nMSG3\nMSG4'] == logs
+ assert [(('default-hostname', 'MSG1\nMSG2\nMSG3\nMSG4'),)] == logs
assert [{'end_of_log': True}] == metadata1
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch(
- 'airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client',
- **{'return_value.project': 'asf-project'}, # type: ignore
- )
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
resource = Resource(
@@ -226,31 +286,37 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
labels={
"environment.name": 'test-instancce',
"location": 'europpe-west-3',
- "project_id": "asf-project",
+ "project_id": "project_id",
},
)
self.stackdriver_task_handler = StackdriverTaskHandler(
transport=self.transport_mock, resource=resource
)
- entry = mock.MagicMock(payload={"message": "TEXT"})
- page = [entry, entry]
- mock_client.return_value.list_entries.return_value.pages = (n for n in [page])
- mock_client.return_value.list_entries.return_value.next_page_token = None
+ entry = mock.MagicMock(json_payload={"message": "TEXT"})
+ page = mock.MagicMock(entries=[entry, entry], next_page_token=None)
+ mock_client.return_value.list_log_entries.return_value.pages = (n for n in [page])
logs, metadata = self.stackdriver_task_handler.read(self.ti)
- mock_client.return_value.list_entries.assert_called_once_with(
- filter_='resource.type="cloud_composer_environment"\n'
- 'logName="projects/asf-project/logs/airflow"\n'
- 'resource.labels."environment.name"="test-instancce"\n'
- 'resource.labels.location="europpe-west-3"\n'
- 'resource.labels.project_id="asf-project"\n'
- 'labels.task_id="task_for_testing_file_log_handler"\n'
- 'labels.dag_id="dag_for_testing_file_task_handler"\n'
- 'labels.execution_date="2016-01-01T00:00:00+00:00"',
- page_token=None,
+ mock_client.return_value.list_log_entries.assert_called_once_with(
+ request=ListLogEntriesRequest(
+ resource_names=["projects/project_id"],
+ filter=(
+ 'resource.type="cloud_composer_environment"\n'
+ 'logName="projects/project_id/logs/airflow"\n'
+ 'resource.labels."environment.name"="test-instancce"\n'
+ 'resource.labels.location="europpe-west-3"\n'
+ 'resource.labels.project_id="project_id"\n'
+ 'labels.task_id="task_for_testing_file_log_handler"\n'
+ 'labels.dag_id="dag_for_testing_file_task_handler"\n'
+ 'labels.execution_date="2016-01-01T00:00:00+00:00"'
+ ),
+ order_by='timestamp asc',
+ page_size=1000,
+ page_token=None,
+ )
)
- assert ['TEXT\nTEXT'] == logs
+ assert [(('default-hostname', 'TEXT\nTEXT'),)] == logs
assert [{'end_of_log': True}] == metadata
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
@@ -278,10 +344,9 @@ class TestStackdriverLoggingHandlerTask(unittest.TestCase):
assert mock_client.return_value == client
@mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id')
- @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client')
+ @mock.patch('airflow.providers.google.cloud.log.stackdriver_task_handler.LoggingServiceV2Client')
def test_should_return_valid_external_url(self, mock_client, mock_get_creds_and_project_id):
mock_get_creds_and_project_id.return_value = ('creds', 'project_id')
- mock_client.return_value.project = 'project_id'
stackdriver_task_handler = StackdriverTaskHandler(
gcp_key_path="KEY_PATH",