You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/30 08:59:06 UTC

incubator-airflow git commit: [AIRFLOW-2466] consider task_id in _change_state_for_tis_without_dagrun

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7c1d7db3d -> 0e892ccd7


[AIRFLOW-2466] consider task_id in _change_state_for_tis_without_dagrun

Closes #3360 from gwax/AF2466


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e892ccd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e892ccd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e892ccd

Branch: refs/heads/master
Commit: 0e892ccd7092c6d63b73b418097be2185c0d00e3
Parents: 7c1d7db
Author: George Leslie-Waksman <ge...@cloverhealth.com>
Authored: Wed May 30 10:59:01 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 30 10:59:01 2018 +0200

----------------------------------------------------------------------
 airflow/jobs.py |  1 +
 tests/jobs.py   | 42 ++++++++++++++++++++++++++++--------------
 2 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e892ccd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 810ee47..a019879 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1014,6 +1014,7 @@ class SchedulerJob(BaseJob):
                 .query(models.TaskInstance) \
                 .filter(and_(
                     models.TaskInstance.dag_id == subq.c.dag_id,
+                    models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date)) \
                 .update({models.TaskInstance.state: new_state},

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e892ccd/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index c7e1810..504d149 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1611,13 +1611,17 @@ class SchedulerJobTest(unittest.TestCase):
             self.assertEqual(State.QUEUED, ti.state)
 
     def test_change_state_for_tis_without_dagrun(self):
-        dag = DAG(
+        dag1 = DAG(
             dag_id='test_change_state_for_tis_without_dagrun',
             start_date=DEFAULT_DATE)
 
         DummyOperator(
             task_id='dummy',
-            dag=dag,
+            dag=dag1,
+            owner='airflow')
+        DummyOperator(
+            task_id='dummy_b',
+            dag=dag1,
             owner='airflow')
 
         dag2 = DAG(
@@ -1639,7 +1643,7 @@ class SchedulerJobTest(unittest.TestCase):
             owner='airflow')
 
         session = settings.Session()
-        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+        dr1 = dag1.create_dagrun(run_id=DagRun.ID_PREFIX,
                                state=State.RUNNING,
                                execution_date=DEFAULT_DATE,
                                start_date=DEFAULT_DATE,
@@ -1651,8 +1655,10 @@ class SchedulerJobTest(unittest.TestCase):
                                  start_date=DEFAULT_DATE,
                                  session=session)
 
-        ti = dr.get_task_instance(task_id='dummy', session=session)
-        ti.state = State.SCHEDULED
+        ti1a = dr1.get_task_instance(task_id='dummy', session=session)
+        ti1a.state = State.SCHEDULED
+        ti1b = dr1.get_task_instance(task_id='dummy_b', session=session)
+        ti1b.state = State.SUCCESS
         session.commit()
 
         ti2 = dr2.get_task_instance(task_id='dummy', session=session)
@@ -1664,7 +1670,7 @@ class SchedulerJobTest(unittest.TestCase):
         session.merge(ti3)
         session.commit()
 
-        dagbag = self._make_simple_dag_bag([dag, dag2, dag3])
+        dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
         scheduler = SchedulerJob(num_runs=0, run_duration=0)
         scheduler._change_state_for_tis_without_dagrun(
             simple_dag_bag=dagbag,
@@ -1672,9 +1678,13 @@ class SchedulerJobTest(unittest.TestCase):
             new_state=State.NONE,
             session=session)
 
-        ti = dr.get_task_instance(task_id='dummy', session=session)
-        ti.refresh_from_db(session=session)
-        self.assertEqual(ti.state, State.SCHEDULED)
+        ti1a = dr1.get_task_instance(task_id='dummy', session=session)
+        ti1a.refresh_from_db(session=session)
+        self.assertEqual(ti1a.state, State.SCHEDULED)
+
+        ti1b = dr1.get_task_instance(task_id='dummy_b', session=session)
+        ti1b.refresh_from_db(session=session)
+        self.assertEqual(ti1b.state, State.SUCCESS)
 
         ti2 = dr2.get_task_instance(task_id='dummy', session=session)
         ti2.refresh_from_db(session=session)
@@ -1683,11 +1693,11 @@ class SchedulerJobTest(unittest.TestCase):
         ti3.refresh_from_db(session=session)
         self.assertEquals(ti3.state, State.NONE)
 
-        dr.refresh_from_db(session=session)
-        dr.state = State.FAILED
+        dr1.refresh_from_db(session=session)
+        dr1.state = State.FAILED
 
         # why o why
-        session.merge(dr)
+        session.merge(dr1)
         session.commit()
 
         scheduler._change_state_for_tis_without_dagrun(
@@ -1695,8 +1705,12 @@ class SchedulerJobTest(unittest.TestCase):
             old_states=[State.SCHEDULED, State.QUEUED],
             new_state=State.NONE,
             session=session)
-        ti.refresh_from_db(session=session)
-        self.assertEqual(ti.state, State.NONE)
+        ti1a.refresh_from_db(session=session)
+        self.assertEqual(ti1a.state, State.NONE)
+
+        # don't touch ti1b
+        ti1b.refresh_from_db(session=session)
+        self.assertEqual(ti1b.state, State.SUCCESS)
 
         # don't touch ti2
         ti2.refresh_from_db(session=session)