You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/25 22:00:07 UTC

[2/6] incubator-airflow git commit: [AIRFLOW-798] Check return_code before forcing termination

[AIRFLOW-798] Check return_code before forcing termination

LocalTaskJob could still log an error and self-destruct,
even though the underlying process had already exited.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24d641bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24d641bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24d641bc

Branch: refs/heads/v1-8-test
Commit: 24d641bc106c112f86771bd394d877dd4df578f9
Parents: a2b0ea3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jan 24 12:01:44 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 24 16:40:22 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24d641bc/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8bb93bb..978fc35 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2081,28 +2081,25 @@ class LocalTaskJob(BaseJob):
             # task is already terminating, let it breathe
             return
 
-        # Suicide pill
-        TI = models.TaskInstance
+        self.task_instance.refresh_from_db()
         ti = self.task_instance
-        new_ti = session.query(TI).filter(
-            TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
-            TI.execution_date == ti.execution_date).scalar()
-        if new_ti.state == State.RUNNING:
+        if ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
-            if not (fqdn == new_ti.hostname and
-                    self.task_runner.process.pid == new_ti.pid):
-                logging.warning("Recorded hostname and pid of {new_ti.hostname} "
-                                "and {new_ti.pid} do not match this instance's "
+            if not (fqdn == ti.hostname and
+                    self.task_runner.process.pid == ti.pid):
+                logging.warning("Recorded hostname and pid of {ti.hostname} "
+                                "and {ti.pid} do not match this instance's "
                                 "which are {fqdn} and "
-                                "{self.task_runner.process.pid}. Taking the poison pill. "
-                                "So long."
+                                "{self.task_runner.process.pid}. "
+                                "Taking the poison pill. So long."
                                 .format(**locals()))
                 raise AirflowException("Another worker/process is running this job")
-        elif self.was_running and hasattr(self.task_runner, 'process'):
+        elif (self.was_running
+              and self.task_runner.return_code() is None
+              and hasattr(self.task_runner, 'process')):
             logging.warning(
                 "State of this instance has been externally set to "
-                "{self.task_instance.state}. "
-                "Taking the poison pill. So long.".format(**locals()))
+                "{}. Taking the poison pill. So long.".format(ti.state))
             self.task_runner.terminate()
             self.terminating = True