You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pa...@apache.org on 2024/04/15 15:22:19 UTC

(airflow) branch main updated: Avoid logging empty line KPO (#38247)

This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 43919c2fa6 Avoid logging empty line KPO (#38247)
43919c2fa6 is described below

commit 43919c2fa6cbffd65239cb7fa3db2abb0545a260
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Mon Apr 15 20:52:11 2024 +0530

    Avoid logging empty line KPO (#38247)
    
    * Avoid logging empty line KPO
    
    * cleanup
    
    * Apply review suggestions
    
    * Apply review feedback
    
    * Update airflow/providers/cncf/kubernetes/operators/pod.py
---
 airflow/providers/cncf/kubernetes/operators/pod.py        |  3 ++-
 airflow/providers/cncf/kubernetes/utils/pod_manager.py    |  6 ++++--
 tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 15 +++++++++++++++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index f0f74f42c4..a331987b57 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -791,7 +791,8 @@ class KubernetesPodOperator(BaseOperator):
             )
             for raw_line in logs:
                 line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
-                self.log.info("Container logs: %s", line)
+                if line:
+                    self.log.info("Container logs: %s", line)
         except HTTPError as e:
             self.log.warning(
                 "Reading of logs interrupted with error %r; will retry. "
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 787757828d..66bbd5d05a 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -464,7 +464,8 @@ class PodManager(LoggingMixin):
                                         self._callbacks.progress_callback(
                                             line=line, client=self._client, mode=ExecutionMode.SYNC
                                         )
-                                self.log.info("[%s] %s", container_name, message_to_log)
+                                if message_to_log is not None:
+                                    self.log.info("[%s] %s", container_name, message_to_log)
                                 last_captured_timestamp = message_timestamp
                                 message_to_log = message
                                 message_timestamp = line_timestamp
@@ -481,7 +482,8 @@ class PodManager(LoggingMixin):
                             self._callbacks.progress_callback(
                                 line=line, client=self._client, mode=ExecutionMode.SYNC
                             )
-                    self.log.info("[%s] %s", container_name, message_to_log)
+                    if message_to_log is not None:
+                        self.log.info("[%s] %s", container_name, message_to_log)
                     last_captured_timestamp = message_timestamp
             except TimeoutError as e:
                 # in case of timeout, increment return time by 2 seconds to avoid
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index fb04e27212..5ea3a0cd9f 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -93,6 +93,21 @@ class TestPodManager:
             ]
         )
 
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    def test_fetch_container_logs_do_not_log_none(self, mock_container_is_running, caplog):
+        MockWrapper.reset()
+        caplog.set_level(logging.INFO)
+
+        def consumer_iter():
+            """This will simulate a container that hasn't produced any logs in the last read_timeout window"""
+            yield from ()
+
+        with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
+            mock_consumer_iter.side_effect = consumer_iter
+            mock_container_is_running.side_effect = [True, True, False]
+            self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True)
+            assert "[container-name] None" not in (record.message for record in caplog.records)
+
     def test_read_pod_logs_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [