You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2023/01/12 08:20:49 UTC
[airflow] branch main updated: Allow nested attr in elasticsearch host_field (#28878)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ca8249f4a5 Allow nested attr in elasticsearch host_field (#28878)
ca8249f4a5 is described below
commit ca8249f4a5cb22b091738128e0fcee87ab31638b
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Jan 12 00:20:38 2023 -0800
Allow nested attr in elasticsearch host_field (#28878)
Sometimes we may need to use a nested field, e.g. with Filebeat:
AIRFLOW__ELASTICSEARCH__HOST_FIELD=host.name
Currently a dotted value like this does not raise an error, but the plain getattr lookup never finds it, so every log is grouped under "default_host" -- the default value.
---
airflow/providers/elasticsearch/log/es_task_handler.py | 17 ++++++++++++++++-
.../elasticsearch/log/test_es_task_handler.py | 18 +++++++++++++++++-
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index c0fecd7f2a..fce2f18814 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -179,7 +179,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
def _group_logs_by_host(self, logs):
grouped_logs = defaultdict(list)
for log in logs:
- key = getattr(log, self.host_field, "default_host")
+ key = getattr_nested(log, self.host_field, None) or "default_host"
grouped_logs[key].append(log)
return grouped_logs
@@ -407,3 +407,18 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
def supports_external_link(self) -> bool:
"""Whether we can support external links"""
return bool(self.frontend)
+
+
+def getattr_nested(obj, item, default):
+ """
+ Get item from obj but return default if not found
+
+ E.g. calling ``getattr_nested(a, 'b.c', "NA")`` will return
+ ``a.b.c`` if such a value exists, and "NA" otherwise.
+
+ :meta private:
+ """
+ try:
+ return attrgetter(item)(obj)
+ except AttributeError:
+ return default
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index eade093131..e3ba8f47ae 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -31,7 +31,7 @@ import pendulum
import pytest
from airflow.configuration import conf
-from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.timezone import datetime
@@ -590,3 +590,19 @@ class TestElasticsearchTaskHandler:
assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
assert second_log["asctime"] == t2.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
assert third_log["asctime"] == t3.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
+
+
+def test_safe_attrgetter():
+ class A:
+ ...
+
+ a = A()
+ a.b = "b"
+ a.c = None
+ a.x = a
+ a.x.d = "blah"
+ assert getattr_nested(a, "b", None) == "b" # regular getattr
+ assert getattr_nested(a, "x.d", None) == "blah" # nested val
+ assert getattr_nested(a, "aa", "heya") == "heya" # respects non-none default
+ assert getattr_nested(a, "c", "heya") is None # respects none value
+ assert getattr_nested(a, "aa", None) is None # respects none default