Posted to commits@airflow.apache.org by fo...@apache.org on 2018/03/14 08:21:48 UTC

incubator-airflow git commit: [AIRFLOW-2183] Refactor DruidHook to enable sql

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c3730650c -> 7a880a7e9


[AIRFLOW-2183] Refactor DruidHook to enable sql

Refactor DruidHook so that Druid SQL queries can be issued against the Druid broker

Closes #3105 from feng-tao/airflow-2183
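
A minimal usage sketch of the new broker hook (the datasource and query
below are illustrative, not part of this change):

    from airflow.hooks.druid_hook import DruidDbApiHook

    # 'druid_broker_default' is the default connection id seeded by this
    # commit; the datasource name is hypothetical.
    hook = DruidDbApiHook(druid_broker_conn_id='druid_broker_default')
    records = hook.get_records('SELECT COUNT(*) FROM "my_datasource"')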


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7a880a7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7a880a7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7a880a7e

Branch: refs/heads/master
Commit: 7a880a7e987f423d4baca67eaa9d20451fa8fa87
Parents: c373065
Author: Tao feng <tf...@lyft.com>
Authored: Wed Mar 14 09:20:16 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed Mar 14 09:20:20 2018 +0100

----------------------------------------------------------------------
 .gitignore                     |  1 +
 airflow/hooks/__init__.py      |  5 ++-
 airflow/hooks/druid_hook.py    | 64 +++++++++++++++++++++++++++++++++++--
 airflow/utils/db.py            |  4 +++
 setup.py                       |  6 ++--
 tests/hooks/test_druid_hook.py | 52 ++++++++++++++++++++++++++++--
 6 files changed, 123 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a880a7e/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a29c9ad..f5ed5ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ logs/
 __pycache__/
 *.py[cod]
 *$py.class
+.pytest_cache/
 
 # C extensions
 *.so

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a880a7e/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 6372b2f..3d75f9b 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -50,7 +50,10 @@ _hooks = {
     'S3_hook': ['S3Hook'],
     'zendesk_hook': ['ZendeskHook'],
     'http_hook': ['HttpHook'],
-    'druid_hook': ['DruidHook'],
+    'druid_hook': [
+        'DruidHook',
+        'DruidDbApiHook',
+    ],
     'jdbc_hook': ['JdbcHook'],
     'dbapi_hook': ['DbApiHook'],
     'mssql_hook': ['MsSqlHook'],

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a880a7e/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 9ce1f9a..f08cbc2 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -17,17 +17,22 @@ from __future__ import print_function
 import requests
 import time
 
+from pydruid.db import connect
+
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.dbapi_hook import DbApiHook
 
 
 class DruidHook(BaseHook):
     """
-    Connection to Druid
+    Connection to Druid overlord for ingestion
 
-    :param druid_ingest_conn_id: The connection id to the Druid overlord machine which accepts index jobs
+    :param druid_ingest_conn_id: The connection id to the Druid overlord machine
+                                 which accepts index jobs
     :type druid_ingest_conn_id: string
-    :param timeout: The interval between polling the Druid job for the status of the ingestion job
+    :param timeout: The interval, in seconds, between polls of the Druid
+                    job for the status of the ingestion job
     :type timeout: int
     :param max_ingestion_time: The maximum ingestion time before assuming the job failed
     :type max_ingestion_time: int
@@ -90,3 +95,56 @@ class DruidHook(BaseHook):
                 raise AirflowException('Could not get status of the job, got %s', status)
 
         self.log.info('Successful index')
+
+
+class DruidDbApiHook(DbApiHook):
+    """
+    Interact with Druid broker
+
+    This hook is purely for querying the Druid broker.
+    For ingestion, please use DruidHook.
+    """
+    conn_name_attr = 'druid_broker_conn_id'
+    default_conn_name = 'druid_broker_default'
+    supports_autocommit = False
+
+    def __init__(self, *args, **kwargs):
+        super(DruidDbApiHook, self).__init__(*args, **kwargs)
+
+    def get_conn(self):
+        """
+        Establish a connection to the Druid broker.
+        """
+        conn = self.get_connection(self.druid_broker_conn_id)
+        druid_broker_conn = connect(
+            host=conn.host,
+            port=conn.port,
+            path=conn.extra_dejson.get('endpoint', '/druid/v2/sql'),
+            scheme=conn.extra_dejson.get('schema', 'http')
+        )
+        self.log.info('Get the connection to druid broker on {host}'.format(host=conn.host))
+        return druid_broker_conn
+
+    def get_uri(self):
+        """
+        Get the connection URI for the Druid broker.
+
+        e.g.: druid://localhost:8082/druid/v2/sql
+        """
+        conn = self.get_connection(getattr(self, self.conn_name_attr))
+        host = conn.host
+        if conn.port is not None:
+            host += ':{port}'.format(port=conn.port)
+        conn_type = 'druid' if not conn.conn_type else conn.conn_type
+        endpoint = conn.extra_dejson.get('endpoint', 'druid/v2/sql')
+        return '{conn_type}://{host}/{endpoint}'.format(
+            conn_type=conn_type, host=host, endpoint=endpoint)
+
+    def set_autocommit(self, conn, autocommit):
+        raise NotImplementedError()
+
+    def get_pandas_df(self, sql, parameters=None):
+        raise NotImplementedError()
+
+    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
+        raise NotImplementedError()
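
For reference, a sketch of what get_conn() builds via pydruid's DB-API
layer (host, port, and the query are illustrative values):

    from pydruid.db import connect

    # Mirrors DruidDbApiHook.get_conn() with the default endpoint and scheme.
    conn = connect(host='broker.example.com', port=8082,
                   path='/druid/v2/sql', scheme='http')
    cur = conn.cursor()
    cur.execute('SELECT * FROM "my_datasource" LIMIT 1')  # hypothetical datasource
    print(cur.fetchall())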

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a880a7e/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 6acab4f..6c7f3c0 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -193,6 +193,10 @@ def initdb():
             host='yarn', extra='{"queue": "root.default"}'))
     merge_conn(
         models.Connection(
+            conn_id='druid_broker_default', conn_type='druid',
+            host='druid-broker', port=8082, extra='{"endpoint": "druid/v2/sql"}'))
+    merge_conn(
+        models.Connection(
             conn_id='druid_ingest_default', conn_type='druid',
             host='druid-overlord', port=8081, extra='{"endpoint": "druid/indexer/v1/task"}'))
     merge_conn(
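
With the seeded default in place, the broker hook's get_uri() should
resolve as below (a quick check, assuming initdb has run):

    from airflow.hooks.druid_hook import DruidDbApiHook

    # default_conn_name is 'druid_broker_default'
    hook = DruidDbApiHook()
    assert hook.get_uri() == 'druid://druid-broker:8082/druid/v2/sql'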

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a880a7e/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 20742d4..83f199a 100644
--- a/setup.py
+++ b/setup.py
@@ -118,6 +118,7 @@ doc = [
     'Sphinx-PyPI-upload>=0.2.1'
 ]
 docker = ['docker-py>=1.6.0']
+druid = ['pydruid>=0.4.1']
 emr = ['boto3>=1.0.0']
 gcp_api = [
     'httplib2',
@@ -168,7 +169,7 @@ kubernetes = ['kubernetes>=3.0.0',
 
 zendesk = ['zdesk']
 
-all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
+all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid
 devel = [
     'click',
     'freezegun',
@@ -190,7 +191,7 @@ devel_minreq = devel + kubernetes + mysql + doc + password + s3 + cgroups
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = (sendgrid + devel + all_dbs + doc + samba + s3 + slack + crypto + oracle +
              docker + ssh + kubernetes + celery + azure + redis + gcp_api + datadog +
-             zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins)
+             zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins + druid)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -269,6 +270,7 @@ def do_setup():
             'devel_hadoop': devel_hadoop,
             'doc': doc,
             'docker': docker,
+            'druid': druid,
             'emr': emr,
             'gcp_api': gcp_api,
             'github_enterprise': github_enterprise,
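
With the new extras entry in place, installing with the druid extra (for
example "pip install -e .[druid]" from a source checkout) should pull in
pydruid>=0.4.1 alongside the core install.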

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7a880a7e/tests/hooks/test_druid_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py
index ddab369..606cbb7 100644
--- a/tests/hooks/test_druid_hook.py
+++ b/tests/hooks/test_druid_hook.py
@@ -13,12 +13,13 @@
 # limitations under the License.
 #
 
+import mock
 import requests
 import requests_mock
 import unittest
 
 from airflow.exceptions import AirflowException
-from airflow.hooks.druid_hook import DruidHook
+from airflow.hooks.druid_hook import DruidDbApiHook, DruidHook
 
 
 class TestDruidHook(unittest.TestCase):
@@ -111,6 +112,51 @@ class TestDruidHook(unittest.TestCase):
         self.assertTrue(shutdown_post.called_once)
 
 
+class TestDruidDbApiHook(unittest.TestCase):
 
-
-
+    def setUp(self):
+        super(TestDruidDbApiHook, self).setUp()
+        self.cur = mock.MagicMock()
+        self.conn = conn = mock.MagicMock()
+        self.conn.host = 'host'
+        self.conn.port = '1000'
+        self.conn.conn_type = 'druid'
+        self.conn.extra_dejson = {'endpoint': 'druid/v2/sql'}
+        self.conn.cursor.return_value = self.cur
+
+        class TestDruidDBApiHook(DruidDbApiHook):
+            def get_conn(self):
+                return conn
+
+            def get_connection(self, conn_id):
+                return conn
+
+        self.db_hook = TestDruidDBApiHook
+
+    def test_get_uri(self):
+        db_hook = self.db_hook()
+        self.assertEqual('druid://host:1000/druid/v2/sql', db_hook.get_uri())
+
+    def test_get_first_record(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchone.return_value = result_sets[0]
+
+        self.assertEqual(result_sets[0], self.db_hook().get_first(statement))
+        self.conn.close.assert_called_once()
+        self.cur.close.assert_called_once()
+        self.cur.execute.assert_called_once_with(statement)
+
+    def test_get_records(self):
+        statement = 'SQL'
+        result_sets = [('row1',), ('row2',)]
+        self.cur.fetchall.return_value = result_sets
+
+        self.assertEqual(result_sets, self.db_hook().get_records(statement))
+        self.conn.close.assert_called_once()
+        self.cur.close.assert_called_once()
+        self.cur.execute.assert_called_once_with(statement)
+
+
+if __name__ == '__main__':
+    unittest.main()
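
Since the module guards unittest.main(), the new tests can also be loaded
selectively; a small sketch (assumes the repo root is on sys.path):

    import unittest

    # Run only the new DruidDbApiHook tests.
    suite = unittest.defaultTestLoader.loadTestsFromName(
        'tests.hooks.test_druid_hook.TestDruidDbApiHook')
    unittest.TextTestRunner(verbosity=2).run(suite)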