You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/08 21:51:51 UTC
[airflow] branch main updated: Adding ElasticserachPythonHook - ES Hook With The Python Client (#24895)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2ddc100405 Adding ElasticserachPythonHook - ES Hook With The Python Client (#24895)
2ddc100405 is described below
commit 2ddc1004050464c112c18fee81b03f87a7a11610
Author: Alex Kruchkov <36...@users.noreply.github.com>
AuthorDate: Sat Jul 9 00:51:45 2022 +0300
Adding ElasticserachPythonHook - ES Hook With The Python Client (#24895)
---
.../providers/elasticsearch/hooks/elasticsearch.py | 64 ++++++++++++++++++++-
.../hooks/elasticsearch_python_hook.rst | 41 ++++++++++++++
.../hooks/elasticsearch_sql_hook.rst | 32 +++++++++++
.../hooks/index.rst | 25 ++++++++
.../index.rst | 1 +
tests/always/test_project_structure.py | 3 +
.../elasticsearch/hooks/test_elasticsearch.py | 66 ++++++++++++++++++++--
.../elasticsearch/example_elasticsearch_query.py | 24 +++++++-
8 files changed, 246 insertions(+), 10 deletions(-)
diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 493ca7f082..cbad370482 100644
--- a/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -15,16 +15,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import warnings
+from typing import Any, Dict, List, Optional
-from typing import Optional
-
+from elasticsearch import Elasticsearch
from es.elastic.api import Connection as ESConnection, connect
+from airflow.compat.functools import cached_property
+from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection as AirflowConnection
from airflow.providers.common.sql.hooks.sql import DbApiHook
-class ElasticsearchHook(DbApiHook):
+class ElasticsearchSQLHook(DbApiHook):
"""
Interact with Elasticsearch through the elasticsearch-dbapi.
@@ -93,3 +96,58 @@ class ElasticsearchHook(DbApiHook):
uri += '&'
return uri
+
+
+class ElasticsearchHook(ElasticsearchSQLHook):
+ """
+ This class is deprecated and was renamed to ElasticsearchSQLHook.
+ Please use `airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.
+ """
+
+ def __init__(self, *args, **kwargs):
+ warnings.warn(
+ """This class is deprecated.
+ Please use `airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.""",
+ DeprecationWarning,
+ stacklevel=3,
+ )
+ super().__init__(*args, **kwargs)
+
+
+class ElasticsearchPythonHook(BaseHook):
+ """
+ Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.
+
+ :param hosts: list: A list of a single or many Elasticsearch instances. Example: ["http://localhost:9200"]
+ :param es_conn_args: dict: Additional arguments you might need to enter to connect to Elasticsearch.
+ Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}
+ """
+
+ def __init__(self, hosts: List[Any], es_conn_args: Optional[dict] = None):
+ super().__init__()
+ self.hosts = hosts
+ self.es_conn_args = es_conn_args if es_conn_args else {}
+
+ def _get_elastic_connection(self):
+ """Returns the Elasticsearch client"""
+ client = Elasticsearch(self.hosts, **self.es_conn_args)
+
+ return client
+
+ @cached_property
+ def get_conn(self):
+ """Returns the Elasticsearch client (cached)"""
+ return self._get_elastic_connection()
+
+ def search(self, query: Dict[Any, Any], index: str = "_all") -> dict:
+ """
+ Returns results matching a query using Elasticsearch DSL
+
+ :param index: str: The index you want to query
+ :param query: dict: The query you want to run
+
+ :returns: dict: The response 'hits' object from Elasticsearch
+ """
+ es_client = self.get_conn
+ result = es_client.search(index=index, body=query)
+ return result['hits']
diff --git a/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_python_hook.rst b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_python_hook.rst
new file mode 100644
index 0000000000..8becd8051f
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_python_hook.rst
@@ -0,0 +1,41 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/hook:elasticsearch_python_hook:
+
+ElasticsearchPythonHook
+========================
+
+Elasticsearch hook that uses the official Python client to communicate with Elasticsearch.
+
+Parameters
+------------
+hosts
+ A list of one or more Elasticsearch instances. Example: ["http://localhost:9200"]
+es_conn_args
+ Additional arguments you might need to enter to connect to Elasticsearch.
+ Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}
+ For all possible configurations, consult the Elasticsearch documentation.
+ Reference: https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/connecting.html
+
+Usage Example
+---------------------
+
+.. exampleinclude:: /../../tests/system/providers/elasticsearch/example_elasticsearch_query.py
+ :language: python
+ :start-after: [START howto_elasticsearch_python_hook]
+ :end-before: [END howto_elasticsearch_python_hook]
diff --git a/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_sql_hook.rst b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_sql_hook.rst
new file mode 100644
index 0000000000..6021f4ac5d
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_sql_hook.rst
@@ -0,0 +1,32 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+.. _howto/hook:elasticsearch_sql_hook:
+
+ElasticsearchSQLHook
+========================
+
+Elasticsearch hook that interacts with Elasticsearch through the elasticsearch-dbapi.
+
+Usage Example
+---------------------
+
+.. exampleinclude:: /../../tests/system/providers/elasticsearch/example_elasticsearch_query.py
+ :language: python
+ :start-after: [START howto_elasticsearch_query]
+ :end-before: [END howto_elasticsearch_query]
diff --git a/docs/apache-airflow-providers-elasticsearch/hooks/index.rst b/docs/apache-airflow-providers-elasticsearch/hooks/index.rst
new file mode 100644
index 0000000000..cfad00fc50
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/hooks/index.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Hook Types
+----------------
+
+.. toctree::
+ :maxdepth: 1
+ :glob:
+
+ *
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index bab52f0dd1..534265cb3e 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -28,6 +28,7 @@ Content
Connection types <connections/elasticsearch>
Logging for Tasks <logging/index>
+ Elasticsearch hooks <hooks/index>
.. toctree::
:maxdepth: 1
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index bddb425f07..71a652b96f 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -474,6 +474,9 @@ class TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
PROVIDER = "elasticsearch"
CLASS_DIRS = {"hooks"}
CLASS_SUFFIXES = ["Hook"]
+ DEPRECATED_CLASSES = {
+ 'airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook',
+ }
class TestDockerProviderProjectStructure(ExampleCoverageTest):
diff --git a/tests/providers/elasticsearch/hooks/test_elasticsearch.py b/tests/providers/elasticsearch/hooks/test_elasticsearch.py
index 8c1251221d..6a052fb0be 100644
--- a/tests/providers/elasticsearch/hooks/test_elasticsearch.py
+++ b/tests/providers/elasticsearch/hooks/test_elasticsearch.py
@@ -20,17 +20,42 @@
import unittest
from unittest import mock
+from elasticsearch import Elasticsearch
+
from airflow.models import Connection
-from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook
+from airflow.providers.elasticsearch.hooks.elasticsearch import (
+ ElasticsearchHook,
+ ElasticsearchPythonHook,
+ ElasticsearchSQLHook,
+)
+
+
+class TestElasticsearchHook(unittest.TestCase):
+ def test_throws_warning(self):
+ self.cur = mock.MagicMock(rowcount=0)
+ self.conn = mock.MagicMock()
+ self.conn.cursor.return_value = self.cur
+ conn = self.conn
+ self.connection = Connection(host='localhost', port=9200, schema='http')
+
+ with self.assertWarns(DeprecationWarning):
+
+ class UnitTestElasticsearchHook(ElasticsearchHook):
+ conn_name_attr = 'test_conn_id'
+
+ def get_conn(self):
+ return conn
+ self.db_hook = UnitTestElasticsearchHook()
-class TestElasticsearchHookConn(unittest.TestCase):
+
+class TestElasticsearchSQLHookConn(unittest.TestCase):
def setUp(self):
super().setUp()
self.connection = Connection(host='localhost', port=9200, schema='http')
- class UnitTestElasticsearchHook(ElasticsearchHook):
+ class UnitTestElasticsearchHook(ElasticsearchSQLHook):
conn_name_attr = 'elasticsearch_conn_id'
self.db_hook = UnitTestElasticsearchHook()
@@ -44,7 +69,7 @@ class TestElasticsearchHookConn(unittest.TestCase):
mock_connect.assert_called_with(host='localhost', port=9200, scheme='http', user=None, password=None)
-class TestElasticsearchHook(unittest.TestCase):
+class TestElasticsearcSQLhHook(unittest.TestCase):
def setUp(self):
super().setUp()
@@ -53,7 +78,7 @@ class TestElasticsearchHook(unittest.TestCase):
self.conn.cursor.return_value = self.cur
conn = self.conn
- class UnitTestElasticsearchHook(ElasticsearchHook):
+ class UnitTestElasticsearchHook(ElasticsearchSQLHook):
conn_name_attr = 'test_conn_id'
def get_conn(self):
@@ -95,3 +120,34 @@ class TestElasticsearchHook(unittest.TestCase):
assert result_sets[1][0] == df.values.tolist()[1][0]
self.cur.execute.assert_called_once_with(statement)
+
+
+class MockElasticsearch:
+ def __init__(self, data: dict):
+ self.data = data
+
+ def search(self, **kwargs):
+ return self.data
+
+
+class TestElasticsearchPythonHook:
+ def setup(self):
+ self.elasticsearch_hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
+
+ def test_client(self):
+ es_connection = self.elasticsearch_hook.get_conn
+ assert isinstance(es_connection, Elasticsearch)
+
+ @mock.patch(
+ "airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchPythonHook._get_elastic_connection"
+ )
+ def test_search(self, elastic_mock):
+ es_data = {"hits": "test_hit"}
+ es_client = MockElasticsearch(es_data)
+ elastic_mock.return_value = es_client
+
+ query = {"test_query": "test_filter"}
+
+ result = self.elasticsearch_hook.search(index="test_index", query=query)
+
+ assert result == es_data['hits']
diff --git a/tests/system/providers/elasticsearch/example_elasticsearch_query.py b/tests/system/providers/elasticsearch/example_elasticsearch_query.py
index 8edc348a08..6daa04eda0 100644
--- a/tests/system/providers/elasticsearch/example_elasticsearch_query.py
+++ b/tests/system/providers/elasticsearch/example_elasticsearch_query.py
@@ -23,7 +23,8 @@ from datetime import datetime
from airflow import models
from airflow.decorators import task
-from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook
+from airflow.operators.python import PythonOperator
+from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook, ElasticsearchSQLHook
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = 'elasticsearch_dag'
@@ -36,7 +37,7 @@ def show_tables():
show_tables queries elasticsearch to list available tables
"""
# [START howto_elasticsearch_query]
- es = ElasticsearchHook(elasticsearch_conn_id=CONN_ID)
+ es = ElasticsearchSQLHook(elasticsearch_conn_id=CONN_ID)
# Handle ES conn with context manager
with es.get_conn() as es_conn:
@@ -47,6 +48,22 @@ def show_tables():
# [END howto_elasticsearch_query]
+# [START howto_elasticsearch_python_hook]
+def use_elasticsearch_hook():
+ """
+ Use ElasticsearchPythonHook to print results from a local Elasticsearch
+ """
+ es_hosts = ["http://localhost:9200"]
+ es_hook = ElasticsearchPythonHook(hosts=es_hosts)
+ query = {"query": {"match_all": {}}}
+ result = es_hook.search(query=query)
+ print(result)
+ return True
+
+
+# [END howto_elasticsearch_python_hook]
+
+
with models.DAG(
DAG_ID,
schedule_interval="@once",
@@ -59,6 +76,9 @@ with models.DAG(
# TEST BODY
execute_query
)
+ es_python_test = PythonOperator(
+ task_id='print_data_from_elasticsearch', python_callable=use_elasticsearch_hook
+ )
from tests.system.utils import get_test_run # noqa: E402