You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:53 UTC

[airflow] 09/37: Throttle streaming log reads (#28818)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 76b279681723b626063f5425934beca6cb43b1c7
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Jan 10 22:16:11 2023 -0800

    Throttle streaming log reads (#28818)
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
    (cherry picked from commit 46704eed5340ef0e71cf828bb560b3d00aa88691)
---
 airflow/utils/log/log_reader.py | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 7ad4700195..5cc8b9377e 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import logging
+import time
 from typing import Iterator
 
 from sqlalchemy.orm.session import Session
@@ -33,6 +34,9 @@ from airflow.utils.state import State
 class TaskLogReader:
     """Task log reader"""
 
+    STREAM_LOOP_SLEEP_SECONDS = 0.5
+    """Time to sleep between loops while waiting for more logs"""
+
     def read_log_chunks(
         self, ti: TaskInstance, try_number: int | None, metadata
     ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
@@ -77,12 +81,19 @@ class TaskLogReader:
             metadata.pop("max_offset", None)
             metadata.pop("offset", None)
             metadata.pop("log_pos", None)
-            while "end_of_log" not in metadata or (
-                not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
-            ):
+            while True:
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or "", log]) + "\n"
+                if "end_of_log" not in metadata or (
+                    not metadata["end_of_log"] and ti.state not in [State.RUNNING, State.DEFERRED]
+                ):
+                    if not logs[0]:
+                        # No log lines were returned on this pass; sleep before polling again
+                        # to conserve resources and limit requests to external services.
+                        time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
+                else:
+                    break
 
     @cached_property
     def log_handler(self):