You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2018/05/19 13:42:24 UTC
incubator-airflow git commit: [AIRFLOW-2487] Enhance druid ingestion hook
Repository: incubator-airflow
Updated Branches:
refs/heads/master fff87b5cf -> e48b8e36a
[AIRFLOW-2487] Enhance druid ingestion hook
Closes #3380 from feng-tao/aiflow-2487
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e48b8e36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e48b8e36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e48b8e36
Branch: refs/heads/master
Commit: e48b8e36af3d47d52cdcb8de4d3a63f65e34c2de
Parents: fff87b5
Author: Tao feng <tf...@lyft.com>
Authored: Sat May 19 14:42:13 2018 +0100
Committer: Kaxil Naik <ka...@apache.org>
Committed: Sat May 19 14:42:13 2018 +0100
----------------------------------------------------------------------
airflow/hooks/druid_hook.py | 13 +++++----
tests/hooks/test_druid_hook.py | 56 ++++++++++++++++++++++++-------------
2 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e48b8e36/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index e8b61c0..ef4f233 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -57,16 +57,17 @@ class DruidHook(BaseHook):
conn = self.get_connection(self.druid_ingest_conn_id)
host = conn.host
port = conn.port
- schema = conn.extra_dejson.get('schema', 'http')
+ conn_type = 'http' if not conn.conn_type else conn.conn_type
endpoint = conn.extra_dejson.get('endpoint', '')
- return "http://{host}:{port}/{endpoint}".format(**locals())
+ return "{conn_type}://{host}:{port}/{endpoint}".format(**locals())
def submit_indexing_job(self, json_index_spec):
url = self.get_conn_url()
req_index = requests.post(url, data=json_index_spec, headers=self.header)
if (req_index.status_code != 200):
- raise AirflowException("Did not get 200 when submitting the Druid job to {}".format(url))
+ raise AirflowException('Did not get 200 when '
+ 'submitting the Druid job to {}'.format(url))
req_json = req_index.json()
# Wait until the job is completed
@@ -85,7 +86,8 @@ class DruidHook(BaseHook):
if self.max_ingestion_time and sec > self.max_ingestion_time:
# ensure that the job gets killed if the max ingestion time is exceeded
requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
- raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)
+ raise AirflowException('Druid ingestion took more than '
+ '%s seconds', self.max_ingestion_time)
time.sleep(self.timeout)
@@ -95,7 +97,8 @@ class DruidHook(BaseHook):
elif status == 'SUCCESS':
running = False # Great success!
elif status == 'FAILED':
- raise AirflowException('Druid indexing job failed, check console for more info')
+ raise AirflowException('Druid indexing job failed, '
+ 'check console for more info')
else:
raise AirflowException('Could not get status of the job, got %s', status)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e48b8e36/tests/hooks/test_druid_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py
index 72c88d8..6fd7b3c 100644
--- a/tests/hooks/test_druid_hook.py
+++ b/tests/hooks/test_druid_hook.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +18,7 @@
# under the License.
#
-import mock
+from mock import MagicMock, patch
import requests
import requests_mock
import unittest
@@ -31,98 +31,116 @@ class TestDruidHook(unittest.TestCase):
def setUp(self):
super(TestDruidHook, self).setUp()
-
session = requests.Session()
adapter = requests_mock.Adapter()
session.mount('mock', adapter)
+ class TestDRuidhook(DruidHook):
+ def get_conn_url(self):
+ return 'http://druid-overlord:8081/druid/indexer/v1/task'
+ self.db_hook = TestDRuidhook()
+
@requests_mock.mock()
def test_submit_gone_wrong(self, m):
- hook = DruidHook()
task_post = m.post(
'http://druid-overlord:8081/druid/indexer/v1/task',
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
)
status_check = m.get(
- 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+ 'http://druid-overlord:8081/druid/indexer/v1/task/'
+ '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
text='{"status":{"status": "FAILED"}}'
)
# The job failed for some reason
with self.assertRaises(AirflowException):
- hook.submit_indexing_job('Long json file')
+ self.db_hook.submit_indexing_job('Long json file')
self.assertTrue(task_post.called_once)
self.assertTrue(status_check.called_once)
@requests_mock.mock()
def test_submit_ok(self, m):
- hook = DruidHook()
task_post = m.post(
'http://druid-overlord:8081/druid/indexer/v1/task',
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
)
status_check = m.get(
- 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+ 'http://druid-overlord:8081/druid/indexer/v1/task/'
+ '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
text='{"status":{"status": "SUCCESS"}}'
)
# Exists just as it should
- hook.submit_indexing_job('Long json file')
+ self.db_hook.submit_indexing_job('Long json file')
self.assertTrue(task_post.called_once)
self.assertTrue(status_check.called_once)
@requests_mock.mock()
def test_submit_unknown_response(self, m):
- hook = DruidHook()
task_post = m.post(
'http://druid-overlord:8081/druid/indexer/v1/task',
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
)
status_check = m.get(
- 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+ 'http://druid-overlord:8081/druid/indexer/v1/task/'
+ '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
text='{"status":{"status": "UNKNOWN"}}'
)
# An unknown error code
with self.assertRaises(AirflowException):
- hook.submit_indexing_job('Long json file')
+ self.db_hook.submit_indexing_job('Long json file')
self.assertTrue(task_post.called_once)
self.assertTrue(status_check.called_once)
@requests_mock.mock()
def test_submit_timeout(self, m):
- hook = DruidHook(timeout=0, max_ingestion_time=5)
+ self.db_hook.timeout = 0
+ self.db_hook.max_ingestion_time = 5
task_post = m.post(
'http://druid-overlord:8081/druid/indexer/v1/task',
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
)
status_check = m.get(
- 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+ 'http://druid-overlord:8081/druid/indexer/v1/task/'
+ '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
text='{"status":{"status": "RUNNING"}}'
)
shutdown_post = m.post(
- 'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown',
+ 'http://druid-overlord:8081/druid/indexer/v1/task/'
+ '9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown',
text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
)
# Because the jobs keeps running
with self.assertRaises(AirflowException):
- hook.submit_indexing_job('Long json file')
+ self.db_hook.submit_indexing_job('Long json file')
self.assertTrue(task_post.called_once)
self.assertTrue(status_check.called)
self.assertTrue(shutdown_post.called_once)
+ @patch('airflow.hooks.druid_hook.DruidHook.get_connection')
+ def test_get_conn_url(self, mock_get_connection):
+ get_conn_value = MagicMock()
+ get_conn_value.host = 'test_host'
+ get_conn_value.conn_type = 'https'
+ get_conn_value.port = '1'
+ get_conn_value.extra_dejson = {'endpoint': 'ingest'}
+ mock_get_connection.return_value = get_conn_value
+ hook = DruidHook(timeout=0, max_ingestion_time=5)
+ self.assertEquals(hook.get_conn_url(), 'https://test_host:1/ingest')
+
class TestDruidDbApiHook(unittest.TestCase):
def setUp(self):
super(TestDruidDbApiHook, self).setUp()
- self.cur = mock.MagicMock()
- self.conn = conn = mock.MagicMock()
+ self.cur = MagicMock()
+ self.conn = conn = MagicMock()
self.conn.host = 'host'
self.conn.port = '1000'
self.conn.conn_type = 'druid'