You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2017/01/20 22:50:07 UTC

incubator-airflow git commit: Revert "[AIRFLOW-779] Task should fail with specific message when deleted"

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 922158751 -> 927f30c9b


Revert "[AIRFLOW-779] Task should fail with specific message when deleted"

This reverts commit 9221587514e2a0155cdced2d3ae50129b0793a10.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/927f30c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/927f30c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/927f30c9

Branch: refs/heads/master
Commit: 927f30c9b15bc3ccd7ea8aa53abf4c9d82b3bef5
Parents: 9221587
Author: Dan Davydov <da...@airbnb.com>
Authored: Fri Jan 20 14:49:53 2017 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri Jan 20 14:49:53 2017 -0800

----------------------------------------------------------------------
 airflow/jobs.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/927f30c9/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 350c6d4..f1de333 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2016,6 +2016,10 @@ class LocalTaskJob(BaseJob):
         self.pickle_id = pickle_id
         self.mark_success = mark_success
 
+        # terminating state is used so that a job don't try to
+        # terminate multiple times
+        self.terminating = False
+
         # Keeps track of the fact that the task instance has been observed
         # as running at least once
         self.was_running = False
@@ -2079,16 +2083,17 @@ class LocalTaskJob(BaseJob):
     def heartbeat_callback(self, session=None):
         """Self destruct task if state has been moved away from running externally"""
 
+        if self.terminating:
+            # task is already terminating, let it breathe
+            return
+
         # Suicide pill
         TI = models.TaskInstance
         ti = self.task_instance
         new_ti = session.query(TI).filter(
             TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
             TI.execution_date == ti.execution_date).scalar()
-        if new_ti is None:
-            logging.warning("Task instance does not exist in DB. Terminating")
-            raise AirflowException("Task instance does not exist in DB")
-        elif new_ti.state == State.RUNNING:
+        if new_ti.state == State.RUNNING:
             self.was_running = True
             fqdn = socket.getfqdn()
             if not (fqdn == new_ti.hostname and
@@ -2105,4 +2110,5 @@ class LocalTaskJob(BaseJob):
                 "State of this instance has been externally set to "
                 "{self.task_instance.state}. "
                 "Taking the poison pill. So long.".format(**locals()))
-            raise AirflowException("Task instance state has been changed externally")
+            self.task_runner.terminate()
+            self.terminating = True