You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/10/01 09:14:22 UTC

[airflow] branch main updated: Remove duplicated logs by reusing PodLogsConsumer (#34127)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cbb04e5133 Remove duplicated logs by reusing PodLogsConsumer (#34127)
cbb04e5133 is described below

commit cbb04e513352e237baf6075ff8c6a59cc88d1122
Author: Dongwon Kim <dk...@gmail.com>
AuthorDate: Sun Oct 1 18:14:11 2023 +0900

    Remove duplicated logs by reusing PodLogsConsumer (#34127)
    
    * Remove duplicated logs by reusing PodLogsConsumer
    
    * reuse the kubernetes connection not to re-consume the logs
      * If a failure occurs while consuming logs through `PodLogsConsumer`, a new `PodLogsConsumer` is created.
      * But, at this time there are duplicated logs even though they have already been consumed.
      * To fix the duplicated logs, `PodLogsConsumer` instance is created initially at once and is to reused
        when a failure occurs to prevent duplicate logs from occurring.
    
    Signed-off-by: 김동원 <do...@toss.im>
    
    * Move the try inside the connection creation
    
    Signed-off-by: 김동원 <do...@toss.im>
    
    * Validate reusing PodLogsConsumer instance
    
    Signed-off-by: 김동원 <do...@toss.im>
    
    * Check there are duplicated logs
    
    Signed-off-by: 김동원 <do...@toss.im>
    
    ---------
    
    Signed-off-by: 김동원 <do...@toss.im>
    Co-authored-by: 김동원 <do...@toss.im>
    Co-authored-by: Elad Kalif <45...@users.noreply.github.com>
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 39 +++++++++++++---------
 .../cncf/kubernetes/utils/test_pod_manager.py      | 25 ++++++++++++++
 2 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index cfb262cdec..05829a1615 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -392,8 +392,12 @@ class PodManager(LoggingMixin):
             before=before_log(self.log, logging.INFO),
         )
         def consume_logs(
-            *, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
-        ) -> DateTime | None:
+            *,
+            since_time: DateTime | None = None,
+            follow: bool = True,
+            termination_timeout: int = 120,
+            logs: PodLogsConsumer | None,
+        ) -> tuple[DateTime | None, PodLogsConsumer | None]:
             """
             Tries to follow container logs until container completes.
 
@@ -404,16 +408,17 @@ class PodManager(LoggingMixin):
             """
             last_captured_timestamp = None
             try:
-                logs = self.read_pod_logs(
-                    pod=pod,
-                    container_name=container_name,
-                    timestamps=True,
-                    since_seconds=(
-                        math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
-                    ),
-                    follow=follow,
-                    post_termination_timeout=termination_timeout,
-                )
+                if not logs:
+                    logs = self.read_pod_logs(
+                        pod=pod,
+                        container_name=container_name,
+                        timestamps=True,
+                        since_seconds=(
+                            math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
+                        ),
+                        follow=follow,
+                        post_termination_timeout=post_termination_timeout,
+                    )
                 for raw_line in logs:
                     line = raw_line.decode("utf-8", errors="backslashreplace")
                     line_timestamp, message = self.parse_log_line(line)
@@ -434,15 +439,19 @@ class PodManager(LoggingMixin):
                     pod.metadata.name,
                     exc_info=True,
                 )
-            return last_captured_timestamp or since_time
+            return last_captured_timestamp or since_time, logs
 
         # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
         # loop as we do here. But in a long-running process we might temporarily lose connectivity.
         # So the looping logic is there to let us resume following the logs.
+        logs = None
         last_log_time = since_time
         while True:
-            last_log_time = consume_logs(
-                since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
+            last_log_time, logs = consume_logs(
+                since_time=last_log_time,
+                follow=follow,
+                termination_timeout=post_termination_timeout,
+                logs=logs,
             )
             if not self.container_is_running(pod, container_name=container_name):
                 return PodLoggingStatus(running=False, last_log_time=last_log_time)
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index dfe06d9a74..42e2a5c610 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -284,6 +284,31 @@ class TestPodManager:
         self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
         self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)])
 
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    def test_fetch_container_logs_failures(self, mock_container_is_running):
+        last_timestamp_string = "2020-10-08T14:18:17.793417674Z"
+        messages = [
+            bytes("2020-10-08T14:16:17.793417674Z message", "utf-8"),
+            bytes("2020-10-08T14:17:17.793417674Z message", "utf-8"),
+            None,
+            bytes(f"{last_timestamp_string} message", "utf-8"),
+        ]
+        expected_call_count = len([message for message in messages if message is not None])
+
+        def consumer_iter():
+            while messages:
+                message = messages.pop(0)
+                if message is None:
+                    raise BaseHTTPError("Boom")
+                yield message
+
+        with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
+            mock_consumer_iter.side_effect = consumer_iter
+            mock_container_is_running.side_effect = [True, True, False]
+            status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
+        assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string))
+        assert self.mock_progress_callback.call_count == expected_call_count
+
     def test_parse_invalid_log_line(self, caplog):
         with caplog.at_level(logging.INFO):
             self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")