You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/30 08:59:06 UTC
incubator-airflow git commit: [AIRFLOW-2466] consider task_id in _change_state_for_tis_without_dagrun
Repository: incubator-airflow
Updated Branches:
refs/heads/master 7c1d7db3d -> 0e892ccd7
[AIRFLOW-2466] consider task_id in _change_state_for_tis_without_dagrun
Closes #3360 from gwax/AF2466
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e892ccd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e892ccd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e892ccd
Branch: refs/heads/master
Commit: 0e892ccd7092c6d63b73b418097be2185c0d00e3
Parents: 7c1d7db
Author: George Leslie-Waksman <ge...@cloverhealth.com>
Authored: Wed May 30 10:59:01 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Wed May 30 10:59:01 2018 +0200
----------------------------------------------------------------------
airflow/jobs.py | 1 +
tests/jobs.py | 42 ++++++++++++++++++++++++++++--------------
2 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e892ccd/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 810ee47..a019879 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1014,6 +1014,7 @@ class SchedulerJob(BaseJob):
.query(models.TaskInstance) \
.filter(and_(
models.TaskInstance.dag_id == subq.c.dag_id,
+ models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
subq.c.execution_date)) \
.update({models.TaskInstance.state: new_state},
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e892ccd/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index c7e1810..504d149 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1611,13 +1611,17 @@ class SchedulerJobTest(unittest.TestCase):
self.assertEqual(State.QUEUED, ti.state)
def test_change_state_for_tis_without_dagrun(self):
- dag = DAG(
+ dag1 = DAG(
dag_id='test_change_state_for_tis_without_dagrun',
start_date=DEFAULT_DATE)
DummyOperator(
task_id='dummy',
- dag=dag,
+ dag=dag1,
+ owner='airflow')
+ DummyOperator(
+ task_id='dummy_b',
+ dag=dag1,
owner='airflow')
dag2 = DAG(
@@ -1639,7 +1643,7 @@ class SchedulerJobTest(unittest.TestCase):
owner='airflow')
session = settings.Session()
- dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+ dr1 = dag1.create_dagrun(run_id=DagRun.ID_PREFIX,
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
@@ -1651,8 +1655,10 @@ class SchedulerJobTest(unittest.TestCase):
start_date=DEFAULT_DATE,
session=session)
- ti = dr.get_task_instance(task_id='dummy', session=session)
- ti.state = State.SCHEDULED
+ ti1a = dr1.get_task_instance(task_id='dummy', session=session)
+ ti1a.state = State.SCHEDULED
+ ti1b = dr1.get_task_instance(task_id='dummy_b', session=session)
+ ti1b.state = State.SUCCESS
session.commit()
ti2 = dr2.get_task_instance(task_id='dummy', session=session)
@@ -1664,7 +1670,7 @@ class SchedulerJobTest(unittest.TestCase):
session.merge(ti3)
session.commit()
- dagbag = self._make_simple_dag_bag([dag, dag2, dag3])
+ dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
scheduler = SchedulerJob(num_runs=0, run_duration=0)
scheduler._change_state_for_tis_without_dagrun(
simple_dag_bag=dagbag,
@@ -1672,9 +1678,13 @@ class SchedulerJobTest(unittest.TestCase):
new_state=State.NONE,
session=session)
- ti = dr.get_task_instance(task_id='dummy', session=session)
- ti.refresh_from_db(session=session)
- self.assertEqual(ti.state, State.SCHEDULED)
+ ti1a = dr1.get_task_instance(task_id='dummy', session=session)
+ ti1a.refresh_from_db(session=session)
+ self.assertEqual(ti1a.state, State.SCHEDULED)
+
+ ti1b = dr1.get_task_instance(task_id='dummy_b', session=session)
+ ti1b.refresh_from_db(session=session)
+ self.assertEqual(ti1b.state, State.SUCCESS)
ti2 = dr2.get_task_instance(task_id='dummy', session=session)
ti2.refresh_from_db(session=session)
@@ -1683,11 +1693,11 @@ class SchedulerJobTest(unittest.TestCase):
ti3.refresh_from_db(session=session)
self.assertEquals(ti3.state, State.NONE)
- dr.refresh_from_db(session=session)
- dr.state = State.FAILED
+ dr1.refresh_from_db(session=session)
+ dr1.state = State.FAILED
# why o why
- session.merge(dr)
+ session.merge(dr1)
session.commit()
scheduler._change_state_for_tis_without_dagrun(
@@ -1695,8 +1705,12 @@ class SchedulerJobTest(unittest.TestCase):
old_states=[State.SCHEDULED, State.QUEUED],
new_state=State.NONE,
session=session)
- ti.refresh_from_db(session=session)
- self.assertEqual(ti.state, State.NONE)
+ ti1a.refresh_from_db(session=session)
+ self.assertEqual(ti1a.state, State.NONE)
+
+ # don't touch ti1b
+ ti1b.refresh_from_db(session=session)
+ self.assertEqual(ti1b.state, State.SUCCESS)
# don't touch ti2
ti2.refresh_from_db(session=session)