You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/12/25 13:55:14 UTC

incubator-airflow git commit: [AIRFLOW-403] Bash operator's kill method leaves underlying processes running

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0f9112daf -> d3abe2c3c


[AIRFLOW-403] Bash operator's kill method leaves underlying processes running

Currently only the main process is killed, because its process group is not
terminated; as a result, the underlying child processes keep running.

Closes #1714 from spektom/bash_operator_kill


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d3abe2c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d3abe2c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d3abe2c3

Branch: refs/heads/master
Commit: d3abe2c3c2dd2ef53b00282de9f9e3a4512ab068
Parents: 0f9112d
Author: Michael Spector <sp...@gmail.com>
Authored: Sun Dec 25 14:53:50 2016 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Dec 25 14:53:53 2016 +0100

----------------------------------------------------------------------
 airflow/models.py                  | 13 ++++++++-----
 airflow/operators/bash_operator.py | 10 +++++++---
 tests/core.py                      | 22 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 55b855b..f6f7968 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -60,7 +60,7 @@ import six
 from airflow import settings, utils
 from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
 from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -1294,10 +1294,13 @@ class TaskInstance(Base):
                 # if it goes beyond
                 result = None
                 if task_copy.execution_timeout:
-                    with timeout(int(
-                            task_copy.execution_timeout.total_seconds())):
-                        result = task_copy.execute(context=context)
-
+                    try:
+                        with timeout(int(
+                                task_copy.execution_timeout.total_seconds())):
+                            result = task_copy.execute(context=context)
+                    except AirflowTaskTimeout:
+                        task_copy.on_kill()
+                        raise
                 else:
                     result = task_copy.execute(context=context)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 8e30da4..3146cd6 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -14,6 +14,8 @@
 
 
 from builtins import bytes
+import os
+import signal
 import logging
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
@@ -80,7 +82,8 @@ class BashOperator(BaseOperator):
                 sp = Popen(
                     ['bash', fname],
                     stdout=PIPE, stderr=STDOUT,
-                    cwd=tmp_dir, env=self.env)
+                    cwd=tmp_dir, env=self.env,
+                    preexec_fn=os.setsid)
 
                 self.sp = sp
 
@@ -100,5 +103,6 @@ class BashOperator(BaseOperator):
             return line
 
     def on_kill(self):
-        logging.info('Sending SIGTERM signal to bash subprocess')
-        self.sp.terminate()
+        logging.info('Sending SIGTERM signal to bash process group')
+        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 24315f1..85e7fa1 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -416,6 +416,28 @@ class CoreTest(unittest.TestCase):
             output_encoding='utf-8')
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_bash_operator_kill(self):
+        import subprocess
+        import psutil
+        sleep_time = "100%d" % os.getpid()
+        t = BashOperator(
+            task_id='test_bash_operator_kill',
+            execution_timeout=timedelta(seconds=1),
+            bash_command="/bin/bash -c 'sleep %s'" % sleep_time,
+            dag=self.dag)
+        self.assertRaises(
+            exceptions.AirflowTaskTimeout,
+            t.run,
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        sleep(2)
+        pid = -1
+        for proc in psutil.process_iter():
+            if proc.cmdline() == ['sleep', sleep_time]:
+                pid = proc.pid
+        if pid != -1:
+            os.kill(pid, signal.SIGTERM)
+            self.fail("BashOperator's subprocess still running after stopping on timeout!")
+
     def test_trigger_dagrun(self):
         def trigga(context, obj):
             if True: