Posted to commits@airflow.apache.org by je...@apache.org on 2022/12/08 02:23:59 UTC

[airflow] branch main updated: Support restricted index patterns in Elasticsearch log handler (#23888)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 99bbcd3780 Support restricted index patterns in Elasticsearch log handler (#23888)
99bbcd3780 is described below

commit 99bbcd3780dd08a0794ba99eb69006c106dcf5d2
Author: Konstantinos Koukopoulos <ko...@gmail.com>
AuthorDate: Thu Dec 8 04:23:50 2022 +0200

    Support restricted index patterns in Elasticsearch log handler (#23888)
    
    Sometimes Airflow doesn't have the ability to search across all indices
    in an Elasticsearch server. This might be due to security settings on
    the server. In these cases, fetching the remote logs fails. To fix this,
    we add an index_patterns configuration setting that can be set to a
    more restrictive pattern.
---
 airflow/config_templates/config.yml                |  7 +++
 airflow/config_templates/default_airflow.cfg       |  4 ++
 .../providers/elasticsearch/log/es_task_handler.py |  8 +++-
 .../log/elasticmock/fake_elasticsearch.py          | 25 +++++++---
 .../elasticsearch/log/test_es_task_handler.py      | 54 ++++++++++++++++++++++
 5 files changed, 91 insertions(+), 7 deletions(-)
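
For context, a minimal sketch of what this looks like in airflow.cfg once the
option is available (the pattern value here is hypothetical):

    [elasticsearch]
    # Restrict log searches to matching indices instead of the default `_all`.
    # Comma-separated patterns are accepted, e.g. `test_*,other_*`.
    index_patterns = airflow-logs-*

Following Airflow's usual AIRFLOW__{SECTION}__{KEY} convention, the same
setting could also be supplied via an environment variable:

    AIRFLOW__ELASTICSEARCH__INDEX_PATTERNS=airflow-logs-*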

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 009f6a4846..b15981aab9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2317,6 +2317,13 @@
       type: string
       example: ~
       default: "offset"
+    - name: index_patterns
+      description: |
+        Comma separated list of index patterns to use when searching for logs (default: `_all`).
+      version_added: 2.6.0
+      type: string
+      example: something-*
+      default: "_all"
 - name: elasticsearch_configs
   description: ~
   options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 1cff5f8dbe..146d0077a9 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1163,6 +1163,10 @@ host_field = host
 # The field where offset is stored (normally either `offset` or `log.offset`)
 offset_field = offset
 
+# Comma separated list of index patterns to use when searching for logs (default: `_all`).
+# Example: index_patterns = something-*
+index_patterns = _all
+
 [elasticsearch_configs]
 use_ssl = False
 verify_certs = True
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index ecd964a147..c0fecd7f2a 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -83,6 +83,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         offset_field: str = "offset",
         host: str = "localhost:9200",
         frontend: str = "localhost:5601",
+        index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
         es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
         *,
         filename_template: str | None = None,
@@ -114,6 +115,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         self.json_fields = [label.strip() for label in json_fields.split(",")]
         self.host_field = host_field
         self.offset_field = offset_field
+        self.index_patterns = index_patterns
         self.context_set = False
 
         self.formatter: logging.Formatter
@@ -282,7 +284,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         :param metadata: log metadata, used for streaming log download.
         """
         # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client).query("match_phrase", log_id=log_id).sort(self.offset_field)
+        search = (
+            Search(index=self.index_patterns, using=self.client)
+            .query("match_phrase", log_id=log_id)
+            .sort(self.offset_field)
+        )
 
         search = search.filter("range", **{self.offset_field: {"gt": int(offset)}})
         max_log_line = search.count()
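
In elasticsearch_dsl, passing `index` to Search() scopes the query to the
given index names or patterns; before this change the handler omitted it,
which implicitly searches `_all`. A minimal standalone sketch of the
restricted query built above, with a hypothetical client, pattern, and
log_id:

    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search

    client = Elasticsearch(hosts=["http://localhost:9200"])  # hypothetical host
    search = (
        Search(index="airflow-logs-*", using=client)  # comma-separated patterns also work
        .query("match_phrase", log_id="some_dag-some_task-some_run--1")
        .sort("offset")
        .filter("range", offset={"gt": 0})
    )
    print(search.to_dict())  # inspect the generated query body without a live server
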
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index b325abb1ea..c7887b864f 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import fnmatch
 import json
 
 from elasticsearch import Elasticsearch
@@ -363,6 +364,19 @@ class FakeElasticsearch(Elasticsearch):
                     # use in as a proxy for match_phrase
                     matches.append(document)
 
+    # Check index(es) exists.
+    def _validate_search_targets(self, targets):
+        # TODO: support allow_no_indices query parameter
+        matches = set()
+        for target in targets:
+            if target == "_all" or target == "":
+                matches.update(self.__documents_dict)
+            elif "*" in target:
+                matches.update(fnmatch.filter(self.__documents_dict, target))
+            elif target not in self.__documents_dict:
+                raise NotFoundError(404, f"IndexMissingException[[{target}] missing]")
+        return matches
+
     def _normalize_index_to_list(self, index):
         # Ensure to have a list of index
         if index is None:
@@ -375,12 +389,11 @@ class FakeElasticsearch(Elasticsearch):
             # Is it the correct exception to use ?
             raise ValueError("Invalid param 'index'")
 
-        # Check index(es) exists
-        for searchable_index in searchable_indexes:
-            if searchable_index not in self.__documents_dict:
-                raise NotFoundError(404, f"IndexMissingException[[{searchable_index}] missing]")
-
-        return searchable_indexes
+        return list(
+            self._validate_search_targets(
+                target for index in searchable_indexes for target in index.split(",")
+            )
+        )
 
     @staticmethod
     def _normalize_doc_type_to_list(doc_type):
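
The mock resolves wildcard targets with fnmatch, which applies shell-style
globbing against the known index names. A tiny standalone illustration with
made-up indices:

    import fnmatch

    indices = ["test_1", "test_2", "other_1"]
    print(fnmatch.filter(indices, "test_*"))     # ['test_1', 'test_2']
    print(fnmatch.filter(indices, "nomatch_*"))  # []

Note that a missing non-wildcard target raises NotFoundError immediately,
while a wildcard that matches nothing simply yields no indices; the new
test_read_with_missing_index and test_read_with_patterns_no_match tests
below exercise exactly this distinction.
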
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index bdf274732e..0d0e4e9cfa 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -105,6 +105,7 @@ class TestElasticsearchTaskHandler:
 
     def test_client(self):
         assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
+        assert self.es_task_handler.index_patterns == "_all"
 
     def test_client_with_config(self):
         es_conf = dict(conf.getsection("elasticsearch_configs"))
@@ -125,6 +126,21 @@ class TestElasticsearchTaskHandler:
             es_kwargs=es_conf,
         )
 
+    def test_client_with_patterns(self):
+        # ensure creating with index patterns does not fail
+        patterns = "test_*,other_*"
+        handler = ElasticsearchTaskHandler(
+            base_log_folder=self.local_log_location,
+            end_of_log_mark=self.end_of_log_mark,
+            write_stdout=self.write_stdout,
+            json_format=self.json_format,
+            json_fields=self.json_fields,
+            host_field=self.host_field,
+            offset_field=self.offset_field,
+            index_patterns=patterns,
+        )
+        assert handler.index_patterns == patterns
+
     def test_read(self, ti):
         ts = pendulum.now()
         logs, metadatas = self.es_task_handler.read(
@@ -139,6 +155,44 @@ class TestElasticsearchTaskHandler:
         assert "1" == metadatas[0]["offset"]
         assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts
 
+    def test_read_with_patterns(self, ti):
+        ts = pendulum.now()
+        with mock.patch.object(self.es_task_handler, "index_patterns", new="test_*,other_*"):
+            logs, metadatas = self.es_task_handler.read(
+                ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
+            )
+
+        assert 1 == len(logs)
+        assert len(logs) == len(metadatas)
+        assert len(logs[0]) == 1
+        assert self.test_message == logs[0][0][-1]
+        assert not metadatas[0]["end_of_log"]
+        assert "1" == metadatas[0]["offset"]
+        assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts
+
+    def test_read_with_patterns_no_match(self, ti):
+        ts = pendulum.now()
+        with mock.patch.object(self.es_task_handler, "index_patterns", new="test_other_*,test_another_*"):
+            logs, metadatas = self.es_task_handler.read(
+                ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
+            )
+
+        assert 1 == len(logs)
+        assert len(logs) == len(metadatas)
+        assert [[]] == logs
+        assert not metadatas[0]["end_of_log"]
+        assert "0" == metadatas[0]["offset"]
+        # last_log_timestamp won't change if no log lines read.
+        assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
+
+    def test_read_with_missing_index(self, ti):
+        ts = pendulum.now()
+        with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"):
+            with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r".*nonexistent.*"):
+                self.es_task_handler.read(
+                    ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
+                )
+
     @pytest.mark.parametrize("seconds", [3, 6])
     def test_read_missing_logs(self, seconds, create_task_instance):
         """