You are viewing a plain text version of this content. The canonical version is the original message in the commits@airflow.apache.org mailing-list archive.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:53 UTC
[airflow] 09/37: Throttle streaming log reads (#28818)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 76b279681723b626063f5425934beca6cb43b1c7
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Jan 10 22:16:11 2023 -0800
Throttle streaming log reads (#28818)
Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
(cherry picked from commit 46704eed5340ef0e71cf828bb560b3d00aa88691)
---
airflow/utils/log/log_reader.py | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 7ad4700195..5cc8b9377e 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import logging
+import time
from typing import Iterator
from sqlalchemy.orm.session import Session
@@ -33,6 +34,9 @@ from airflow.utils.state import State
class TaskLogReader:
"""Task log reader"""
+ STREAM_LOOP_SLEEP_SECONDS = 0.5
+ """Time to sleep between loops while waiting for more logs"""
+
def read_log_chunks(
self, ti: TaskInstance, try_number: int | None, metadata
) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
@@ -77,12 +81,19 @@ class TaskLogReader:
metadata.pop("max_offset", None)
metadata.pop("offset", None)
metadata.pop("log_pos", None)
- while "end_of_log" not in metadata or (
- not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
- ):
+ while True:
logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
for host, log in logs[0]:
yield "\n".join([host or "", log]) + "\n"
+ if "end_of_log" not in metadata or (
+ not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+ ):
+ if not logs[0]:
+ # we did not receive any logs in this loop
+ # sleeping to conserve resources / limit requests on external services
+ time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
+ else:
+ break
@cached_property
def log_handler(self):