You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2023/01/12 08:20:49 UTC

[airflow] branch main updated: Allow nested attr in elasticsearch host_field (#28878)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ca8249f4a5 Allow nested attr in elasticsearch host_field (#28878)
ca8249f4a5 is described below

commit ca8249f4a5cb22b091738128e0fcee87ab31638b
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Jan 12 00:20:38 2023 -0800

    Allow nested attr in elasticsearch host_field (#28878)
    
    Sometimes we may need to use a nested field, e.g. with filebeat:
    
    AIRFLOW__ELASTICSEARCH__HOST_FIELD=host.name
    
    Currently this does not raise an error; it silently returns "default_host" (the default value).
---
 airflow/providers/elasticsearch/log/es_task_handler.py | 17 ++++++++++++++++-
 .../elasticsearch/log/test_es_task_handler.py          | 18 +++++++++++++++++-
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index c0fecd7f2a..fce2f18814 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -179,7 +179,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
     def _group_logs_by_host(self, logs):
         grouped_logs = defaultdict(list)
         for log in logs:
-            key = getattr(log, self.host_field, "default_host")
+            key = getattr_nested(log, self.host_field, None) or "default_host"  # host_field may be dotted
             grouped_logs[key].append(log)
 
         return grouped_logs
@@ -407,3 +407,18 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
     def supports_external_link(self) -> bool:
         """Whether we can support external links"""
         return bool(self.frontend)
+
+
+def getattr_nested(obj, item, default):
+    """
+    Get the (possibly dotted) attribute path ``item`` from obj, or default if missing.
+
+    E.g. calling ``getattr_nested(a, 'b.c', "NA")`` will return
+    ``a.b.c`` if every link in the path exists, else "NA" (only AttributeError is caught).
+
+    :meta private:
+    """
+    try:
+        return attrgetter(item)(obj)
+    except AttributeError:
+        return default
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index eade093131..e3ba8f47ae 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -31,7 +31,7 @@ import pendulum
 import pytest
 
 from airflow.configuration import conf
-from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.timezone import datetime
@@ -590,3 +590,19 @@ class TestElasticsearchTaskHandler:
         assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
         assert second_log["asctime"] == t2.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
         assert third_log["asctime"] == t3.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
+
+
+def test_safe_attrgetter():
+    class A:
+        ...
+
+    a = A()
+    a.b = "b"
+    a.c = None
+    a.x = a  # self-reference: setting a.x.d below also sets a.d
+    a.x.d = "blah"
+    assert getattr_nested(a, "b", None) == "b"  # regular getattr
+    assert getattr_nested(a, "x.d", None) == "blah"  # nested val
+    assert getattr_nested(a, "aa", "heya") == "heya"  # missing attr -> non-None default
+    assert getattr_nested(a, "c", "heya") is None  # existing None is returned, not the default
+    assert getattr_nested(a, "aa", None) is None  # missing attr -> None default