You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by qi...@apache.org on 2021/05/29 15:28:22 UTC

[airflow] 02/02: Fix dag.clear() to set multiple dags to running when necessary (#15382)

This is an automated email from the ASF dual-hosted git repository.

qian pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bdf9bc20354cf924a9fe0929db8613e4974c785e
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Sat May 29 23:01:39 2021 +0800

    Fix dag.clear() to set multiple dags to running when necessary (#15382)
    
    closes: #14260
    related: #9824
    
    When clearing tasks across dags using ExternalTaskMarker, the dag state of the external DagRun is not set to active. So cleared tasks in the external dag will not automatically start if the DagRun is in a Failed or Succeeded state.
    
    Two changes are made to fix the issue:
    
    1. Make clear_task_instances set DagRuns' state to dag_run_state for all the affected DagRuns.
    2. Fix the DagRun filter in clear_task_instances. Previously, it assumed that the execution_dates for all the dag_ids are the same, which is not always correct.
    In addition, test_external_task_marker_clear_activate is added to make sure the fix does the right thing.
    
    (cherry picked from commit 2bca8a5425c234b04fdf32d6c50ae3a91cd08262)
---
 UPDATING.md                                        | 10 +++
 .../endpoints/task_instance_endpoint.py            | 12 +--
 airflow/models/dag.py                              | 21 ++---
 airflow/models/taskinstance.py                     | 19 +++--
 tests/sensors/test_external_task_sensor.py         | 96 ++++++++++++++++++++++
 5 files changed, 128 insertions(+), 30 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index e7a9119..5b03ad8 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -69,6 +69,16 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+## Master
+
+### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state`
+
+To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead.
+
+### `dag.set_dag_runs_state` is deprecated
+
+The method `set_dag_runs_state` is no longer needed after a bug fix in PR: [#15382](https://github.com/apache/airflow/pull/15382). This method is now deprecated and will be removed in a future version.
+
 ## Airflow 2.1.0
 
 ### New "deprecated_api" extra
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index b84c59a..418bded 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
     task_instances = dag.clear(get_tis=True, **data)
     if not data["dry_run"]:
         clear_task_instances(
-            task_instances,
-            session,
-            dag=dag,
-            activate_dag_runs=False,  # We will set DagRun state later.
+            task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else False
         )
-        if reset_dag_runs:
-            dag.set_dag_runs_state(
-                session=session,
-                start_date=data["start_date"],
-                end_date=data["end_date"],
-                state=State.RUNNING,
-            )
     task_instances = task_instances.join(
         DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
     ).add_column(DR.run_id)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c90fb4f..8e96554 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1123,6 +1123,11 @@ class DAG(LoggingMixin):
         end_date: Optional[datetime] = None,
         dag_ids: List[str] = None,
     ) -> None:
+        warnings.warn(
+            "This method is deprecated and will be removed in a future version.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
         dag_ids = dag_ids or [self.dag_id]
         query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
         if start_date:
@@ -1172,7 +1177,8 @@ class DAG(LoggingMixin):
         :type include_subdags: bool
         :param include_parentdag: Clear tasks in the parent dag of the subdag.
         :type include_parentdag: bool
-        :param dag_run_state: state to set DagRun to
+        :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
+            be changed.
         :param dry_run: Find the tasks to clear but don't clear them.
         :type dry_run: bool
         :param session: The sqlalchemy session to use
@@ -1193,20 +1199,17 @@ class DAG(LoggingMixin):
         """
         TI = TaskInstance
         tis = session.query(TI)
-        dag_ids = []
         if include_subdags:
             # Crafting the right filter for dag_id and task_ids combo
             conditions = []
             for dag in self.subdags + [self]:
                 conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
-                dag_ids.append(dag.dag_id)
             tis = tis.filter(or_(*conditions))
         else:
             tis = session.query(TI).filter(TI.dag_id == self.dag_id)
             tis = tis.filter(TI.task_id.in_(self.task_ids))
 
         if include_parentdag and self.is_subdag and self.parent_dag is not None:
-            dag_ids.append(self.parent_dag.dag_id)
             p_dag = self.parent_dag.sub_dag(
                 task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
                 include_upstream=False,
@@ -1340,15 +1343,7 @@ class DAG(LoggingMixin):
                 tis,
                 session,
                 dag=self,
-                activate_dag_runs=False,  # We will set DagRun state later.
-            )
-
-            self.set_dag_runs_state(
-                session=session,
-                start_date=start_date,
-                end_date=end_date,
-                state=dag_run_state,
-                dag_ids=dag_ids,
+                dag_run_state=dag_run_state,
             )
         else:
             count = 0
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index a7e94bd..ae7eeef 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -133,7 +133,7 @@ def set_error_file(error_file: str, error: Union[str, Exception]) -> None:
 def clear_task_instances(
     tis,
     session,
-    activate_dag_runs=True,
+    dag_run_state: str = State.RUNNING,
     dag=None,
 ):
     """
@@ -142,7 +142,8 @@ def clear_task_instances(
 
     :param tis: a list of task instances
     :param session: current session
-    :param activate_dag_runs: flag to check for active dag run
+    :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
+        be changed.
     :param dag: DAG object
     """
     job_ids = []
@@ -204,19 +205,25 @@ def clear_task_instances(
         for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN
 
-    if activate_dag_runs and tis:
+    if dag_run_state is not False and tis:
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
+        dates_by_dag_id = defaultdict(set)
+        for instance in tis:
+            dates_by_dag_id[instance.dag_id].add(instance.execution_date)
+
         drs = (
             session.query(DagRun)
             .filter(
-                DagRun.dag_id.in_({ti.dag_id for ti in tis}),
-                DagRun.execution_date.in_({ti.execution_date for ti in tis}),
+                or_(
+                    and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))
+                    for dag_id, dates in dates_by_dag_id.items()
+                )
             )
             .all()
         )
         for dr in drs:
-            dr.state = State.RUNNING
+            dr.state = dag_run_state
             dr.start_date = timezone.utcnow()
 
 
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 55080c9..19161a7 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -416,6 +416,53 @@ def dag_bag_ext():
     return dag_bag
 
 
+@pytest.fixture
+def dag_bag_parent_child():
+    """
+    Create a DagBag with two DAGs looking like this. task_1 of child_dag_1 on day 1 depends on
+    task_0 of parent_dag_0 on day 1. Therefore, when task_0 of parent_dag_0 on day 1 and day 2
+    are cleared, parent_dag_0 DagRuns need to be set to running on both days, but child_dag_1
+    only needs to be set to running on day 1.
+
+                   day 1   day 2
+
+     parent_dag_0  task_0  task_0
+                     |
+                     |
+                     v
+     child_dag_1   task_1  task_1
+
+    """
+    dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)
+
+    day_1 = DEFAULT_DATE
+
+    dag_0 = DAG("parent_dag_0", start_date=day_1, schedule_interval=None)
+    task_0 = ExternalTaskMarker(
+        task_id="task_0",
+        external_dag_id="child_dag_1",
+        external_task_id="task_1",
+        execution_date=day_1.isoformat(),
+        recursion_depth=3,
+        dag=dag_0,
+    )
+
+    dag_1 = DAG("child_dag_1", start_date=day_1, schedule_interval=None)
+    _ = ExternalTaskSensor(
+        task_id="task_1",
+        external_dag_id=dag_0.dag_id,
+        external_task_id=task_0.task_id,
+        execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [],
+        mode='reschedule',
+        dag=dag_1,
+    )
+
+    for dag in [dag_0, dag_1]:
+        dag_bag.bag_dag(dag=dag, root_dag=dag)
+
+    return dag_bag
+
+
 def run_tasks(dag_bag, execution_date=DEFAULT_DATE):
     """
     Run all tasks in the DAGs in the given dag_bag. Return the TaskInstance objects as a dict
@@ -464,6 +511,55 @@ def test_external_task_marker_transitive(dag_bag_ext):
     assert_ti_state_equal(ti_b_3, State.NONE)
 
 
+# pylint: disable=redefined-outer-name
+def test_external_task_marker_clear_activate(dag_bag_parent_child):
+    """
+    Test clearing tasks across DAGs and make sure the right DagRuns are activated.
+    """
+    from airflow.utils.session import create_session
+    from airflow.utils.types import DagRunType
+
+    dag_bag = dag_bag_parent_child
+    day_1 = DEFAULT_DATE
+    day_2 = DEFAULT_DATE + timedelta(days=1)
+
+    run_tasks(dag_bag, execution_date=day_1)
+    run_tasks(dag_bag, execution_date=day_2)
+
+    with create_session() as session:
+        for dag in dag_bag.dags.values():
+            for execution_date in [day_1, day_2]:
+                dagrun = dag.create_dagrun(
+                    State.RUNNING, execution_date, run_type=DagRunType.MANUAL, session=session
+                )
+                dagrun.set_state(State.SUCCESS)
+                session.add(dagrun)
+
+        session.commit()
+
+    # Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared.
+    for dag in dag_bag.dags.values():
+        for execution_date in [day_1, day_2]:
+            dagrun = dag.get_dagrun(execution_date=execution_date)
+            assert dagrun.state == State.SUCCESS
+
+    dag_0 = dag_bag.get_dag("parent_dag_0")
+    task_0 = dag_0.get_task("task_0")
+    clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)
+
+    # Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
+    # Unaffected dagruns should be left as SUCCESS.
+    dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
+    dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
+    dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
+    dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)
+
+    assert dagrun_0_1.state == State.RUNNING
+    assert dagrun_0_2.state == State.RUNNING
+    assert dagrun_1_1.state == State.RUNNING
+    assert dagrun_1_2.state == State.SUCCESS
+
+
 def test_external_task_marker_future(dag_bag_ext):
     """
     Test clearing tasks with no end_date. This is the case when users clear tasks with