You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:04 UTC
[airflow] 02/42: Fix running child tasks in a subdag after clearing
a successful subdag (#14776)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f4cc5c50f350801f43fc17152605d45cc169b452
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Mar 18 11:38:52 2021 +0100
Fix running child tasks in a subdag after clearing a successful subdag (#14776)
After successfully running a SUBDAG, clearing it
(including downstream+recursive) doesn't trigger the inner tasks.
Instead, the subdag is marked successful and the inner tasks all
stay cleared and aren't re-run.
The above problem is because the DagRun state of the subdags is not updated
after clearing. This PR solves it by updating the DagRun state of all DAGs,
including subdags, when include_subdags is True
(cherry picked from commit 052163516bf91ab7bb53f4ec3c7b5621df515820)
---
airflow/models/dag.py | 10 +++++++--
tests/models/test_dag.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8bb32db..d77cdfc 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1116,13 +1116,15 @@ class DAG(LoggingMixin):
session: Session = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
+ dag_ids: List[str] = None,
) -> None:
- query = session.query(DagRun).filter_by(dag_id=self.dag_id)
+ dag_ids = dag_ids or [self.dag_id]
+ query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
if start_date:
query = query.filter(DagRun.execution_date >= start_date)
if end_date:
query = query.filter(DagRun.execution_date <= end_date)
- query.update({DagRun.state: state})
+ query.update({DagRun.state: state}, synchronize_session='fetch')
@provide_session
def clear(
@@ -1183,11 +1185,13 @@ class DAG(LoggingMixin):
"""
TI = TaskInstance
tis = session.query(TI)
+ dag_ids = []
if include_subdags:
# Crafting the right filter for dag_id and task_ids combo
conditions = []
for dag in self.subdags + [self]:
conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
+ dag_ids.append(dag.dag_id)
tis = tis.filter(or_(*conditions))
else:
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
@@ -1327,11 +1331,13 @@ class DAG(LoggingMixin):
dag=self,
activate_dag_runs=False, # We will set DagRun state later.
)
+
self.set_dag_runs_state(
session=session,
start_date=start_date,
end_date=end_date,
state=dag_run_state,
+ dag_ids=dag_ids,
)
else:
count = 0
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 60171d8..c923241 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1299,6 +1299,61 @@ class TestDag(unittest.TestCase):
assert dagrun.state == dag_run_state
@parameterized.expand(
+ [
+ (State.NONE,),
+ (State.RUNNING,),
+ ]
+ )
+ def test_clear_set_dagrun_state_for_subdag(self, dag_run_state):
+ dag_id = 'test_clear_set_dagrun_state_subdag'
+ self._clean_up(dag_id)
+ task_id = 't1'
+ dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1)
+ t_1 = DummyOperator(task_id=task_id, dag=dag)
+ subdag = DAG(dag_id + '.test', start_date=DEFAULT_DATE, max_active_runs=1)
+ SubDagOperator(task_id='test', subdag=subdag, dag=dag)
+ t_2 = DummyOperator(task_id='task', dag=subdag)
+
+ session = settings.Session()
+ dagrun_1 = dag.create_dagrun(
+ run_type=DagRunType.BACKFILL_JOB,
+ state=State.FAILED,
+ start_date=DEFAULT_DATE,
+ execution_date=DEFAULT_DATE,
+ )
+ dagrun_2 = subdag.create_dagrun(
+ run_type=DagRunType.BACKFILL_JOB,
+ state=State.FAILED,
+ start_date=DEFAULT_DATE,
+ execution_date=DEFAULT_DATE,
+ )
+ session.merge(dagrun_1)
+ session.merge(dagrun_2)
+ task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING)
+ task_instance_2 = TI(t_2, execution_date=DEFAULT_DATE, state=State.RUNNING)
+ session.merge(task_instance_1)
+ session.merge(task_instance_2)
+ session.commit()
+
+ dag.clear(
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=1),
+ dag_run_state=dag_run_state,
+ include_subdags=True,
+ include_parentdag=False,
+ session=session,
+ )
+
+ dagrun = (
+ session.query(
+ DagRun,
+ )
+ .filter(DagRun.dag_id == subdag.dag_id)
+ .one()
+ )
+ assert dagrun.state == dag_run_state
+
+ @parameterized.expand(
[(state, State.NONE) for state in State.task_states if state != State.RUNNING]
+ [(State.RUNNING, State.SHUTDOWN)]
) # type: ignore