You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/08 09:21:47 UTC

incubator-airflow git commit: [AIRFLOW-1884][AIRFLOW-1059] Reset orphaned task state for external dagruns

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f9dcab0fa -> 8626186ca


[AIRFLOW-1884][AIRFLOW-1059] Reset orphaned task state for external dagruns

On scheduler startup, orphaned task instances have
their state cleared and are rescheduled to avoid
having tasks that are stuck in a QUEUED state
forever. Previously, this check ignored backfilled
and externally triggered dagruns, meaning that
backfilled and externally triggered dagruns could
have orphaned tasks that are stuck forever. This
changeset removes the special case logic for
externally triggered dagruns, ensuring that
externally triggered dagruns are crash safe. This
same fix cannot be applied to backfilled dagruns,
so for now backfilled dagruns are not crash safe.

Closes #2843 from grantnicholas/AIRFLOW-1884


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8626186c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8626186c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8626186c

Branch: refs/heads/master
Commit: 8626186ca8c244386a8a97fcaf6d4221270863da
Parents: f9dcab0
Author: GRANT NICHOLAS <gn...@homeaway.com>
Authored: Fri Dec 8 10:21:38 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Dec 8 10:21:38 2017 +0100

----------------------------------------------------------------------
 airflow/jobs.py | 1 -
 tests/jobs.py   | 3 ++-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8626186c/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 28c5980..ec95c3c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -238,7 +238,6 @@ class BaseJob(Base, LoggingMixin):
                         TI.execution_date == DR.execution_date))
                 .filter(
                     DR.state == State.RUNNING,
-                    DR.external_trigger == False,
                     DR.run_id.notlike(BackfillJob.ID_PREFIX + '%'),
                     TI.state.in_(resettable_states))).all()
         else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8626186c/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 77d872f..bdaccab 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2827,7 +2827,8 @@ class SchedulerJobTest(unittest.TestCase):
         session.merge(dr1)
         session.commit()
 
-        self.assertEquals(0, len(scheduler.reset_state_for_orphaned_tasks(session=session)))
+        reset_tis = scheduler.reset_state_for_orphaned_tasks(session=session)
+        self.assertEquals(1, len(reset_tis))
 
     def test_reset_orphaned_tasks_backfill_dag(self):
         dag_id = 'test_reset_orphaned_tasks_backfill_dag'