You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/11 19:39:30 UTC

incubator-airflow git commit: [AIRFLOW-790] Clean up TaskInstances without DagRuns

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1f3b60792 -> 26911864f


[AIRFLOW-790] Clean up TaskInstances without DagRuns

Closes #2886 from gwax/AIRFLOW-790_guard_against_orphans


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/26911864
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/26911864
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/26911864

Branch: refs/heads/master
Commit: 26911864fecb8cf160db732605b66e3243b10436
Parents: 1f3b607
Author: George Leslie-Waksman <ge...@cloverhealth.com>
Authored: Thu Jan 11 20:39:23 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jan 11 20:39:23 2018 +0100

----------------------------------------------------------------------
 .gitignore      |  3 +++
 airflow/jobs.py | 51 +++++++++++++++++++++++++--------------------------
 tests/jobs.py   | 37 ++++++++++++++++++++++++++++---------
 3 files changed, 56 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 254ad15..e32065d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -113,6 +113,9 @@ ENV/
 # PyCharm
 .idea/
 
+# Visual Studio Code
+.vscode/
+
 # vim
 *.swp
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6ed3f31..ae6969e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -949,8 +949,9 @@ class SchedulerJob(BaseJob):
         """
         For all DAG IDs in the SimpleDagBag, look for task instances in the
         old_states and set them to new_state if the corresponding DagRun
-        exists but is not in the running state. This normally should not
-        happen, but it can if the state of DagRuns are changed manually.
+        does not exist or exists but is not in the running state. This
+        normally should not happen, but it can if the state of DagRuns are
+        changed manually.
 
         :param old_states: examine TaskInstances in this state
         :type old_state: list[State]
@@ -961,35 +962,33 @@ class SchedulerJob(BaseJob):
         :type simple_dag_bag: SimpleDagBag
         """
         tis_changed = 0
+        query = session \
+            .query(models.TaskInstance) \
+            .outerjoin(models.DagRun, and_(
+                models.TaskInstance.dag_id == models.DagRun.dag_id,
+                models.TaskInstance.execution_date == models.DagRun.execution_date)) \
+            .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) \
+            .filter(models.TaskInstance.state.in_(old_states)) \
+            .filter(or_(
+                models.DagRun.state != State.RUNNING,
+                models.DagRun.state.is_(None)))
         if self.using_sqlite:
-            tis_to_change = (
-                session
-                    .query(models.TaskInstance)
-                    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
-                    .filter(models.TaskInstance.state.in_(old_states))
-                    .filter(and_(
-                        models.DagRun.dag_id == models.TaskInstance.dag_id,
-                        models.DagRun.execution_date == models.TaskInstance.execution_date,
-                        models.DagRun.state != State.RUNNING))
-                    .with_for_update()
-                    .all()
-            )
+            tis_to_change = query \
+                .with_for_update() \
+                .all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
                 tis_changed += 1
         else:
-            tis_changed = (
-                session
-                    .query(models.TaskInstance)
-                    .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
-                    .filter(models.TaskInstance.state.in_(old_states))
-                    .filter(and_(
-                        models.DagRun.dag_id == models.TaskInstance.dag_id,
-                        models.DagRun.execution_date == models.TaskInstance.execution_date,
-                        models.DagRun.state != State.RUNNING))
-                    .update({models.TaskInstance.state: new_state},
-                            synchronize_session=False)
-            )
+            subq = query.subquery()
+            tis_changed = session \
+                .query(models.TaskInstance) \
+                .filter(and_(
+                    models.TaskInstance.dag_id == subq.c.dag_id,
+                    models.TaskInstance.execution_date ==
+                    subq.c.execution_date)) \
+                .update({models.TaskInstance.state: new_state},
+                        synchronize_session=False)
             session.commit()
 
         if tis_changed > 0:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5d0420a..e522de5 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1531,6 +1531,15 @@ class SchedulerJobTest(unittest.TestCase):
             dag=dag2,
             owner='airflow')
 
+        dag3 = DAG(
+            dag_id='test_change_state_for_tis_without_dagrun_no_dagrun',
+            start_date=DEFAULT_DATE)
+
+        DummyOperator(
+            task_id='dummy',
+            dag=dag3,
+            owner='airflow')
+
         session = settings.Session()
         dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
                                state=State.RUNNING,
@@ -1552,12 +1561,18 @@ class SchedulerJobTest(unittest.TestCase):
         ti2.state = State.SCHEDULED
         session.commit()
 
-        dagbag = self._make_simple_dag_bag([dag])
+        ti3 = TI(dag3.get_task('dummy'), DEFAULT_DATE)
+        ti3.state = State.SCHEDULED
+        session.merge(ti3)
+        session.commit()
+
+        dagbag = self._make_simple_dag_bag([dag, dag2, dag3])
         scheduler = SchedulerJob(num_runs=0, run_duration=0)
-        scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
-                                                       old_states=[State.SCHEDULED, State.QUEUED],
-                                                       new_state=State.NONE,
-                                                       session=session)
+        scheduler._change_state_for_tis_without_dagrun(
+            simple_dag_bag=dagbag,
+            old_states=[State.SCHEDULED, State.QUEUED],
+            new_state=State.NONE,
+            session=session)
 
         ti = dr.get_task_instance(task_id='dummy', session=session)
         ti.refresh_from_db(session=session)
@@ -1567,6 +1582,9 @@ class SchedulerJobTest(unittest.TestCase):
         ti2.refresh_from_db(session=session)
         self.assertEqual(ti2.state, State.SCHEDULED)
 
+        ti3.refresh_from_db(session=session)
+        self.assertEquals(ti3.state, State.NONE)
+
         dr.refresh_from_db(session=session)
         dr.state = State.FAILED
 
@@ -1574,10 +1592,11 @@ class SchedulerJobTest(unittest.TestCase):
         session.merge(dr)
         session.commit()
 
-        scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
-                                                       old_states=[State.SCHEDULED, State.QUEUED],
-                                                       new_state=State.NONE,
-                                                       session=session)
+        scheduler._change_state_for_tis_without_dagrun(
+            simple_dag_bag=dagbag,
+            old_states=[State.SCHEDULED, State.QUEUED],
+            new_state=State.NONE,
+            session=session)
         ti.refresh_from_db(session=session)
         self.assertEqual(ti.state, State.NONE)