You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/19 13:42:24 UTC

incubator-airflow git commit: [AIRFLOW-2487] Enhance druid ingestion hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master fff87b5cf -> e48b8e36a


[AIRFLOW-2487] Enhance druid ingestion hook

Closes #3380 from feng-tao/aiflow-2487


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e48b8e36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e48b8e36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e48b8e36

Branch: refs/heads/master
Commit: e48b8e36af3d47d52cdcb8de4d3a63f65e34c2de
Parents: fff87b5
Author: Tao feng <tf...@lyft.com>
Authored: Sat May 19 14:42:13 2018 +0100
Committer: Kaxil Naik <ka...@apache.org>
Committed: Sat May 19 14:42:13 2018 +0100

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py    | 13 +++++----
 tests/hooks/test_druid_hook.py | 56 ++++++++++++++++++++++++-------------
 2 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e48b8e36/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index e8b61c0..ef4f233 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -57,16 +57,17 @@ class DruidHook(BaseHook):
         conn = self.get_connection(self.druid_ingest_conn_id)
         host = conn.host
         port = conn.port
-        schema = conn.extra_dejson.get('schema', 'http')
+        conn_type = 'http' if not conn.conn_type else conn.conn_type
         endpoint = conn.extra_dejson.get('endpoint', '')
-        return "http://{host}:{port}/{endpoint}".format(**locals())
+        return "{conn_type}://{host}:{port}/{endpoint}".format(**locals())
 
     def submit_indexing_job(self, json_index_spec):
         url = self.get_conn_url()
 
         req_index = requests.post(url, data=json_index_spec, headers=self.header)
         if (req_index.status_code != 200):
-            raise AirflowException("Did not get 200 when submitting the Druid job to {}".format(url))
+            raise AirflowException('Did not get 200 when '
+                                   'submitting the Druid job to {}'.format(url))
 
         req_json = req_index.json()
         # Wait until the job is completed
@@ -85,7 +86,8 @@ class DruidHook(BaseHook):
             if self.max_ingestion_time and sec > self.max_ingestion_time:
                 # ensure that the job gets killed if the max ingestion time is exceeded
                 requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
-                raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)
+                raise AirflowException('Druid ingestion took more than '
+                                       '%s seconds', self.max_ingestion_time)
 
             time.sleep(self.timeout)
 
@@ -95,7 +97,8 @@ class DruidHook(BaseHook):
             elif status == 'SUCCESS':
                 running = False  # Great success!
             elif status == 'FAILED':
-                raise AirflowException('Druid indexing job failed, check console for more info')
+                raise AirflowException('Druid indexing job failed, '
+                                       'check console for more info')
             else:
                 raise AirflowException('Could not get status of the job, got %s', status)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e48b8e36/tests/hooks/test_druid_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py
index 72c88d8..6fd7b3c 100644
--- a/tests/hooks/test_druid_hook.py
+++ b/tests/hooks/test_druid_hook.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import mock
+from mock import MagicMock, patch
 import requests
 import requests_mock
 import unittest
@@ -31,98 +31,116 @@ class TestDruidHook(unittest.TestCase):
 
     def setUp(self):
         super(TestDruidHook, self).setUp()
-
         session = requests.Session()
         adapter = requests_mock.Adapter()
         session.mount('mock', adapter)
 
+        class TestDRuidhook(DruidHook):
+            def get_conn_url(self):
+                return 'http://druid-overlord:8081/druid/indexer/v1/task'
+        self.db_hook = TestDRuidhook()
+
     @requests_mock.mock()
     def test_submit_gone_wrong(self, m):
-        hook = DruidHook()
         task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
         status_check = m.get(
-            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            'http://druid-overlord:8081/druid/indexer/v1/task/'
+            '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "FAILED"}}'
         )
 
         # The job failed for some reason
         with self.assertRaises(AirflowException):
-            hook.submit_indexing_job('Long json file')
+            self.db_hook.submit_indexing_job('Long json file')
 
         self.assertTrue(task_post.called_once)
         self.assertTrue(status_check.called_once)
 
     @requests_mock.mock()
     def test_submit_ok(self, m):
-        hook = DruidHook()
         task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
         status_check = m.get(
-            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            'http://druid-overlord:8081/druid/indexer/v1/task/'
+            '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "SUCCESS"}}'
         )
 
         # Exists just as it should
-        hook.submit_indexing_job('Long json file')
+        self.db_hook.submit_indexing_job('Long json file')
 
         self.assertTrue(task_post.called_once)
         self.assertTrue(status_check.called_once)
 
     @requests_mock.mock()
     def test_submit_unknown_response(self, m):
-        hook = DruidHook()
         task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
         status_check = m.get(
-            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            'http://druid-overlord:8081/druid/indexer/v1/task/'
+            '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "UNKNOWN"}}'
         )
 
         # An unknown error code
         with self.assertRaises(AirflowException):
-            hook.submit_indexing_job('Long json file')
+            self.db_hook.submit_indexing_job('Long json file')
 
         self.assertTrue(task_post.called_once)
         self.assertTrue(status_check.called_once)
 
     @requests_mock.mock()
     def test_submit_timeout(self, m):
-        hook = DruidHook(timeout=0, max_ingestion_time=5)
+        self.db_hook.timeout = 0
+        self.db_hook.max_ingestion_time = 5
         task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
         status_check = m.get(
-            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            'http://druid-overlord:8081/druid/indexer/v1/task/'
+            '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "RUNNING"}}'
         )
         shutdown_post = m.post(
-            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown',
+            'http://druid-overlord:8081/druid/indexer/v1/task/'
+            '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
 
         # Because the jobs keeps running
         with self.assertRaises(AirflowException):
-            hook.submit_indexing_job('Long json file')
+            self.db_hook.submit_indexing_job('Long json file')
 
         self.assertTrue(task_post.called_once)
         self.assertTrue(status_check.called)
         self.assertTrue(shutdown_post.called_once)
 
+    @patch('airflow.hooks.druid_hook.DruidHook.get_connection')
+    def test_get_conn_url(self, mock_get_connection):
+        get_conn_value = MagicMock()
+        get_conn_value.host = 'test_host'
+        get_conn_value.conn_type = 'https'
+        get_conn_value.port = '1'
+        get_conn_value.extra_dejson = {'endpoint': 'ingest'}
+        mock_get_connection.return_value = get_conn_value
+        hook = DruidHook(timeout=0, max_ingestion_time=5)
+        self.assertEquals(hook.get_conn_url(), 'https://test_host:1/ingest')
+
 
 class TestDruidDbApiHook(unittest.TestCase):
 
     def setUp(self):
         super(TestDruidDbApiHook, self).setUp()
-        self.cur = mock.MagicMock()
-        self.conn = conn = mock.MagicMock()
+        self.cur = MagicMock()
+        self.conn = conn = MagicMock()
         self.conn.host = 'host'
         self.conn.port = '1000'
         self.conn.conn_type = 'druid'