Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:13 UTC

[airflow] 23/38: Support remote logging in elasticsearch with filebeat 7 (#14625)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 77060cdeeb6b82f4aa7891e9feb0cea677857525
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jun 11 12:32:42 2021 -0600

    Support remote logging in elasticsearch with filebeat 7 (#14625)
    
    Filebeat 7 renamed some fields (offset->log.offset and host->host.name),
    so allow the field names Airflow uses to be configured.
    
    Airflow isn't directly involved with getting the logs _to_
    elasticsearch, so we should allow easy configuration to accommodate
    whatever tools are used in that process.
    
    (cherry picked from commit 5cd0bf733b839951c075c54e808a595ac923c4e8)
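
In practice this means a deployment shipping task logs with Filebeat 7 only
needs to point Airflow at the ECS field names. A minimal sketch of the
resulting `airflow.cfg` stanza, using the renamed fields described in the
message above:

    [elasticsearch]
    host_field = host.name
    offset_field = log.offset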
---
 airflow/config_templates/airflow_local_settings.py |  4 ++
 airflow/config_templates/config.yml                | 14 +++++
 airflow/config_templates/default_airflow.cfg       |  6 +++
 .../providers/elasticsearch/log/es_task_handler.py | 20 ++++---
 .../elasticsearch/log/test_es_task_handler.py      | 61 +++++++++++++++++++++-
 5 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 3c8ffb8..b3705bf 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -254,6 +254,8 @@ if REMOTE_LOGGING:
         ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
         ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
         ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS')
+        ELASTICSEARCH_HOST_FIELD: str = conf.get('elasticsearch', 'HOST_FIELD')
+        ELASTICSEARCH_OFFSET_FIELD: str = conf.get('elasticsearch', 'OFFSET_FIELD')
 
         ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
             'task': {
@@ -268,6 +270,8 @@ if REMOTE_LOGGING:
                 'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
                 'json_format': ELASTICSEARCH_JSON_FORMAT,
                 'json_fields': ELASTICSEARCH_JSON_FIELDS,
+                'host_field': ELASTICSEARCH_HOST_FIELD,
+                'offset_field': ELASTICSEARCH_OFFSET_FIELD,
             },
         }
 
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 39d2539..dd2d48f 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1992,6 +1992,20 @@
       type: string
       example: ~
       default: "asctime, filename, lineno, levelname, message"
+    - name: host_field
+      description: |
+        The field where host name is stored (normally either `host` or `host.name`)
+      version_added: 2.1.1
+      type: string
+      example: ~
+      default: "host"
+    - name: offset_field
+      description: |
+        The field where offset is stored (normally either `offset` or `log.offset`)
+      version_added: 2.1.1
+      type: string
+      example: ~
+      default: "offset"
 - name: elasticsearch_configs
   description: ~
   options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index bf033ef..f8e8588 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -989,6 +989,12 @@ json_format = False
 # Log fields to also attach to the json output, if enabled
 json_fields = asctime, filename, lineno, levelname, message
 
+# The field where host name is stored (normally either `host` or `host.name`)
+host_field = host
+
+# The field where offset is stored (normally either `offset` or `log.offset`)
+offset_field = offset
+
 [elasticsearch_configs]
 use_ssl = False
 verify_certs = True
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 44e72bf..16e4d65 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -20,6 +20,7 @@ import logging
 import sys
 from collections import defaultdict
 from datetime import datetime
+from operator import attrgetter
 from time import time
 from typing import List, Optional, Tuple
 from urllib.parse import quote
@@ -71,6 +72,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         write_stdout: bool,
         json_format: bool,
         json_fields: str,
+        host_field: str = "host",
+        offset_field: str = "offset",
         host: str = "localhost:9200",
         frontend: str = "localhost:5601",
         es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
@@ -94,6 +97,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         self.write_stdout = write_stdout
         self.json_format = json_format
         self.json_fields = [label.strip() for label in json_fields.split(",")]
+        self.host_field = host_field
+        self.offset_field = offset_field
         self.handler = None
         self.context_set = False
 
@@ -122,11 +127,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         """
         return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
-    @staticmethod
-    def _group_logs_by_host(logs):
+    def _group_logs_by_host(self, logs):
         grouped_logs = defaultdict(list)
         for log in logs:
-            key = getattr(log, 'host', 'default_host')
+            key = getattr(log, self.host_field, 'default_host')
             grouped_logs[key].append(log)
 
         # return items sorted by timestamp.
@@ -160,7 +164,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         logs = self.es_read(log_id, offset, metadata)
         logs_by_host = self._group_logs_by_host(logs)
 
-        next_offset = offset if not logs else logs[-1].offset
+        next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])
 
         # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
         # on the client. Sending as a string prevents this issue.
@@ -227,14 +231,16 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         :type metadata: dict
         """
         # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client).query('match_phrase', log_id=log_id).sort('offset')
+        search = Search(using=self.client).query('match_phrase', log_id=log_id).sort(self.offset_field)
 
-        search = search.filter('range', offset={'gt': int(offset)})
+        search = search.filter('range', **{self.offset_field: {'gt': int(offset)}})
         max_log_line = search.count()
         if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata:
             try:
                 if max_log_line > 0:
-                    metadata['max_offset'] = search[max_log_line - 1].execute()[-1].offset
+                    metadata['max_offset'] = attrgetter(self.offset_field)(
+                        search[max_log_line - 1].execute()[-1]
+                    )
                 else:
                     metadata['max_offset'] = 0
             except Exception:  # pylint: disable=broad-except
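
The switch above from plain attribute access to `operator.attrgetter` is what
makes dotted field names such as `log.offset` work: `attrgetter` resolves each
dot as a nested attribute lookup, while `getattr` treats the whole string as a
single attribute name. A self-contained sketch, using `SimpleNamespace` as a
stand-in for an elasticsearch-dsl hit:

    from operator import attrgetter
    from types import SimpleNamespace

    # A hit whose offset lives under a nested "log" object, as Filebeat 7 ships it.
    hit = SimpleNamespace(log=SimpleNamespace(offset=42))

    # attrgetter resolves the dotted path one attribute at a time ...
    assert attrgetter('log.offset')(hit) == 42

    # ... whereas getattr looks for a literal attribute named "log.offset".
    assert getattr(hit, 'log.offset', 'missing') == 'missing'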
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 8a3a7a2..4025722 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -38,7 +38,7 @@ from airflow.utils.timezone import datetime
 from .elasticmock import elasticmock
 
 
-class TestElasticsearchTaskHandler(unittest.TestCase):
+class TestElasticsearchTaskHandler(unittest.TestCase):  # pylint: disable=too-many-instance-attributes
     DAG_ID = 'dag_for_testing_file_task_handler'
     TASK_ID = 'task_for_testing_file_log_handler'
     EXECUTION_DATE = datetime(2016, 1, 1)
@@ -54,6 +54,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
         self.write_stdout = False
         self.json_format = False
         self.json_fields = 'asctime,filename,lineno,levelname,message,exc_text'
+        self.host_field = 'host'
+        self.offset_field = 'offset'
         self.es_task_handler = ElasticsearchTaskHandler(
             self.local_log_location,
             self.filename_template,
@@ -62,6 +64,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
         )
 
         self.es = elasticsearch.Elasticsearch(  # pylint: disable=invalid-name
@@ -103,6 +107,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
             es_kwargs=es_conf,
         )
 
@@ -276,6 +282,55 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
         )
         assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]
 
+    def test_read_with_json_format_with_custom_offset_and_host_fields(self):
+        ts = pendulum.now()
+        formatter = logging.Formatter(
+            '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s'
+        )
+        self.es_task_handler.formatter = formatter
+        self.es_task_handler.json_format = True
+        self.es_task_handler.host_field = "host.name"
+        self.es_task_handler.offset_field = "log.offset"
+
+        self.body = {
+            'message': self.test_message,
+            'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
+            'log': {'offset': 1},
+            'host': {'name': 'somehostname'},
+            'asctime': '2020-12-24 19:25:00,962',
+            'filename': 'taskinstance.py',
+            'lineno': 851,
+            'levelname': 'INFO',
+        }
+        self.es_task_handler.set_context(self.ti)
+        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=1)
+
+        logs, _ = self.es_task_handler.read(
+            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+        )
+        assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]
+
+    def test_read_with_custom_offset_and_host_fields(self):
+        ts = pendulum.now()
+        # Delete the existing log entry as it doesn't have the new offset and host fields
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+
+        self.es_task_handler.host_field = "host.name"
+        self.es_task_handler.offset_field = "log.offset"
+
+        self.body = {
+            'message': self.test_message,
+            'log_id': self.LOG_ID,
+            'log': {'offset': 1},
+            'host': {'name': 'somehostname'},
+        }
+        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=1)
+
+        logs, _ = self.es_task_handler.read(
+            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+        )
+        assert self.test_message == logs[0][0][1]
+
     def test_close(self):
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
         self.es_task_handler.formatter = formatter
@@ -357,6 +412,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
         )
         log_id = self.es_task_handler._render_log_id(self.ti, 1)
         assert expected_log_id == log_id
@@ -382,6 +439,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
             frontend=es_frontend,
         )
         url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
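
For completeness, a sketch of constructing the handler directly with the new
parameters, mirroring the positional order used in the tests above. The folder
and template values here are illustrative placeholders, not values from this
commit:

    from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler

    handler = ElasticsearchTaskHandler(
        '/tmp/airflow/logs',                  # base log folder (placeholder)
        '{try_number}.log',                   # filename template (placeholder)
        '{dag_id}-{task_id}-{execution_date}-{try_number}',  # log_id template (placeholder)
        'end_of_log',                         # end-of-log mark (placeholder)
        False,                                # write_stdout
        True,                                 # json_format
        'asctime, filename, lineno, levelname, message',  # json_fields
        host_field='host.name',               # Filebeat 7 / ECS name for the host field
        offset_field='log.offset',            # Filebeat 7 / ECS name for the offset field
    )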