You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/11 19:39:30 UTC
incubator-airflow git commit: [AIRFLOW-790] Clean up TaskInstances without DagRuns
Repository: incubator-airflow
Updated Branches:
refs/heads/master 1f3b60792 -> 26911864f
[AIRFLOW-790] Clean up TaskInstances without DagRuns
Closes #2886 from gwax/AIRFLOW-790_guard_against_orphans
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/26911864
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/26911864
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/26911864
Branch: refs/heads/master
Commit: 26911864fecb8cf160db732605b66e3243b10436
Parents: 1f3b607
Author: George Leslie-Waksman <ge...@cloverhealth.com>
Authored: Thu Jan 11 20:39:23 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jan 11 20:39:23 2018 +0100
----------------------------------------------------------------------
.gitignore | 3 +++
airflow/jobs.py | 51 +++++++++++++++++++++++++--------------------------
tests/jobs.py | 37 ++++++++++++++++++++++++++++---------
3 files changed, 56 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 254ad15..e32065d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -113,6 +113,9 @@ ENV/
# PyCharm
.idea/
+# Visual Studio Code
+.vscode/
+
# vim
*.swp
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 6ed3f31..ae6969e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -949,8 +949,9 @@ class SchedulerJob(BaseJob):
"""
For all DAG IDs in the SimpleDagBag, look for task instances in the
old_states and set them to new_state if the corresponding DagRun
- exists but is not in the running state. This normally should not
- happen, but it can if the state of DagRuns are changed manually.
+ does not exist or exists but is not in the running state. This
+ normally should not happen, but it can if the state of DagRuns are
+ changed manually.
:param old_states: examine TaskInstances in this state
:type old_state: list[State]
@@ -961,35 +962,33 @@ class SchedulerJob(BaseJob):
:type simple_dag_bag: SimpleDagBag
"""
tis_changed = 0
+ query = session \
+ .query(models.TaskInstance) \
+ .outerjoin(models.DagRun, and_(
+ models.TaskInstance.dag_id == models.DagRun.dag_id,
+ models.TaskInstance.execution_date == models.DagRun.execution_date)) \
+ .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) \
+ .filter(models.TaskInstance.state.in_(old_states)) \
+ .filter(or_(
+ models.DagRun.state != State.RUNNING,
+ models.DagRun.state.is_(None)))
if self.using_sqlite:
- tis_to_change = (
- session
- .query(models.TaskInstance)
- .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
- .filter(models.TaskInstance.state.in_(old_states))
- .filter(and_(
- models.DagRun.dag_id == models.TaskInstance.dag_id,
- models.DagRun.execution_date == models.TaskInstance.execution_date,
- models.DagRun.state != State.RUNNING))
- .with_for_update()
- .all()
- )
+ tis_to_change = query \
+ .with_for_update() \
+ .all()
for ti in tis_to_change:
ti.set_state(new_state, session=session)
tis_changed += 1
else:
- tis_changed = (
- session
- .query(models.TaskInstance)
- .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
- .filter(models.TaskInstance.state.in_(old_states))
- .filter(and_(
- models.DagRun.dag_id == models.TaskInstance.dag_id,
- models.DagRun.execution_date == models.TaskInstance.execution_date,
- models.DagRun.state != State.RUNNING))
- .update({models.TaskInstance.state: new_state},
- synchronize_session=False)
- )
+ subq = query.subquery()
+ tis_changed = session \
+ .query(models.TaskInstance) \
+ .filter(and_(
+ models.TaskInstance.dag_id == subq.c.dag_id,
+ models.TaskInstance.execution_date ==
+ subq.c.execution_date)) \
+ .update({models.TaskInstance.state: new_state},
+ synchronize_session=False)
session.commit()
if tis_changed > 0:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/26911864/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 5d0420a..e522de5 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1531,6 +1531,15 @@ class SchedulerJobTest(unittest.TestCase):
dag=dag2,
owner='airflow')
+ dag3 = DAG(
+ dag_id='test_change_state_for_tis_without_dagrun_no_dagrun',
+ start_date=DEFAULT_DATE)
+
+ DummyOperator(
+ task_id='dummy',
+ dag=dag3,
+ owner='airflow')
+
session = settings.Session()
dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
state=State.RUNNING,
@@ -1552,12 +1561,18 @@ class SchedulerJobTest(unittest.TestCase):
ti2.state = State.SCHEDULED
session.commit()
- dagbag = self._make_simple_dag_bag([dag])
+ ti3 = TI(dag3.get_task('dummy'), DEFAULT_DATE)
+ ti3.state = State.SCHEDULED
+ session.merge(ti3)
+ session.commit()
+
+ dagbag = self._make_simple_dag_bag([dag, dag2, dag3])
scheduler = SchedulerJob(num_runs=0, run_duration=0)
- scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
- old_states=[State.SCHEDULED, State.QUEUED],
- new_state=State.NONE,
- session=session)
+ scheduler._change_state_for_tis_without_dagrun(
+ simple_dag_bag=dagbag,
+ old_states=[State.SCHEDULED, State.QUEUED],
+ new_state=State.NONE,
+ session=session)
ti = dr.get_task_instance(task_id='dummy', session=session)
ti.refresh_from_db(session=session)
@@ -1567,6 +1582,9 @@ class SchedulerJobTest(unittest.TestCase):
ti2.refresh_from_db(session=session)
self.assertEqual(ti2.state, State.SCHEDULED)
+ ti3.refresh_from_db(session=session)
+ self.assertEquals(ti3.state, State.NONE)
+
dr.refresh_from_db(session=session)
dr.state = State.FAILED
@@ -1574,10 +1592,11 @@ class SchedulerJobTest(unittest.TestCase):
session.merge(dr)
session.commit()
- scheduler._change_state_for_tis_without_dagrun(simple_dag_bag=dagbag,
- old_states=[State.SCHEDULED, State.QUEUED],
- new_state=State.NONE,
- session=session)
+ scheduler._change_state_for_tis_without_dagrun(
+ simple_dag_bag=dagbag,
+ old_states=[State.SCHEDULED, State.QUEUED],
+ new_state=State.NONE,
+ session=session)
ti.refresh_from_db(session=session)
self.assertEqual(ti.state, State.NONE)