You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/25 12:18:22 UTC

[airflow] branch master updated: Respect LogFormat when using ES logging with Json Format (#13310)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new b6bf253  Respect LogFormat when using ES logging with Json Format (#13310)
b6bf253 is described below

commit b6bf25306243e78bf12528f9a080ea100a575641
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Dec 25 12:18:09 2020 +0000

    Respect LogFormat when using ES logging with Json Format (#13310)
    
    This was a long-standing bug / behaviour where Timestamps, log level,
    line number etc were not shown when using ElasticSearch Task Handler
    (Elasticsearch as remote logging) with json_format=True.
---
 .../providers/elasticsearch/log/es_task_handler.py | 25 +++++++++++++++++++++-
 .../elasticsearch/log/test_es_task_handler.py      | 25 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 35c99ea..064b796 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -194,12 +194,26 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         # to prevent it from showing in the UI.
         def concat_logs(lines):
             log_range = (len(lines) - 1) if lines[-1].message == self.end_of_log_mark.strip() else len(lines)
-            return '\n'.join([lines[i].message for i in range(log_range)])
+            return '\n'.join([self._format_msg(lines[i]) for i in range(log_range)])
 
         message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host]
 
         return message, metadata
 
+    def _format_msg(self, log_line):
+        """Format ES Record to match settings.LOG_FORMAT when used with json_format"""
+        # Using formatter._style.format makes it future proof i.e.
+        # if we change the formatter style from '%' to '{' or '$', this will still work
+        if self.json_format:
+            try:
+                # pylint: disable=protected-access
+                return self.formatter._style.format(_ESJsonLogFmt(**log_line.to_dict()))
+            except Exception:  # noqa pylint: disable=broad-except
+                pass
+
+        # Just a safe-guard to preserve backwards-compatibility
+        return log_line.message
+
     def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
         """
         Returns the logs matching log_id in Elasticsearch and next offset.
@@ -246,6 +260,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
 
         if self.json_format:
             self.formatter = JSONFormatter(
+                fmt=self.formatter._fmt,  # pylint: disable=protected-access
                 json_fields=self.json_fields,
                 extras={
                     'dag_id': str(ti.dag_id),
@@ -328,3 +343,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         )
         url = 'https://' + self.frontend.format(log_id=quote(log_id))
         return url
+
+
+class _ESJsonLogFmt:
+    """Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT"""
+
+    # A separate class is needed because 'self.formatter._style.format' uses '.__dict__'
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 5f34ab2..3531c09 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -251,6 +251,31 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
         self.es_task_handler.json_format = True
         self.es_task_handler.set_context(self.ti)
 
+    def test_read_with_json_format(self):
+        ts = pendulum.now()
+        formatter = logging.Formatter('[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+        self.es_task_handler.formatter = formatter
+        self.es_task_handler.json_format = True
+
+        self.body = {
+            'message': self.test_message,
+            'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
+            'offset': 1,
+            'asctime': '2020-12-24 19:25:00,962',
+            'filename': 'taskinstance.py',
+            'lineno': 851,
+            'levelname': 'INFO',
+        }
+        self.es_task_handler.set_context(self.ti)
+        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)
+
+        logs, _ = self.es_task_handler.read(
+            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+        )
+        self.assertEqual(
+            "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff", logs[0][0][1]
+        )
+
     def test_close(self):
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
         self.es_task_handler.formatter = formatter