You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/12/08 02:23:59 UTC
[airflow] branch main updated: Support restricted index patterns in Elasticsearch log handler (#23888)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 99bbcd3780 Support restricted index patterns in Elasticsearch log handler (#23888)
99bbcd3780 is described below
commit 99bbcd3780dd08a0794ba99eb69006c106dcf5d2
Author: Konstantinos Koukopoulos <ko...@gmail.com>
AuthorDate: Thu Dec 8 04:23:50 2022 +0200
Support restricted index patterns in Elasticsearch log handler (#23888)
Sometimes Airflow doesn't have the ability to search across all indices
in an Elasticsearch server. This might be due to security settings in
the server. In these cases fetching the remote logs fails. To fix this
we create an index_patterns configuration setting that can be set to a
more restrictive pattern.
---
airflow/config_templates/config.yml | 7 +++
airflow/config_templates/default_airflow.cfg | 4 ++
.../providers/elasticsearch/log/es_task_handler.py | 8 +++-
.../log/elasticmock/fake_elasticsearch.py | 25 +++++++---
.../elasticsearch/log/test_es_task_handler.py | 54 ++++++++++++++++++++++
5 files changed, 91 insertions(+), 7 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 009f6a4846..b15981aab9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2317,6 +2317,13 @@
type: string
example: ~
default: "offset"
+ - name: index_patterns
+ description: |
+ Comma separated list of index patterns to use when searching for logs (default: `_all`).
+ version_added: 2.6.0
+ type: string
+ example: something-*
+ default: "_all"
- name: elasticsearch_configs
description: ~
options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 1cff5f8dbe..146d0077a9 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1163,6 +1163,10 @@ host_field = host
# The field where offset is stored (normally either `offset` or `log.offset`)
offset_field = offset
+# Comma separated list of index patterns to use when searching for logs (default: `_all`).
+# Example: index_patterns = something-*
+index_patterns = _all
+
[elasticsearch_configs]
use_ssl = False
verify_certs = True
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index ecd964a147..c0fecd7f2a 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -83,6 +83,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
offset_field: str = "offset",
host: str = "localhost:9200",
frontend: str = "localhost:5601",
+ index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
*,
filename_template: str | None = None,
@@ -114,6 +115,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.host_field = host_field
self.offset_field = offset_field
+ self.index_patterns = index_patterns
self.context_set = False
self.formatter: logging.Formatter
@@ -282,7 +284,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
:param metadata: log metadata, used for streaming log download.
"""
# Offset is the unique key for sorting logs given log_id.
- search = Search(using=self.client).query("match_phrase", log_id=log_id).sort(self.offset_field)
+ search = (
+ Search(index=self.index_patterns, using=self.client)
+ .query("match_phrase", log_id=log_id)
+ .sort(self.offset_field)
+ )
search = search.filter("range", **{self.offset_field: {"gt": int(offset)}})
max_log_line = search.count()
diff --git a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
index b325abb1ea..c7887b864f 100644
--- a/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ b/tests/providers/elasticsearch/log/elasticmock/fake_elasticsearch.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import fnmatch
import json
from elasticsearch import Elasticsearch
@@ -363,6 +364,19 @@ class FakeElasticsearch(Elasticsearch):
# use in as a proxy for match_phrase
matches.append(document)
+ # Check that the index(es) exist.
+ def _validate_search_targets(self, targets):
+ # TODO: support allow_no_indices query parameter
+ matches = set()
+ for target in targets:
+ if target == "_all" or target == "":
+ matches.update(self.__documents_dict)
+ elif "*" in target:
+ matches.update(fnmatch.filter(self.__documents_dict, target))
+ elif target not in self.__documents_dict:
+ raise NotFoundError(404, f"IndexMissingException[[{target}] missing]")
+ return matches
+
def _normalize_index_to_list(self, index):
# Ensure to have a list of index
if index is None:
@@ -375,12 +389,11 @@ class FakeElasticsearch(Elasticsearch):
# Is it the correct exception to use ?
raise ValueError("Invalid param 'index'")
- # Check index(es) exists
- for searchable_index in searchable_indexes:
- if searchable_index not in self.__documents_dict:
- raise NotFoundError(404, f"IndexMissingException[[{searchable_index}] missing]")
-
- return searchable_indexes
+ return list(
+ self._validate_search_targets(
+ target for index in searchable_indexes for target in index.split(",")
+ )
+ )
@staticmethod
def _normalize_doc_type_to_list(doc_type):
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index bdf274732e..0d0e4e9cfa 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -105,6 +105,7 @@ class TestElasticsearchTaskHandler:
def test_client(self):
assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
+ assert self.es_task_handler.index_patterns == "_all"
def test_client_with_config(self):
es_conf = dict(conf.getsection("elasticsearch_configs"))
@@ -125,6 +126,21 @@ class TestElasticsearchTaskHandler:
es_kwargs=es_conf,
)
+ def test_client_with_patterns(self):
+ # ensure creating with index patterns does not fail
+ patterns = "test_*,other_*"
+ handler = ElasticsearchTaskHandler(
+ base_log_folder=self.local_log_location,
+ end_of_log_mark=self.end_of_log_mark,
+ write_stdout=self.write_stdout,
+ json_format=self.json_format,
+ json_fields=self.json_fields,
+ host_field=self.host_field,
+ offset_field=self.offset_field,
+ index_patterns=patterns,
+ )
+ assert handler.index_patterns == patterns
+
def test_read(self, ti):
ts = pendulum.now()
logs, metadatas = self.es_task_handler.read(
@@ -139,6 +155,44 @@ class TestElasticsearchTaskHandler:
assert "1" == metadatas[0]["offset"]
assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts
+ def test_read_with_patterns(self, ti):
+ ts = pendulum.now()
+ with mock.patch.object(self.es_task_handler, "index_patterns", new="test_*,other_*"):
+ logs, metadatas = self.es_task_handler.read(
+ ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
+ )
+
+ assert 1 == len(logs)
+ assert len(logs) == len(metadatas)
+ assert len(logs[0]) == 1
+ assert self.test_message == logs[0][0][-1]
+ assert not metadatas[0]["end_of_log"]
+ assert "1" == metadatas[0]["offset"]
+ assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts
+
+ def test_read_with_patterns_no_match(self, ti):
+ ts = pendulum.now()
+ with mock.patch.object(self.es_task_handler, "index_patterns", new="test_other_*,test_another_*"):
+ logs, metadatas = self.es_task_handler.read(
+ ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
+ )
+
+ assert 1 == len(logs)
+ assert len(logs) == len(metadatas)
+ assert [[]] == logs
+ assert not metadatas[0]["end_of_log"]
+ assert "0" == metadatas[0]["offset"]
+ # last_log_timestamp won't change if no log lines read.
+ assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
+
+ def test_read_with_missing_index(self, ti):
+ ts = pendulum.now()
+ with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"):
+ with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r".*nonexistent.*"):
+ self.es_task_handler.read(
+ ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
+ )
+
@pytest.mark.parametrize("seconds", [3, 6])
def test_read_missing_logs(self, seconds, create_task_instance):
"""