You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/02 15:09:14 UTC

incubator-airflow git commit: [AIRFLOW-1658] Kill Druid task on timeout

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 938da987c -> c61726288


[AIRFLOW-1658] Kill Druid task on timeout

If the total execution time of a Druid task exceeds the configured
maximum ingestion time, the Airflow task fails, but the Druid task may
keep running. This can cause undesired behaviour if Airflow retries the
task. This patch calls the shutdown endpoint for the Druid task to kill
any Druid task that is still running.

This commit also adds assertions to the Druid hook tests to ensure that
all mocked requests are actually called.

Closes #2644 from
danielvdende/kill_druid_task_on_timeout_exceeded


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c6172628
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c6172628
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c6172628

Branch: refs/heads/master
Commit: c61726288dcdb093c55a38faaf60aef020d0d3e0
Parents: 938da98
Author: Daniel van der Ende <da...@gmail.com>
Authored: Mon Oct 2 17:09:07 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Oct 2 17:09:07 2017 +0200

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py    |  2 ++
 tests/hooks/test_druid_hook.py | 33 +++++++++++++++++++++++++--------
 2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6172628/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 0b13670..655f666 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -73,6 +73,8 @@ class DruidHook(BaseHook):
             sec = sec + 1
 
             if sec > self.max_ingestion_time:
+                # ensure that the job gets killed if the max ingestion time is exceeded
+                requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
                 raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)
 
             time.sleep(self.timeout)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c6172628/tests/hooks/test_druid_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py
index c049cb2..ddab369 100644
--- a/tests/hooks/test_druid_hook.py
+++ b/tests/hooks/test_druid_hook.py
@@ -33,11 +33,11 @@ class TestDruidHook(unittest.TestCase):
     @requests_mock.mock()
     def test_submit_gone_wrong(self, m):
         hook = DruidHook()
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "FAILED"}}'
         )
@@ -46,14 +46,17 @@ class TestDruidHook(unittest.TestCase):
         with self.assertRaises(AirflowException):
             hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called_once)
+
     @requests_mock.mock()
     def test_submit_ok(self, m):
         hook = DruidHook()
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "SUCCESS"}}'
         )
@@ -61,14 +64,17 @@ class TestDruidHook(unittest.TestCase):
         # Exists just as it should
         hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called_once)
+
     @requests_mock.mock()
     def test_submit_unknown_response(self, m):
         hook = DruidHook()
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "UNKNOWN"}}'
         )
@@ -77,22 +83,33 @@ class TestDruidHook(unittest.TestCase):
         with self.assertRaises(AirflowException):
             hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called_once)
+
     @requests_mock.mock()
     def test_submit_timeout(self, m):
         hook = DruidHook(timeout=0, max_ingestion_time=5)
-        m.post(
+        task_post = m.post(
             'http://druid-overlord:8081/druid/indexer/v1/task',
             text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
         )
-        m.get(
+        status_check = m.get(
             'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
             text='{"status":{"status": "RUNNING"}}'
         )
+        shutdown_post = m.post(
+            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/shutdown',
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
+        )
 
         # Because the jobs keeps running
         with self.assertRaises(AirflowException):
             hook.submit_indexing_job('Long json file')
 
+        self.assertTrue(task_post.called_once)
+        self.assertTrue(status_check.called)
+        self.assertTrue(shutdown_post.called_once)
+