Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/18 21:06:29 UTC

[airflow] branch main updated: Auto tail file logs in Web UI (#26169)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f7b296227 Auto tail file logs in Web UI (#26169)
1f7b296227 is described below

commit 1f7b296227fee772de9ba15af6ce107937ef9b9b
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Sun Sep 18 23:06:22 2022 +0200

    Auto tail file logs in Web UI (#26169)
---
 .../alibaba/cloud/log/oss_task_handler.py          |  2 +-
 .../providers/amazon/aws/log/s3_task_handler.py    |  2 +-
 .../providers/google/cloud/log/gcs_task_handler.py |  2 +-
 .../microsoft/azure/log/wasb_task_handler.py       |  7 +++--
 airflow/utils/log/file_task_handler.py             | 34 ++++++++++++++++++----
 airflow/utils/log/log_reader.py                    |  6 +++-
 airflow/www/static/js/ti_log.js                    |  2 +-
 tests/api_connexion/endpoints/test_log_endpoint.py |  2 +-
 .../google/cloud/log/test_gcs_task_handler.py      |  2 +-
 tests/utils/log/test_log_reader.py                 |  6 ++--
 10 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 924046145b..79985e8f48 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -114,7 +114,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
         remote_loc = log_relative_path
 
         if not self.oss_log_exists(remote_loc):
-            return super()._read(ti, try_number)
+            return super()._read(ti, try_number, metadata)
         # If OSS remote file exists, we do not fetch logs from task instance
         # local machine even if there are errors reading remote logs, as
         # returned remote_log will contain error messages.
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index 14ea3ba7a0..9aca0f3249 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -130,7 +130,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             return log, {'end_of_log': True}
         else:
             log += '*** Falling back to local log\n'
-            local_log, metadata = super()._read(ti, try_number)
+            local_log, metadata = super()._read(ti, try_number, metadata)
             return log + local_log, metadata
 
     def s3_log_exists(self, remote_log_location: str) -> bool:
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 06837462b5..e0aa46ae36 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -152,7 +152,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         except Exception as e:
             log = f'*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n'
             self.log.error(log)
-            local_log, metadata = super()._read(ti, try_number)
+            local_log, metadata = super()._read(ti, try_number, metadata)
             log += local_log
             return log, metadata
 
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 6af159633e..6049242373 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import os
 import shutil
+from typing import Any
 
 from azure.common import AzureHttpError
 
@@ -104,7 +105,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _read(self, ti, try_number: int, metadata: str | None = None) -> tuple[str, dict[str, bool]]:
+    def _read(
+        self, ti, try_number: int, metadata: dict[str, Any] | None = None
+    ) -> tuple[str, dict[str, bool]]:
         """
         Read logs of given task instance and try_number from Wasb remote storage.
         If that fails, read the log from the task instance host machine.
@@ -128,7 +131,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
             log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
             return log, {'end_of_log': True}
         else:
-            return super()._read(ti, try_number)
+            return super()._read(ti, try_number, metadata)
 
     def wasb_log_exists(self, remote_log_location: str) -> bool:
         """
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index f87df22f66..cd4d049d9a 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -22,7 +22,7 @@ import logging
 import os
 import warnings
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 from urllib.parse import urljoin
 
 from airflow.configuration import AirflowConfigException, conf
@@ -31,6 +31,7 @@ from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
+from airflow.utils.state import State
 
 if TYPE_CHECKING:
     from airflow.models import TaskInstance
@@ -129,7 +130,7 @@ class FileTaskHandler(logging.Handler):
     def _read_grouped_logs(self):
         return False
 
-    def _read(self, ti, try_number, metadata=None):
+    def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
         """
         Template method that contains custom logic of reading
         logs given the try_number.
@@ -138,7 +139,18 @@ class FileTaskHandler(logging.Handler):
         :param try_number: current try_number to read log from
         :param metadata: log metadata,
                          can be used for streaming log reading and auto-tailing.
+                         The following attributes are used:
+                         log_pos: (absolute) Char position up to which the log
+                                  was retrieved in previous calls; this part
+                                  is skipped and only the following text is
+                                  returned to be appended to the tail.
+
         :return: log message as a string and metadata.
+                 The following attributes are used in metadata:
+                 end_of_log: Boolean, True if the end of the log is reached, or
+                             False if further calls might return more log text.
+                             This is determined by the status of the TaskInstance.
+                 log_pos: (absolute) Char position up to which the log is retrieved.
         """
         from airflow.utils.jwt_signer import JWTSigner
 
@@ -158,6 +170,7 @@ class FileTaskHandler(logging.Handler):
             except Exception as e:
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
+                return log, {'end_of_log': True}
         elif conf.get('core', 'executor') == 'KubernetesExecutor':
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
@@ -194,6 +207,7 @@ class FileTaskHandler(logging.Handler):
 
             except Exception as f:
                 log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+                return log, {'end_of_log': True}
         else:
             import httpx
 
@@ -219,7 +233,7 @@ class FileTaskHandler(logging.Handler):
                 response = httpx.get(
                     url,
                     timeout=timeout,
-                    headers={b'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
+                    headers={'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
                 )
                 response.encoding = "utf-8"
 
@@ -240,8 +254,16 @@ class FileTaskHandler(logging.Handler):
                 log += '\n' + response.text
             except Exception as e:
                 log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+                return log, {'end_of_log': True}
+
+        # Process tailing if the log has not yet reached its end
+        end_of_log = ti.try_number != try_number or ti.state not in State.running
+        log_pos = len(log)
+        if metadata and 'log_pos' in metadata:
+            previous_chars = metadata['log_pos']
+            log = log[previous_chars:]  # Cut off previously returned log text; only the new tail remains
 
-        return log, {'end_of_log': True}
+        return log, {'end_of_log': end_of_log, 'log_pos': log_pos}
 
     def read(self, task_instance, try_number=None, metadata=None):
         """
@@ -273,11 +295,11 @@ class FileTaskHandler(logging.Handler):
         logs = [''] * len(try_numbers)
         metadata_array = [{}] * len(try_numbers)
         for i, try_number_element in enumerate(try_numbers):
-            log, metadata = self._read(task_instance, try_number_element, metadata)
+            log, out_metadata = self._read(task_instance, try_number_element, metadata)
             # es_task_handler return logs grouped by host. wrap other handler returning log string
             # with default/ empty host so that UI can render the response in the same way
             logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
-            metadata_array[i] = metadata
+            metadata_array[i] = out_metadata
 
         return logs, metadata_array
 
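The docstring and code above define a simple polling contract: every call
to _read() re-reads the whole log, reports its absolute length as
'log_pos', and cuts off the 'log_pos' characters already delivered on the
previous call, while 'end_of_log' stays False as long as the task instance
is still running on the same try_number. A sketch of a caller driving that
contract (tail_task_log, the handler/ti arguments and the 5-second poll
interval are illustrative assumptions; real callers go through read() and
TaskLogReader rather than _read() directly):

    import time

    def tail_task_log(handler, ti, try_number: int) -> None:
        metadata: dict = {}
        while True:
            # Only the text after metadata['log_pos'] comes back; the first
            # call (empty metadata) returns the full log read so far.
            text, metadata = handler._read(ti, try_number, metadata)
            if text:
                print(text, end='')
            if metadata['end_of_log']:
                break  # task left the running state or try_number moved on
            time.sleep(5)  # poll again; the returned metadata carries log_pos
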
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 7fa04979bb..d14565d257 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -27,6 +27,7 @@ from airflow.models.taskinstance import TaskInstance
 from airflow.utils.helpers import render_log_filename
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import State
 
 
 class TaskLogReader:
@@ -77,7 +78,10 @@ class TaskLogReader:
             metadata.pop('end_of_log', None)
             metadata.pop('max_offset', None)
             metadata.pop('offset', None)
-            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+            metadata.pop('log_pos', None)
+            while 'end_of_log' not in metadata or (
+                not metadata['end_of_log'] and ti.state not in State.running
+            ):
                 logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                 for host, log in logs[0]:
                     yield "\n".join([host or '', log]) + "\n"
diff --git a/airflow/www/static/js/ti_log.js b/airflow/www/static/js/ti_log.js
index ae72837345..68013fb813 100644
--- a/airflow/www/static/js/ti_log.js
+++ b/airflow/www/static/js/ti_log.js
@@ -123,7 +123,7 @@ function autoTailingLog(tryNumber, metadata = null, autoTailing = false) {
           .replace(urlRegex, (url) => `<a href="${url}" target="_blank">${url}</a>`)
           .replaceAll(dateRegex, (date) => `<time datetime="${date}+00:00" data-with-tz="true">${formatDateTime(`${date}+00:00`)}</time>`)
           .replaceAll(iso8601Regex, (date) => `<time datetime="${date}" data-with-tz="true">${formatDateTime(`${date}`)}</time>`);
-        logBlock.innerHTML += `${linkifiedMessage}\n`;
+        logBlock.innerHTML += `${linkifiedMessage}`;
       });
 
       // Auto scroll window to the end if current window location is near the end.
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index f3f9fbe7a4..ae6ae66c1e 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -169,7 +169,7 @@ class TestGetLog:
             == f"[('localhost', '*** Reading local file: {expected_filename}\\nLog for testing.')]"
         )
         info = serializer.loads(response.json['continuation_token'])
-        assert info == {'end_of_log': True}
+        assert info == {'end_of_log': True, 'log_pos': 41 + len(expected_filename)}
         assert 200 == response.status_code
 
     @pytest.mark.parametrize(
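
The updated assertion also documents where the new 'log_pos' value comes
from: it is simply the length of the returned log text, and everything in
"*** Reading local file: {expected_filename}\nLog for testing." apart from
the filename adds up to 41 characters. A quick standalone check (not part
of the test suite):

    prefix = '*** Reading local file: '  # 24 characters
    suffix = '\nLog for testing.'        # 17 characters
    assert len(prefix) + len(suffix) == 41
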
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 411dfb4af5..3daf9ba6e2 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -105,7 +105,7 @@ class TestGCSTaskHandler:
             log == "*** Unable to read remote log from gs://bucket/remote/log/location/1.log\n*** "
             f"Failed to connect\n\n*** Reading local file: {self.local_log_location}/1.log\n"
         )
-        assert metadata == {"end_of_log": True}
+        assert metadata == {'end_of_log': False, 'log_pos': 31 + len(self.local_log_location)}
         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
         )
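
Two things change in this expectation. end_of_log is now False because the
base handler treats the mocked task instance as still running and therefore
tailable, and log_pos covers only the local log returned by super()._read()
(the GCS handler prepends its error message afterwards but returns the base
handler's metadata unchanged). The fixed parts of
"*** Reading local file: {local_log_location}/1.log\n" total 31 characters:

    prefix = '*** Reading local file: '  # 24 characters
    suffix = '/1.log\n'                  # 7 characters
    assert len(prefix) + len(suffix) == 31
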
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 54b67a1bcc..a14f13530a 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -127,7 +127,7 @@ class TestLogView:
                 f"try_number=1.\n",
             )
         ] == logs[0]
-        assert {"end_of_log": True} == metadatas
+        assert metadatas == {'end_of_log': True, 'log_pos': 102 + len(self.log_dir)}
 
     def test_test_read_log_chunks_should_read_all_files(self):
         task_log_reader = TaskLogReader()
@@ -159,7 +159,7 @@ class TestLogView:
                 )
             ],
         ] == logs
-        assert {"end_of_log": True} == metadatas
+        assert {'end_of_log': True, 'log_pos': 102 + len(self.log_dir)} == metadatas
 
     def test_test_test_read_log_stream_should_read_one_try(self):
         task_log_reader = TaskLogReader()
@@ -174,6 +174,7 @@ class TestLogView:
 
     def test_test_test_read_log_stream_should_read_all_logs(self):
         task_log_reader = TaskLogReader()
+        self.ti.state = TaskInstanceState.SUCCESS  # Ensure the mocked instance is completed so the stream terminates
         stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
         assert [
             "localhost\n*** Reading local file: "
@@ -199,6 +200,7 @@ class TestLogView:
         mock_read.side_effect = [first_return, second_return, third_return, fourth_return]
 
         task_log_reader = TaskLogReader()
+        self.ti.state = TaskInstanceState.SUCCESS
         log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={})
         assert ["\n1st line\n", "\n2nd line\n", "\n3rd line\n"] == list(log_stream)