You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/12/25 13:55:14 UTC
incubator-airflow git commit: [AIRFLOW-403] Bash operator's kill method leaves underlying processes running
Repository: incubator-airflow
Updated Branches:
refs/heads/master 0f9112daf -> d3abe2c3c
[AIRFLOW-403] Bash operator's kill method leaves underlying processes running
Currently only the main bash process is killed, because its process group is
not terminated; any processes it spawned keep running.
Closes #1714 from spektom/bash_operator_kill
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d3abe2c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d3abe2c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d3abe2c3
Branch: refs/heads/master
Commit: d3abe2c3c2dd2ef53b00282de9f9e3a4512ab068
Parents: 0f9112d
Author: Michael Spector <sp...@gmail.com>
Authored: Sun Dec 25 14:53:50 2016 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Dec 25 14:53:53 2016 +0100
----------------------------------------------------------------------
airflow/models.py | 13 ++++++++-----
airflow/operators/bash_operator.py | 10 +++++++---
tests/core.py | 22 ++++++++++++++++++++++
3 files changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 55b855b..f6f7968 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -60,7 +60,7 @@ import six
from airflow import settings, utils
from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
from airflow import configuration
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
@@ -1294,10 +1294,13 @@ class TaskInstance(Base):
# if it goes beyond
result = None
if task_copy.execution_timeout:
- with timeout(int(
- task_copy.execution_timeout.total_seconds())):
- result = task_copy.execute(context=context)
-
+ try:
+ with timeout(int(
+ task_copy.execution_timeout.total_seconds())):
+ result = task_copy.execute(context=context)
+ except AirflowTaskTimeout:
+ task_copy.on_kill()
+ raise
else:
result = task_copy.execute(context=context)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 8e30da4..3146cd6 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -14,6 +14,8 @@
from builtins import bytes
+import os
+import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile
@@ -80,7 +82,8 @@ class BashOperator(BaseOperator):
sp = Popen(
['bash', fname],
stdout=PIPE, stderr=STDOUT,
- cwd=tmp_dir, env=self.env)
+ cwd=tmp_dir, env=self.env,
+ preexec_fn=os.setsid)
self.sp = sp
@@ -100,5 +103,6 @@ class BashOperator(BaseOperator):
return line
def on_kill(self):
- logging.info('Sending SIGTERM signal to bash subprocess')
- self.sp.terminate()
+ logging.info('Sending SIGTERM signal to bash process group')
+ os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 24315f1..85e7fa1 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -416,6 +416,28 @@ class CoreTest(unittest.TestCase):
output_encoding='utf-8')
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ def test_bash_operator_kill(self):
+ import subprocess
+ import psutil
+ sleep_time = "100%d" % os.getpid()
+ t = BashOperator(
+ task_id='test_bash_operator_kill',
+ execution_timeout=timedelta(seconds=1),
+ bash_command="/bin/bash -c 'sleep %s'" % sleep_time,
+ dag=self.dag)
+ self.assertRaises(
+ exceptions.AirflowTaskTimeout,
+ t.run,
+ start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ sleep(2)
+ pid = -1
+ for proc in psutil.process_iter():
+ if proc.cmdline() == ['sleep', sleep_time]:
+ pid = proc.pid
+ if pid != -1:
+ os.kill(pid, signal.SIGTERM)
+ self.fail("BashOperator's subprocess still running after stopping on timeout!")
+
def test_trigger_dagrun(self):
def trigga(context, obj):
if True: