You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by on...@apache.org on 2023/06/26 23:56:25 UTC

[airflow] branch main updated: rewrite method used in ecs to fetch less logs (#31786)

This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e4eb19866e rewrite method used in ecs to fetch less logs (#31786)
e4eb19866e is described below

commit e4eb19866eeddc5839fda0b444849b01ea1dbc39
Author: Raphaël Vandon <va...@amazon.com>
AuthorDate: Mon Jun 26 16:56:17 2023 -0700

    rewrite method used in ecs to fetch less logs (#31786)
    
    The current behavior is to fetch all logs and only keep the last message(s).
    This is wasteful, as there is an option to fetch from the end directly, which allows sending only the minimum number of requests.
    Since a generator is used in get_log_events, stopping the iteration after we've collected enough logs prevents it from doing more requests.
    
    With this change, we can expect fewer API calls and faster execution time for this method, especially in tasks that emit a lot of logs.
---
 airflow/providers/amazon/aws/hooks/logs.py         | 23 +++++++++--
 .../providers/amazon/aws/utils/task_log_fetcher.py | 14 ++++++-
 .../amazon/aws/utils/test_task_log_fetcher.py      | 45 ++++++++--------------
 3 files changed, 49 insertions(+), 33 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/logs.py b/airflow/providers/amazon/aws/hooks/logs.py
index 2dff0aaaf3..6d006a95a3 100644
--- a/airflow/providers/amazon/aws/hooks/logs.py
+++ b/airflow/providers/amazon/aws/hooks/logs.py
@@ -21,8 +21,10 @@ functionality for interacting with AWS CloudWatch.
 """
 from __future__ import annotations
 
+import warnings
 from typing import Generator
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
 
 # Guidance received from the AWS team regarding the correct way to check for the end of a stream is that the
@@ -62,7 +64,7 @@ class AwsLogsHook(AwsBaseHook):
         log_stream_name: str,
         start_time: int = 0,
         skip: int = 0,
-        start_from_head: bool = True,
+        start_from_head: bool | None = None,
         continuation_token: ContinuationToken | None = None,
     ) -> Generator:
         """
@@ -77,8 +79,8 @@ class AwsLogsHook(AwsBaseHook):
         :param start_time: The time stamp value to start reading the logs from (default: 0).
         :param skip: The number of log entries to skip at the start (default: 0).
             This is for when there are multiple entries at the same timestamp.
-        :param start_from_head: whether to start from the beginning (True) of the log or
-            at the end of the log (False).
+        :param start_from_head: Deprecated. Do not use with False, logs would be retrieved out of order.
+            If possible, retrieve logs in one query, or implement pagination yourself.
         :param continuation_token: a token indicating where to read logs from.
             Will be updated as this method reads new logs, to be reused in subsequent calls.
         :return: | A CloudWatch log event with the following key-value pairs:
@@ -86,6 +88,21 @@ class AwsLogsHook(AwsBaseHook):
                  |   'message' (str): The log event data.
                  |   'ingestionTime' (int): The time in milliseconds the event was ingested.
         """
+        if start_from_head is not None:
+            message = (
+                "start_from_head is deprecated, please remove this parameter."
+                if start_from_head
+                else "Do not use this method with start_from_head=False, logs will be returned out of order. "
+                "If possible, retrieve logs in one query, or implement pagination yourself."
+            )
+            warnings.warn(
+                message,
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+        else:
+            start_from_head = True
+
         if continuation_token is None:
             continuation_token = AwsLogsHook.ContinuationToken()
 
diff --git a/airflow/providers/amazon/aws/utils/task_log_fetcher.py b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
index 22a5e5f2a1..fc33219d72 100644
--- a/airflow/providers/amazon/aws/utils/task_log_fetcher.py
+++ b/airflow/providers/amazon/aws/utils/task_log_fetcher.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import time
-from collections import deque
 from datetime import datetime, timedelta
 from logging import Logger
 from threading import Event, Thread
@@ -95,7 +94,18 @@ class AwsTaskLogFetcher(Thread):
         return f"[{formatted_event_dt}] {message}"
 
     def get_last_log_messages(self, number_messages) -> list:
-        return [log["message"] for log in deque(self._get_log_events(), maxlen=number_messages)]
+        """
+        Gets the last logs messages in one single request, so restrictions apply:
+         - if logs are too old, the response will be empty
+         - the max number of messages we can retrieve is constrained by cloudwatch limits (10,000).
+        """
+        response = self.hook.conn.get_log_events(
+            logGroupName=self.log_group,
+            logStreamName=self.log_stream_name,
+            startFromHead=False,
+            limit=number_messages,
+        )
+        return [log["message"] for log in response["events"]]
 
     def get_last_log_message(self) -> str | None:
         try:
diff --git a/tests/providers/amazon/aws/utils/test_task_log_fetcher.py b/tests/providers/amazon/aws/utils/test_task_log_fetcher.py
index a5598ebf55..1170bb643f 100644
--- a/tests/providers/amazon/aws/utils/test_task_log_fetcher.py
+++ b/tests/providers/amazon/aws/utils/test_task_log_fetcher.py
@@ -120,41 +120,30 @@ class TestAwsTaskLogFetcher:
             ]
         )
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=(),
-    )
-    def test_get_last_log_message_with_no_log_events(self, mock_log_events):
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_message_with_no_log_events(self, mock_conn):
         assert self.log_fetcher.get_last_log_message() is None
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=iter(
-            [
-                {"timestamp": 1617400267123, "message": "First"},
-                {"timestamp": 1617400367456, "message": "Second"},
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_message_with_log_events(self, log_conn_mock):
+        log_conn_mock.get_log_events.return_value = {
+            "events": [
+                {"timestamp": 1617400267123, "message": "Last"},
             ]
-        ),
-    )
-    def test_get_last_log_message_with_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_message() == "Second"
+        }
+        assert self.log_fetcher.get_last_log_message() == "Last"
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=iter(
-            [
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_messages_with_log_events(self, log_conn_mock):
+        log_conn_mock.get_log_events.return_value = {
+            "events": [
                 {"timestamp": 1617400267123, "message": "First"},
                 {"timestamp": 1617400367456, "message": "Second"},
                 {"timestamp": 1617400367458, "message": "Third"},
             ]
-        ),
-    )
-    def test_get_last_log_messages_with_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_messages(2) == ["Second", "Third"]
+        }
+        assert self.log_fetcher.get_last_log_messages(2) == ["First", "Second", "Third"]
 
-    @mock.patch(
-        "airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events",
-        return_value=(),
-    )
-    def test_get_last_log_messages_with_no_log_events(self, mock_log_events):
+    @mock.patch.object(AwsLogsHook, "conn")
+    def test_get_last_log_messages_with_no_log_events(self, mock_conn):
         assert self.log_fetcher.get_last_log_messages(2) == []