You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/25 12:18:22 UTC
[airflow] branch master updated: Respect LogFormat when using ES
logging with Json Format (#13310)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new b6bf253 Respect LogFormat when using ES logging with Json Format (#13310)
b6bf253 is described below
commit b6bf25306243e78bf12528f9a080ea100a575641
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Dec 25 12:18:09 2020 +0000
Respect LogFormat when using ES logging with Json Format (#13310)
This was a long-standing bug where timestamps, log level,
line number, etc. were not shown when using the Elasticsearch task handler
(Elasticsearch remote logging) with json_format=True.
---
.../providers/elasticsearch/log/es_task_handler.py | 25 +++++++++++++++++++++-
.../elasticsearch/log/test_es_task_handler.py | 25 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 35c99ea..064b796 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -194,12 +194,26 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
# to prevent it from showing in the UI.
def concat_logs(lines):
log_range = (len(lines) - 1) if lines[-1].message == self.end_of_log_mark.strip() else len(lines)
- return '\n'.join([lines[i].message for i in range(log_range)])
+ return '\n'.join([self._format_msg(lines[i]) for i in range(log_range)])
message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host]
return message, metadata
+ def _format_msg(self, log_line):
+ """Render one ES hit through self.formatter when json_format is set; otherwise (or on failure) return its raw message."""
+ # Going through formatter._style.format (instead of %-interpolating directly) keeps this working
+ # if the formatter style is ever changed from '%' to '{' or '$'.
+ if self.json_format:
+ try:
+ # pylint: disable=protected-access
+ return self.formatter._style.format(_ESJsonLogFmt(**log_line.to_dict()))
+ except Exception: # noqa pylint: disable=broad-except
+ self.log.debug("Could not apply log format to ES log line; falling back to raw message", exc_info=True)
+
+ # Fallback (non-JSON mode, or formatting failed): the raw message, exactly as returned before this change
+ return log_line.message
+
def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
"""
Returns the logs matching log_id in Elasticsearch and next offset.
@@ -246,6 +260,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
if self.json_format:
self.formatter = JSONFormatter(
+ fmt=self.formatter._fmt, # pylint: disable=protected-access
json_fields=self.json_fields,
extras={
'dag_id': str(ti.dag_id),
@@ -328,3 +343,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
)
url = 'https://' + self.frontend.format(log_id=quote(log_id))
return url
+
+
+class _ESJsonLogFmt:
+ """Attribute bag built from one ES log document so settings.LOG_FORMAT can be applied to it."""
+
+ # A plain object is needed (not the raw dict) because 'self.formatter._style.format' reads fields via '.__dict__'
+ def __init__(self, **kwargs):  # kwargs: fields of one ES hit, e.g. message, asctime, levelname, lineno
+ self.__dict__.update(kwargs)
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 5f34ab2..3531c09 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -251,6 +251,31 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
self.es_task_handler.json_format = True
self.es_task_handler.set_context(self.ti)
+ def test_read_with_json_format(self):
+ ts = pendulum.now()
+ formatter = logging.Formatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+ self.es_task_handler.formatter = formatter
+ self.es_task_handler.json_format = True
+ # The indexed document carries every field the format string above references.
+ self.body = {
+ 'message': self.test_message,
+ 'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
+ 'offset': 1,
+ 'asctime': '2020-12-24 19:25:00,962',
+ 'filename': 'taskinstance.py',
+ 'lineno': 851,
+ 'levelname': 'INFO',
+ }
+ self.es_task_handler.set_context(self.ti)
+ self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=1)  # explicit doc id; bare `id` was the builtin function
+
+ logs, _ = self.es_task_handler.read(
+ self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+ )
+ self.assertEqual(
+ "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff", logs[0][0][1]
+ )
+
def test_close(self):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
self.es_task_handler.formatter = formatter