You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/08 21:51:51 UTC

[airflow] branch main updated: Adding ElasticserachPythonHook - ES Hook With The Python Client (#24895)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ddc100405 Adding ElasticserachPythonHook - ES Hook With The Python Client (#24895)
2ddc100405 is described below

commit 2ddc1004050464c112c18fee81b03f87a7a11610
Author: Alex Kruchkov <36...@users.noreply.github.com>
AuthorDate: Sat Jul 9 00:51:45 2022 +0300

    Adding ElasticserachPythonHook - ES Hook With The Python Client (#24895)
---
 .../providers/elasticsearch/hooks/elasticsearch.py | 64 ++++++++++++++++++++-
 .../hooks/elasticsearch_python_hook.rst            | 41 ++++++++++++++
 .../hooks/elasticsearch_sql_hook.rst               | 32 +++++++++++
 .../hooks/index.rst                                | 25 ++++++++
 .../index.rst                                      |  1 +
 tests/always/test_project_structure.py             |  3 +
 .../elasticsearch/hooks/test_elasticsearch.py      | 66 ++++++++++++++++++++--
 .../elasticsearch/example_elasticsearch_query.py   | 24 +++++++-
 8 files changed, 246 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 493ca7f082..cbad370482 100644
--- a/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -15,16 +15,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import warnings
+from typing import Any, Dict, List, Optional
 
-from typing import Optional
-
+from elasticsearch import Elasticsearch
 from es.elastic.api import Connection as ESConnection, connect
 
+from airflow.compat.functools import cached_property
+from airflow.hooks.base import BaseHook
 from airflow.models.connection import Connection as AirflowConnection
 from airflow.providers.common.sql.hooks.sql import DbApiHook
 
 
-class ElasticsearchHook(DbApiHook):
+class ElasticsearchSQLHook(DbApiHook):
     """
     Interact with Elasticsearch through the elasticsearch-dbapi.
 
@@ -93,3 +96,58 @@ class ElasticsearchHook(DbApiHook):
                 uri += '&'
 
         return uri
+
+
+class ElasticsearchHook(ElasticsearchSQLHook):
+    """
+    This class is deprecated and was renamed to ElasticsearchSQLHook.
+    Please use `airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.
+    """
+
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            """This class is deprecated.
+            Please use `airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchSQLHook`.""",
+            DeprecationWarning,
+            stacklevel=3,
+        )
+        super().__init__(*args, **kwargs)
+
+
+class ElasticsearchPythonHook(BaseHook):
+    """
+    Interacts with Elasticsearch. This hook uses the official Elasticsearch Python Client.
+
+    :param hosts: list: A list of a single or many Elasticsearch instances. Example: ["http://localhost:9200"]
+    :param es_conn_args: dict: Additional arguments you might need to enter to connect to Elasticsearch.
+                                Example: {"ca_cert":"/path/to/cert", "basic_auth": "(user, pass)"}
+    """
+
+    def __init__(self, hosts: List[Any], es_conn_args: Optional[dict] = None):
+        super().__init__()
+        self.hosts = hosts
+        self.es_conn_args = es_conn_args if es_conn_args else {}
+
+    def _get_elastic_connection(self):
+        """Returns the Elasticsearch client"""
+        client = Elasticsearch(self.hosts, **self.es_conn_args)
+
+        return client
+
+    @cached_property
+    def get_conn(self):
+        """Returns the Elasticsearch client (cached)"""
+        return self._get_elastic_connection()
+
+    def search(self, query: Dict[Any, Any], index: str = "_all") -> dict:
+        """
+        Returns results matching a query using Elasticsearch DSL
+
+        :param index: str: The index you want to query
+        :param query: dict: The query you want to run
+
+        :returns: dict: The response 'hits' object from Elasticsearch
+        """
+        es_client = self.get_conn
+        result = es_client.search(index=index, body=query)
+        return result['hits']
diff --git a/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_python_hook.rst b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_python_hook.rst
new file mode 100644
index 0000000000..8becd8051f
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_python_hook.rst
@@ -0,0 +1,41 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/hook:elasticsearch_python_hook:
+
+ElasticsearchPythonHook
+========================
+
+An Elasticsearch hook that uses the official Elasticsearch Python client to communicate with Elasticsearch.
+
+Parameters
+------------
+hosts
+  A list of one or more Elasticsearch hosts. Example: ["http://localhost:9200"]
+es_conn_args
+  Additional keyword arguments passed to the Elasticsearch client when it is created.
+  Example: {"ca_certs": "/path/to/cert", "basic_auth": ("user", "pass")}
+  For all available options, consult the Elasticsearch Python client documentation.
+  Reference: https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/connecting.html
+
+Usage Example
+---------------------
+
+.. exampleinclude:: /../../tests/system/providers/elasticsearch/example_elasticsearch_query.py
+    :language: python
+    :start-after: [START howto_elasticsearch_python_hook]
+    :end-before: [END howto_elasticsearch_python_hook]
diff --git a/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_sql_hook.rst b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_sql_hook.rst
new file mode 100644
index 0000000000..6021f4ac5d
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/hooks/elasticsearch_sql_hook.rst
@@ -0,0 +1,32 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+.. _howto/hook:elasticsearch_sql_hook:
+
+ElasticsearchSQLHook
+========================
+
+An Elasticsearch hook that interacts with Elasticsearch through the elasticsearch-dbapi.
+
+Usage Example
+---------------------
+
+.. exampleinclude:: /../../tests/system/providers/elasticsearch/example_elasticsearch_query.py
+    :language: python
+    :start-after: [START howto_elasticsearch_query]
+    :end-before: [END howto_elasticsearch_query]
diff --git a/docs/apache-airflow-providers-elasticsearch/hooks/index.rst b/docs/apache-airflow-providers-elasticsearch/hooks/index.rst
new file mode 100644
index 0000000000..cfad00fc50
--- /dev/null
+++ b/docs/apache-airflow-providers-elasticsearch/hooks/index.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Hook Types
+----------------
+
+.. toctree::
+    :maxdepth: 1
+    :glob:
+
+    *
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index bab52f0dd1..534265cb3e 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -28,6 +28,7 @@ Content
 
     Connection types <connections/elasticsearch>
     Logging for Tasks <logging/index>
+    Elasticsearch hooks <hooks/index>
 
 .. toctree::
     :maxdepth: 1
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index bddb425f07..71a652b96f 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -474,6 +474,9 @@ class TestElasticsearchProviderProjectStructure(ExampleCoverageTest):
     PROVIDER = "elasticsearch"
     CLASS_DIRS = {"hooks"}
     CLASS_SUFFIXES = ["Hook"]
+    DEPRECATED_CLASSES = {
+        'airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook',
+    }
 
 
 class TestDockerProviderProjectStructure(ExampleCoverageTest):
diff --git a/tests/providers/elasticsearch/hooks/test_elasticsearch.py b/tests/providers/elasticsearch/hooks/test_elasticsearch.py
index 8c1251221d..6a052fb0be 100644
--- a/tests/providers/elasticsearch/hooks/test_elasticsearch.py
+++ b/tests/providers/elasticsearch/hooks/test_elasticsearch.py
@@ -20,17 +20,42 @@
 import unittest
 from unittest import mock
 
+from elasticsearch import Elasticsearch
+
 from airflow.models import Connection
-from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook
+from airflow.providers.elasticsearch.hooks.elasticsearch import (
+    ElasticsearchHook,
+    ElasticsearchPythonHook,
+    ElasticsearchSQLHook,
+)
+
+
+class TestElasticsearchHook(unittest.TestCase):
+    def test_throws_warning(self):
+        self.cur = mock.MagicMock(rowcount=0)
+        self.conn = mock.MagicMock()
+        self.conn.cursor.return_value = self.cur
+        conn = self.conn
+        self.connection = Connection(host='localhost', port=9200, schema='http')
+
+        with self.assertWarns(DeprecationWarning):
+
+            class UnitTestElasticsearchHook(ElasticsearchHook):
+                conn_name_attr = 'test_conn_id'
+
+                def get_conn(self):
+                    return conn
 
+            self.db_hook = UnitTestElasticsearchHook()
 
-class TestElasticsearchHookConn(unittest.TestCase):
+
+class TestElasticsearchSQLHookConn(unittest.TestCase):
     def setUp(self):
         super().setUp()
 
         self.connection = Connection(host='localhost', port=9200, schema='http')
 
-        class UnitTestElasticsearchHook(ElasticsearchHook):
+        class UnitTestElasticsearchHook(ElasticsearchSQLHook):
             conn_name_attr = 'elasticsearch_conn_id'
 
         self.db_hook = UnitTestElasticsearchHook()
@@ -44,7 +69,7 @@ class TestElasticsearchHookConn(unittest.TestCase):
         mock_connect.assert_called_with(host='localhost', port=9200, scheme='http', user=None, password=None)
 
 
-class TestElasticsearchHook(unittest.TestCase):
+class TestElasticsearcSQLhHook(unittest.TestCase):
     def setUp(self):
         super().setUp()
 
@@ -53,7 +78,7 @@ class TestElasticsearchHook(unittest.TestCase):
         self.conn.cursor.return_value = self.cur
         conn = self.conn
 
-        class UnitTestElasticsearchHook(ElasticsearchHook):
+        class UnitTestElasticsearchHook(ElasticsearchSQLHook):
             conn_name_attr = 'test_conn_id'
 
             def get_conn(self):
@@ -95,3 +120,34 @@ class TestElasticsearchHook(unittest.TestCase):
         assert result_sets[1][0] == df.values.tolist()[1][0]
 
         self.cur.execute.assert_called_once_with(statement)
+
+
+class MockElasticsearch:
+    def __init__(self, data: dict):
+        self.data = data
+
+    def search(self, **kwargs):
+        return self.data
+
+
+class TestElasticsearchPythonHook:
+    def setup(self):
+        self.elasticsearch_hook = ElasticsearchPythonHook(hosts=["http://localhost:9200"])
+
+    def test_client(self):
+        es_connection = self.elasticsearch_hook.get_conn
+        assert isinstance(es_connection, Elasticsearch)
+
+    @mock.patch(
+        "airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchPythonHook._get_elastic_connection"
+    )
+    def test_search(self, elastic_mock):
+        es_data = {"hits": "test_hit"}
+        es_client = MockElasticsearch(es_data)
+        elastic_mock.return_value = es_client
+
+        query = {"test_query": "test_filter"}
+
+        result = self.elasticsearch_hook.search(index="test_index", query=query)
+
+        assert result == es_data['hits']
diff --git a/tests/system/providers/elasticsearch/example_elasticsearch_query.py b/tests/system/providers/elasticsearch/example_elasticsearch_query.py
index 8edc348a08..6daa04eda0 100644
--- a/tests/system/providers/elasticsearch/example_elasticsearch_query.py
+++ b/tests/system/providers/elasticsearch/example_elasticsearch_query.py
@@ -23,7 +23,8 @@ from datetime import datetime
 
 from airflow import models
 from airflow.decorators import task
-from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchHook
+from airflow.operators.python import PythonOperator
+from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook, ElasticsearchSQLHook
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = 'elasticsearch_dag'
@@ -36,7 +37,7 @@ def show_tables():
     show_tables queries elasticsearch to list available tables
     """
     # [START howto_elasticsearch_query]
-    es = ElasticsearchHook(elasticsearch_conn_id=CONN_ID)
+    es = ElasticsearchSQLHook(elasticsearch_conn_id=CONN_ID)
 
     # Handle ES conn with context manager
     with es.get_conn() as es_conn:
@@ -47,6 +48,22 @@ def show_tables():
     # [END howto_elasticsearch_query]
 
 
+# [START howto_elasticsearch_python_hook]
+def use_elasticsearch_hook():
+    """
+    Use ElasticSearchPythonHook to print results from a local Elasticsearch
+    """
+    es_hosts = ["http://localhost:9200"]
+    es_hook = ElasticsearchPythonHook(hosts=es_hosts)
+    query = {"query": {"match_all": {}}}
+    result = es_hook.search(query=query)
+    print(result)
+    return True
+
+
+# [END howto_elasticsearch_python_hook]
+
+
 with models.DAG(
     DAG_ID,
     schedule_interval="@once",
@@ -59,6 +76,9 @@ with models.DAG(
         # TEST BODY
         execute_query
     )
+    es_python_test = PythonOperator(
+        task_id='print_data_from_elasticsearch', python_callable=use_elasticsearch_hook
+    )
 
 from tests.system.utils import get_test_run  # noqa: E402