Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/18 21:06:29 UTC
[airflow] branch main updated: Auto tail file logs in Web UI (#26169)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f7b296227 Auto tail file logs in Web UI (#26169)
1f7b296227 is described below
commit 1f7b296227fee772de9ba15af6ce107937ef9b9b
Author: Jens Scheffler <95...@users.noreply.github.com>
AuthorDate: Sun Sep 18 23:06:22 2022 +0200
Auto tail file logs in Web UI (#26169)
---
.../alibaba/cloud/log/oss_task_handler.py | 2 +-
.../providers/amazon/aws/log/s3_task_handler.py | 2 +-
.../providers/google/cloud/log/gcs_task_handler.py | 2 +-
.../microsoft/azure/log/wasb_task_handler.py | 7 +++--
airflow/utils/log/file_task_handler.py | 34 ++++++++++++++++++----
airflow/utils/log/log_reader.py | 6 +++-
airflow/www/static/js/ti_log.js | 2 +-
tests/api_connexion/endpoints/test_log_endpoint.py | 2 +-
.../google/cloud/log/test_gcs_task_handler.py | 2 +-
tests/utils/log/test_log_reader.py | 6 ++--
10 files changed, 48 insertions(+), 17 deletions(-)
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 924046145b..79985e8f48 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -114,7 +114,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
remote_loc = log_relative_path
if not self.oss_log_exists(remote_loc):
- return super()._read(ti, try_number)
+ return super()._read(ti, try_number, metadata)
# If OSS remote file exists, we do not fetch logs from task instance
# local machine even if there are errors reading remote logs, as
# returned remote_log will contain error messages.
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index 14ea3ba7a0..9aca0f3249 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -130,7 +130,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
return log, {'end_of_log': True}
else:
log += '*** Falling back to local log\n'
- local_log, metadata = super()._read(ti, try_number)
+ local_log, metadata = super()._read(ti, try_number, metadata)
return log + local_log, metadata
def s3_log_exists(self, remote_log_location: str) -> bool:
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 06837462b5..e0aa46ae36 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -152,7 +152,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
except Exception as e:
log = f'*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n'
self.log.error(log)
- local_log, metadata = super()._read(ti, try_number)
+ local_log, metadata = super()._read(ti, try_number, metadata)
log += local_log
return log, metadata
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 6af159633e..6049242373 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import os
import shutil
+from typing import Any
from azure.common import AzureHttpError
@@ -104,7 +105,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
# Mark closed so we don't double write if close is called twice
self.closed = True
- def _read(self, ti, try_number: int, metadata: str | None = None) -> tuple[str, dict[str, bool]]:
+ def _read(
+ self, ti, try_number: int, metadata: dict[str, Any] | None = None
+ ) -> tuple[str, dict[str, bool]]:
"""
Read logs of given task instance and try_number from Wasb remote storage.
If failed, read the log from task instance host machine.
@@ -128,7 +131,7 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
return log, {'end_of_log': True}
else:
- return super()._read(ti, try_number)
+ return super()._read(ti, try_number, metadata)
def wasb_log_exists(self, remote_log_location: str) -> bool:
"""
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index f87df22f66..cd4d049d9a 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -22,7 +22,7 @@ import logging
import os
import warnings
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin
from airflow.configuration import AirflowConfigException, conf
@@ -31,6 +31,7 @@ from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session
+from airflow.utils.state import State
if TYPE_CHECKING:
from airflow.models import TaskInstance
@@ -129,7 +130,7 @@ class FileTaskHandler(logging.Handler):
def _read_grouped_logs(self):
return False
- def _read(self, ti, try_number, metadata=None):
+ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
"""
Template method that contains custom logic of reading
logs given the try_number.
@@ -138,7 +139,18 @@ class FileTaskHandler(logging.Handler):
:param try_number: current try_number to read log from
:param metadata: log metadata,
can be used for streaming log reading and auto-tailing.
+ The following attributes are used:
+ log_pos: (absolute) Char position up to which the log
+ was retrieved in previous calls; this part will be
+ skipped and only the following text returned,
+ to be appended to the tail.
+
:return: log message as a string and metadata.
+ The following attributes are used in metadata:
+ end_of_log: Boolean, True if the end of the log is reached or False
+ if further calls might get more log text.
+ This is determined by the status of the TaskInstance.
+ log_pos: (absolute) Char position up to which the log has been retrieved
"""
from airflow.utils.jwt_signer import JWTSigner
@@ -158,6 +170,7 @@ class FileTaskHandler(logging.Handler):
except Exception as e:
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
+ return log, {'end_of_log': True}
elif conf.get('core', 'executor') == 'KubernetesExecutor':
try:
from airflow.kubernetes.kube_client import get_kube_client
@@ -194,6 +207,7 @@ class FileTaskHandler(logging.Handler):
except Exception as f:
log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+ return log, {'end_of_log': True}
else:
import httpx
@@ -219,7 +233,7 @@ class FileTaskHandler(logging.Handler):
response = httpx.get(
url,
timeout=timeout,
- headers={b'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
+ headers={'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
)
response.encoding = "utf-8"
@@ -240,8 +254,16 @@ class FileTaskHandler(logging.Handler):
log += '\n' + response.text
except Exception as e:
log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+ return log, {'end_of_log': True}
+
+ # Process tailing if log is not at its end
+ end_of_log = ti.try_number != try_number or ti.state not in State.running
+ log_pos = len(log)
+ if metadata and 'log_pos' in metadata:
+ previous_chars = metadata['log_pos']
+ log = log[previous_chars:] # Cut off previously passed log text, keep only the new tail
- return log, {'end_of_log': True}
+ return log, {'end_of_log': end_of_log, 'log_pos': log_pos}
def read(self, task_instance, try_number=None, metadata=None):
"""
@@ -273,11 +295,11 @@ class FileTaskHandler(logging.Handler):
logs = [''] * len(try_numbers)
metadata_array = [{}] * len(try_numbers)
for i, try_number_element in enumerate(try_numbers):
- log, metadata = self._read(task_instance, try_number_element, metadata)
+ log, out_metadata = self._read(task_instance, try_number_element, metadata)
# es_task_handler returns logs grouped by host. wrap other handlers returning a log string
# with default/ empty host so that UI can render the response in the same way
logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
- metadata_array[i] = metadata
+ metadata_array[i] = out_metadata
return logs, metadata_array
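The tailing step added at the end of FileTaskHandler._read above can be read in isolation: record the absolute length of the fetched log in log_pos, mark end_of_log when the task can no longer produce output (ti.try_number != try_number or ti.state not in State.running), and cut off whatever the caller already received. A self-contained sketch of just that step, with a task_is_finished flag standing in for the state check:

    from __future__ import annotations

    from typing import Any


    def tail_chunk(
        full_log: str, metadata: dict[str, Any] | None, task_is_finished: bool
    ) -> tuple[str, dict[str, Any]]:
        # log_pos is the absolute char position delivered so far; end_of_log tells
        # the caller whether further polling might still yield more text.
        log_pos = len(full_log)
        already_sent = (metadata or {}).get('log_pos', 0)
        new_text = full_log[already_sent:]  # drop the part the caller already has
        return new_text, {'end_of_log': task_is_finished, 'log_pos': log_pos}


    # Two successive polls while the worker keeps appending:
    chunk, meta = tail_chunk("line 1\n", None, task_is_finished=False)
    assert (chunk, meta) == ("line 1\n", {'end_of_log': False, 'log_pos': 7})
    chunk, meta = tail_chunk("line 1\nline 2\n", meta, task_is_finished=True)
    assert (chunk, meta) == ("line 2\n", {'end_of_log': True, 'log_pos': 14})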
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 7fa04979bb..d14565d257 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -27,6 +27,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.utils.helpers import render_log_filename
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import State
class TaskLogReader:
@@ -77,7 +78,10 @@ class TaskLogReader:
metadata.pop('end_of_log', None)
metadata.pop('max_offset', None)
metadata.pop('offset', None)
- while 'end_of_log' not in metadata or not metadata['end_of_log']:
+ metadata.pop('log_pos', None)
+ while 'end_of_log' not in metadata or (
+ not metadata['end_of_log'] and ti.state not in State.running
+ ):
logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
for host, log in logs[0]:
yield "\n".join([host or '', log]) + "\n"
diff --git a/airflow/www/static/js/ti_log.js b/airflow/www/static/js/ti_log.js
index ae72837345..68013fb813 100644
--- a/airflow/www/static/js/ti_log.js
+++ b/airflow/www/static/js/ti_log.js
@@ -123,7 +123,7 @@ function autoTailingLog(tryNumber, metadata = null, autoTailing = false) {
.replace(urlRegex, (url) => `<a href="${url}" target="_blank">${url}</a>`)
.replaceAll(dateRegex, (date) => `<time datetime="${date}+00:00" data-with-tz="true">${formatDateTime(`${date}+00:00`)}</time>`)
.replaceAll(iso8601Regex, (date) => `<time datetime="${date}" data-with-tz="true">${formatDateTime(`${date}`)}</time>`);
- logBlock.innerHTML += `${linkifiedMessage}\n`;
+ logBlock.innerHTML += `${linkifiedMessage}`;
});
// Auto scroll window to the end if current window location is near the end.
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index f3f9fbe7a4..ae6ae66c1e 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -169,7 +169,7 @@ class TestGetLog:
== f"[('localhost', '*** Reading local file: {expected_filename}\\nLog for testing.')]"
)
info = serializer.loads(response.json['continuation_token'])
- assert info == {'end_of_log': True}
+ assert info == {'end_of_log': True, 'log_pos': 41 + len(expected_filename)}
assert 200 == response.status_code
@pytest.mark.parametrize(
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 411dfb4af5..3daf9ba6e2 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -105,7 +105,7 @@ class TestGCSTaskHandler:
log == "*** Unable to read remote log from gs://bucket/remote/log/location/1.log\n*** "
f"Failed to connect\n\n*** Reading local file: {self.local_log_location}/1.log\n"
)
- assert metadata == {"end_of_log": True}
+ assert metadata == {'end_of_log': False, 'log_pos': 31 + len(self.local_log_location)}
mock_blob.from_string.assert_called_once_with(
"gs://bucket/remote/log/location/1.log", mock_client.return_value
)
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 54b67a1bcc..a14f13530a 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -127,7 +127,7 @@ class TestLogView:
f"try_number=1.\n",
)
] == logs[0]
- assert {"end_of_log": True} == metadatas
+ assert metadatas == {'end_of_log': True, 'log_pos': 102 + len(self.log_dir)}
def test_test_read_log_chunks_should_read_all_files(self):
task_log_reader = TaskLogReader()
@@ -159,7 +159,7 @@ class TestLogView:
)
],
] == logs
- assert {"end_of_log": True} == metadatas
+ assert {'end_of_log': True, 'log_pos': 102 + len(self.log_dir)} == metadatas
def test_test_test_read_log_stream_should_read_one_try(self):
task_log_reader = TaskLogReader()
@@ -174,6 +174,7 @@ class TestLogView:
def test_test_test_read_log_stream_should_read_all_logs(self):
task_log_reader = TaskLogReader()
+ self.ti.state = TaskInstanceState.SUCCESS # Ensure the mocked task instance is completed so the stream ends
stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
assert [
"localhost\n*** Reading local file: "
@@ -199,6 +200,7 @@ class TestLogView:
mock_read.side_effect = [first_return, second_return, third_return, fourth_return]
task_log_reader = TaskLogReader()
+ self.ti.state = TaskInstanceState.SUCCESS
log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={})
assert ["\n1st line\n", "\n2nd line\n", "\n3rd line\n"] == list(log_stream)