You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/18 10:39:12 UTC

[airflow] branch master updated: Fix running child tasks in a subdag after clearing a successful subdag (#14776)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0521635  Fix running child tasks in a subdag after clearing a successful subdag (#14776)
0521635 is described below

commit 052163516bf91ab7bb53f4ec3c7b5621df515820
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Mar 18 11:38:52 2021 +0100

    Fix running child tasks in a subdag after clearing a successful subdag (#14776)
    
    After successfully running a SUBDAG, clearing it
    (including downstream+recursive) doesn't trigger the inner tasks.
    Instead, the subdag is marked successful and the inner tasks all
    stay cleared and aren't re-run.
    
    The problem occurs because the DagRun states of the subdags are not updated
    after clearing. This PR solves it by updating the DagRun state of all DAGs,
    including subdags, when include_subdags is True.
---
 airflow/models/dag.py    | 10 +++++++--
 tests/models/test_dag.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 449c7e7b..25a87bb 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1116,13 +1116,15 @@ class DAG(LoggingMixin):
         session: Session = None,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
+        dag_ids: List[str] = None,
     ) -> None:
-        query = session.query(DagRun).filter_by(dag_id=self.dag_id)
+        dag_ids = dag_ids or [self.dag_id]
+        query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
         if start_date:
             query = query.filter(DagRun.execution_date >= start_date)
         if end_date:
             query = query.filter(DagRun.execution_date <= end_date)
-        query.update({DagRun.state: state})
+        query.update({DagRun.state: state}, synchronize_session='fetch')
 
     @provide_session
     def clear(
@@ -1183,11 +1185,13 @@ class DAG(LoggingMixin):
         """
         TI = TaskInstance
         tis = session.query(TI)
+        dag_ids = []
         if include_subdags:
             # Crafting the right filter for dag_id and task_ids combo
             conditions = []
             for dag in self.subdags + [self]:
                 conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
+                dag_ids.append(dag.dag_id)
             tis = tis.filter(or_(*conditions))
         else:
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
@@ -1327,11 +1331,13 @@ class DAG(LoggingMixin):
                 dag=self,
                 activate_dag_runs=False,  # We will set DagRun state later.
             )
+
             self.set_dag_runs_state(
                 session=session,
                 start_date=start_date,
                 end_date=end_date,
                 state=dag_run_state,
+                dag_ids=dag_ids,
             )
         else:
             count = 0
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 3345da7..d2d0f9a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1306,6 +1306,61 @@ class TestDag(unittest.TestCase):
         assert dagrun.state == dag_run_state
 
     @parameterized.expand(
+        [
+            (State.NONE,),
+            (State.RUNNING,),
+        ]
+    )
+    def test_clear_set_dagrun_state_for_subdag(self, dag_run_state):
+        dag_id = 'test_clear_set_dagrun_state_subdag'
+        self._clean_up(dag_id)
+        task_id = 't1'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, max_active_runs=1)
+        t_1 = DummyOperator(task_id=task_id, dag=dag)
+        subdag = DAG(dag_id + '.test', start_date=DEFAULT_DATE, max_active_runs=1)
+        SubDagOperator(task_id='test', subdag=subdag, dag=dag)
+        t_2 = DummyOperator(task_id='task', dag=subdag)
+
+        session = settings.Session()
+        dagrun_1 = dag.create_dagrun(
+            run_type=DagRunType.BACKFILL_JOB,
+            state=State.FAILED,
+            start_date=DEFAULT_DATE,
+            execution_date=DEFAULT_DATE,
+        )
+        dagrun_2 = subdag.create_dagrun(
+            run_type=DagRunType.BACKFILL_JOB,
+            state=State.FAILED,
+            start_date=DEFAULT_DATE,
+            execution_date=DEFAULT_DATE,
+        )
+        session.merge(dagrun_1)
+        session.merge(dagrun_2)
+        task_instance_1 = TI(t_1, execution_date=DEFAULT_DATE, state=State.RUNNING)
+        task_instance_2 = TI(t_2, execution_date=DEFAULT_DATE, state=State.RUNNING)
+        session.merge(task_instance_1)
+        session.merge(task_instance_2)
+        session.commit()
+
+        dag.clear(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=1),
+            dag_run_state=dag_run_state,
+            include_subdags=True,
+            include_parentdag=False,
+            session=session,
+        )
+
+        dagrun = (
+            session.query(
+                DagRun,
+            )
+            .filter(DagRun.dag_id == subdag.dag_id)
+            .one()
+        )
+        assert dagrun.state == dag_run_state
+
+    @parameterized.expand(
         [(state, State.NONE) for state in State.task_states if state != State.RUNNING]
         + [(State.RUNNING, State.SHUTDOWN)]
     )  # type: ignore