You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by pa...@apache.org on 2024/04/15 15:22:19 UTC
(airflow) branch main updated: Avoid logging empty line KPO (#38247)
This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 43919c2fa6 Avoid logging empty line KPO (#38247)
43919c2fa6 is described below
commit 43919c2fa6cbffd65239cb7fa3db2abb0545a260
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Mon Apr 15 20:52:11 2024 +0530
Avoid logging empty line KPO (#38247)
* Avoid logging empty line KPO
* cleanup
* Apply review suggestions
* Apply review feedback
* Update airflow/providers/cncf/kubernetes/operators/pod.py
---
airflow/providers/cncf/kubernetes/operators/pod.py | 3 ++-
airflow/providers/cncf/kubernetes/utils/pod_manager.py | 6 ++++--
tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 15 +++++++++++++++
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index f0f74f42c4..a331987b57 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -791,7 +791,8 @@ class KubernetesPodOperator(BaseOperator):
)
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
- self.log.info("Container logs: %s", line)
+ if line:
+ self.log.info("Container logs: %s", line)
except HTTPError as e:
self.log.warning(
"Reading of logs interrupted with error %r; will retry. "
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 787757828d..66bbd5d05a 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -464,7 +464,8 @@ class PodManager(LoggingMixin):
self._callbacks.progress_callback(
line=line, client=self._client, mode=ExecutionMode.SYNC
)
- self.log.info("[%s] %s", container_name, message_to_log)
+ if message_to_log is not None:
+ self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
message_to_log = message
message_timestamp = line_timestamp
@@ -481,7 +482,8 @@ class PodManager(LoggingMixin):
self._callbacks.progress_callback(
line=line, client=self._client, mode=ExecutionMode.SYNC
)
- self.log.info("[%s] %s", container_name, message_to_log)
+ if message_to_log is not None:
+ self.log.info("[%s] %s", container_name, message_to_log)
last_captured_timestamp = message_timestamp
except TimeoutError as e:
# in case of timeout, increment return time by 2 seconds to avoid
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index fb04e27212..5ea3a0cd9f 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -93,6 +93,21 @@ class TestPodManager:
]
)
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ def test_fetch_container_logs_do_not_log_none(self, mock_container_is_running, caplog):
+ MockWrapper.reset()
+ caplog.set_level(logging.INFO)
+
+ def consumer_iter():
+ """This will simulate a container that hasn't produced any logs in the last read_timeout window"""
+ yield from ()
+
+ with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
+ mock_consumer_iter.side_effect = consumer_iter
+ mock_container_is_running.side_effect = [True, True, False]
+ self.pod_manager.fetch_container_logs(mock.MagicMock(), "container-name", follow=True)
+ assert "[container-name] None" not in (record.message for record in caplog.records)
+
def test_read_pod_logs_retries_fails(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [