You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:13 UTC
[airflow] 23/38: Support remote logging in elasticsearch with
filebeat 7 (#14625)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 77060cdeeb6b82f4aa7891e9feb0cea677857525
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jun 11 12:32:42 2021 -0600
Support remote logging in elasticsearch with filebeat 7 (#14625)
Filebeat 7 renamed some fields (offset->log.offset and host->host.name),
so allow the field names Airflow uses to be configured.
Airflow isn't directly involved with getting the logs _to_
elasticsearch, so we should allow easy configuration to accommodate
whatever tools are used in that process.
(cherry picked from commit 5cd0bf733b839951c075c54e808a595ac923c4e8)
---
airflow/config_templates/airflow_local_settings.py | 4 ++
airflow/config_templates/config.yml | 14 +++++
airflow/config_templates/default_airflow.cfg | 6 +++
.../providers/elasticsearch/log/es_task_handler.py | 20 ++++---
.../elasticsearch/log/test_es_task_handler.py | 61 +++++++++++++++++++++-
5 files changed, 97 insertions(+), 8 deletions(-)
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 3c8ffb8..b3705bf 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -254,6 +254,8 @@ if REMOTE_LOGGING:
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS')
+ ELASTICSEARCH_HOST_FIELD: str = conf.get('elasticsearch', 'HOST_FIELD')
+ ELASTICSEARCH_OFFSET_FIELD: str = conf.get('elasticsearch', 'OFFSET_FIELD')
ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
'task': {
@@ -268,6 +270,8 @@ if REMOTE_LOGGING:
'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
'json_format': ELASTICSEARCH_JSON_FORMAT,
'json_fields': ELASTICSEARCH_JSON_FIELDS,
+ 'host_field': ELASTICSEARCH_HOST_FIELD,
+ 'offset_field': ELASTICSEARCH_OFFSET_FIELD,
},
}
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 39d2539..dd2d48f 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1992,6 +1992,20 @@
type: string
example: ~
default: "asctime, filename, lineno, levelname, message"
+ - name: host_field
+ description: |
+ The field where host name is stored (normally either `host` or `host.name`)
+ version_added: 2.1.1
+ type: string
+ example: ~
+ default: "host"
+ - name: offset_field
+ description: |
+ The field where offset is stored (normally either `offset` or `log.offset`)
+ version_added: 2.1.1
+ type: string
+ example: ~
+ default: "offset"
- name: elasticsearch_configs
description: ~
options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index bf033ef..f8e8588 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -989,6 +989,12 @@ json_format = False
# Log fields to also attach to the json output, if enabled
json_fields = asctime, filename, lineno, levelname, message
+# The field where host name is stored (normally either `host` or `host.name`)
+host_field = host
+
+# The field where offset is stored (normally either `offset` or `log.offset`)
+offset_field = offset
+
[elasticsearch_configs]
use_ssl = False
verify_certs = True
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 44e72bf..16e4d65 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -20,6 +20,7 @@ import logging
import sys
from collections import defaultdict
from datetime import datetime
+from operator import attrgetter
from time import time
from typing import List, Optional, Tuple
from urllib.parse import quote
@@ -71,6 +72,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
write_stdout: bool,
json_format: bool,
json_fields: str,
+ host_field: str = "host",
+ offset_field: str = "offset",
host: str = "localhost:9200",
frontend: str = "localhost:5601",
es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
@@ -94,6 +97,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
self.write_stdout = write_stdout
self.json_format = json_format
self.json_fields = [label.strip() for label in json_fields.split(",")]
+ self.host_field = host_field
+ self.offset_field = offset_field
self.handler = None
self.context_set = False
@@ -122,11 +127,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
"""
return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")
- @staticmethod
- def _group_logs_by_host(logs):
+ def _group_logs_by_host(self, logs):
grouped_logs = defaultdict(list)
for log in logs:
- key = getattr(log, 'host', 'default_host')
+ key = getattr(log, self.host_field, 'default_host')
grouped_logs[key].append(log)
# return items sorted by timestamp.
@@ -160,7 +164,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
logs = self.es_read(log_id, offset, metadata)
logs_by_host = self._group_logs_by_host(logs)
- next_offset = offset if not logs else logs[-1].offset
+ next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])
# Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
# on the client. Sending as a string prevents this issue.
@@ -227,14 +231,16 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
:type metadata: dict
"""
# Offset is the unique key for sorting logs given log_id.
- search = Search(using=self.client).query('match_phrase', log_id=log_id).sort('offset')
+ search = Search(using=self.client).query('match_phrase', log_id=log_id).sort(self.offset_field)
- search = search.filter('range', offset={'gt': int(offset)})
+ search = search.filter('range', **{self.offset_field: {'gt': int(offset)}})
max_log_line = search.count()
if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata:
try:
if max_log_line > 0:
- metadata['max_offset'] = search[max_log_line - 1].execute()[-1].offset
+ metadata['max_offset'] = attrgetter(self.offset_field)(
+ search[max_log_line - 1].execute()[-1]
+ )
else:
metadata['max_offset'] = 0
except Exception: # pylint: disable=broad-except
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 8a3a7a2..4025722 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -38,7 +38,7 @@ from airflow.utils.timezone import datetime
from .elasticmock import elasticmock
-class TestElasticsearchTaskHandler(unittest.TestCase):
+class TestElasticsearchTaskHandler(unittest.TestCase): # pylint: disable=too-many-instance-attributes
DAG_ID = 'dag_for_testing_file_task_handler'
TASK_ID = 'task_for_testing_file_log_handler'
EXECUTION_DATE = datetime(2016, 1, 1)
@@ -54,6 +54,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
self.write_stdout = False
self.json_format = False
self.json_fields = 'asctime,filename,lineno,levelname,message,exc_text'
+ self.host_field = 'host'
+ self.offset_field = 'offset'
self.es_task_handler = ElasticsearchTaskHandler(
self.local_log_location,
self.filename_template,
@@ -62,6 +64,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
self.write_stdout,
self.json_format,
self.json_fields,
+ self.host_field,
+ self.offset_field,
)
self.es = elasticsearch.Elasticsearch( # pylint: disable=invalid-name
@@ -103,6 +107,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
self.write_stdout,
self.json_format,
self.json_fields,
+ self.host_field,
+ self.offset_field,
es_kwargs=es_conf,
)
@@ -276,6 +282,55 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
)
assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]
+ def test_read_with_json_format_with_custom_offset_and_host_fields(self):
+ ts = pendulum.now()
+ formatter = logging.Formatter(
+ '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s'
+ )
+ self.es_task_handler.formatter = formatter
+ self.es_task_handler.json_format = True
+ self.es_task_handler.host_field = "host.name"
+ self.es_task_handler.offset_field = "log.offset"
+
+ self.body = {
+ 'message': self.test_message,
+ 'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
+ 'log': {'offset': 1},
+ 'host': {'name': 'somehostname'},
+ 'asctime': '2020-12-24 19:25:00,962',
+ 'filename': 'taskinstance.py',
+ 'lineno': 851,
+ 'levelname': 'INFO',
+ }
+ self.es_task_handler.set_context(self.ti)
+ self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)
+
+ logs, _ = self.es_task_handler.read(
+ self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+ )
+ assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]
+
+ def test_read_with_custom_offset_and_host_fields(self):
+ ts = pendulum.now()
+ # Delete the existing log entry as it doesn't have the new offset and host fields
+ self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+
+ self.es_task_handler.host_field = "host.name"
+ self.es_task_handler.offset_field = "log.offset"
+
+ self.body = {
+ 'message': self.test_message,
+ 'log_id': self.LOG_ID,
+ 'log': {'offset': 1},
+ 'host': {'name': 'somehostname'},
+ }
+ self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)
+
+ logs, _ = self.es_task_handler.read(
+ self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+ )
+ assert self.test_message == logs[0][0][1]
+
def test_close(self):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
self.es_task_handler.formatter = formatter
@@ -357,6 +412,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
self.write_stdout,
self.json_format,
self.json_fields,
+ self.host_field,
+ self.offset_field,
)
log_id = self.es_task_handler._render_log_id(self.ti, 1)
assert expected_log_id == log_id
@@ -382,6 +439,8 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
self.write_stdout,
self.json_format,
self.json_fields,
+ self.host_field,
+ self.offset_field,
frontend=es_frontend,
)
url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)