You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/10/01 09:14:22 UTC
[airflow] branch main updated: Remove duplicated logs by reusing PodLogsConsumer (#34127)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cbb04e5133 Remove duplicated logs by reusing PodLogsConsumer (#34127)
cbb04e5133 is described below
commit cbb04e513352e237baf6075ff8c6a59cc88d1122
Author: Dongwon Kim <dk...@gmail.com>
AuthorDate: Sun Oct 1 18:14:11 2023 +0900
Remove duplicated logs by reusing PodLogsConsumer (#34127)
* Remove duplicated logs by reusing PodLogsConsumer
* reuse the Kubernetes connection so that the logs are not re-consumed
* If a failure occurs while consuming logs through `PodLogsConsumer`, a new `PodLogsConsumer` is created.
* However, logs that have already been consumed are then emitted again, resulting in duplicates.
* To fix the duplicated logs, the `PodLogsConsumer` instance is created once up front and is reused
when a failure occurs, preventing duplicate logs from being emitted.
Signed-off-by: 김동원 <do...@toss.im>
* Move the try inside the connection creation
Signed-off-by: 김동원 <do...@toss.im>
* Validate reusing PodLogsConsumer instance
Signed-off-by: 김동원 <do...@toss.im>
* Check there are duplicated logs
Signed-off-by: 김동원 <do...@toss.im>
---------
Signed-off-by: 김동원 <do...@toss.im>
Co-authored-by: 김동원 <do...@toss.im>
Co-authored-by: Elad Kalif <45...@users.noreply.github.com>
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 39 +++++++++++++---------
.../cncf/kubernetes/utils/test_pod_manager.py | 25 ++++++++++++++
2 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index cfb262cdec..05829a1615 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -392,8 +392,12 @@ class PodManager(LoggingMixin):
before=before_log(self.log, logging.INFO),
)
def consume_logs(
- *, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120
- ) -> DateTime | None:
+ *,
+ since_time: DateTime | None = None,
+ follow: bool = True,
+ termination_timeout: int = 120,
+ logs: PodLogsConsumer | None,
+ ) -> tuple[DateTime | None, PodLogsConsumer | None]:
"""
Tries to follow container logs until container completes.
@@ -404,16 +408,17 @@ class PodManager(LoggingMixin):
"""
last_captured_timestamp = None
try:
- logs = self.read_pod_logs(
- pod=pod,
- container_name=container_name,
- timestamps=True,
- since_seconds=(
- math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
- ),
- follow=follow,
- post_termination_timeout=termination_timeout,
- )
+ if not logs:
+ logs = self.read_pod_logs(
+ pod=pod,
+ container_name=container_name,
+ timestamps=True,
+ since_seconds=(
+ math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
+ ),
+ follow=follow,
+ post_termination_timeout=post_termination_timeout,
+ )
for raw_line in logs:
line = raw_line.decode("utf-8", errors="backslashreplace")
line_timestamp, message = self.parse_log_line(line)
@@ -434,15 +439,19 @@ class PodManager(LoggingMixin):
pod.metadata.name,
exc_info=True,
)
- return last_captured_timestamp or since_time
+ return last_captured_timestamp or since_time, logs
# note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
# loop as we do here. But in a long-running process we might temporarily lose connectivity.
# So the looping logic is there to let us resume following the logs.
+ logs = None
last_log_time = since_time
while True:
- last_log_time = consume_logs(
- since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout
+ last_log_time, logs = consume_logs(
+ since_time=last_log_time,
+ follow=follow,
+ termination_timeout=post_termination_timeout,
+ logs=logs,
)
if not self.container_is_running(pod, container_name=container_name):
return PodLoggingStatus(running=False, last_log_time=last_log_time)
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index dfe06d9a74..42e2a5c610 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -284,6 +284,31 @@ class TestPodManager:
self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)])
+ @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ def test_fetch_container_logs_failures(self, mock_container_is_running):
+ last_timestamp_string = "2020-10-08T14:18:17.793417674Z"
+ messages = [
+ bytes("2020-10-08T14:16:17.793417674Z message", "utf-8"),
+ bytes("2020-10-08T14:17:17.793417674Z message", "utf-8"),
+ None,
+ bytes(f"{last_timestamp_string} message", "utf-8"),
+ ]
+ expected_call_count = len([message for message in messages if message is not None])
+
+ def consumer_iter():
+ while messages:
+ message = messages.pop(0)
+ if message is None:
+ raise BaseHTTPError("Boom")
+ yield message
+
+ with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter:
+ mock_consumer_iter.side_effect = consumer_iter
+ mock_container_is_running.side_effect = [True, True, False]
+ status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
+ assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string))
+ assert self.mock_progress_callback.call_count == expected_call_count
+
def test_parse_invalid_log_line(self, caplog):
with caplog.at_level(logging.INFO):
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")