You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/19 17:09:25 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #11184: Speed up `dag.clear()` when clearing lots of ExternalTaskSensor and ExternalTaskMarker

kaxil commented on a change in pull request #11184:
URL: https://github.com/apache/airflow/pull/11184#discussion_r507915333



##########
File path: airflow/models/dag.py
##########
@@ -1209,51 +1214,60 @@ def clear(
             instances = tis.all()
             for ti in instances:
                 if ti.operator == ExternalTaskMarker.__name__:
-                    task: ExternalTaskMarker = cast(ExternalTaskMarker, copy.copy(self.get_task(ti.task_id)))
-                    ti.task = task
-
-                    if recursion_depth == 0:
-                        # Maximum recursion depth allowed is the recursion_depth of the first
-                        # ExternalTaskMarker in the tasks to be cleared.
-                        max_recursion_depth = task.recursion_depth
-
-                    if recursion_depth + 1 > max_recursion_depth:
-                        # Prevent cycles or accidents.
-                        raise AirflowException("Maximum recursion depth {} reached for {} {}. "
-                                               "Attempted to clear too many tasks "
-                                               "or there may be a cyclic dependency."
-                                               .format(max_recursion_depth,
-                                                       ExternalTaskMarker.__name__, ti.task_id))
-                    ti.render_templates()
-                    external_tis = session.query(TI).filter(TI.dag_id == task.external_dag_id,
-                                                            TI.task_id == task.external_task_id,
-                                                            TI.execution_date ==
-                                                            pendulum.parse(task.execution_date))
-
-                    for tii in external_tis:
-                        if not dag_bag:
-                            dag_bag = DagBag()
-                        external_dag = dag_bag.get_dag(tii.dag_id)
-                        if not external_dag:
-                            raise AirflowException("Could not find dag {}".format(tii.dag_id))
-                        downstream = external_dag.sub_dag(
-                            task_regex=r"^{}$".format(tii.task_id),
-                            include_upstream=False,
-                            include_downstream=True
-                        )
-                        tis = tis.union(downstream.clear(start_date=tii.execution_date,
-                                                         end_date=tii.execution_date,
-                                                         only_failed=only_failed,
-                                                         only_running=only_running,
-                                                         confirm_prompt=confirm_prompt,
-                                                         include_subdags=include_subdags,
-                                                         include_parentdag=False,
-                                                         dag_run_state=dag_run_state,
-                                                         get_tis=True,
-                                                         session=session,
-                                                         recursion_depth=recursion_depth + 1,
-                                                         max_recursion_depth=max_recursion_depth,
-                                                         dag_bag=dag_bag))
+                    if visited_external_tis is None:
+                        visited_external_tis = set()
+                    ti_key = ti.key.primary
+                    if ti_key not in visited_external_tis:
+                        # Only clear this ExternalTaskMarker if it's not already visited by the
+                        # recursive calls to dag.clear().
+                        task: ExternalTaskMarker = cast(ExternalTaskMarker,
+                                                        copy.copy(self.get_task(ti.task_id)))
+                        ti.task = task
+
+                        if recursion_depth == 0:
+                            # Maximum recursion depth allowed is the recursion_depth of the first
+                            # ExternalTaskMarker in the tasks to be cleared.
+                            max_recursion_depth = task.recursion_depth
+
+                        if recursion_depth + 1 > max_recursion_depth:
+                            # Prevent cycles or accidents.
+                            raise AirflowException("Maximum recursion depth {} reached for {} {}. "
+                                                   "Attempted to clear too many tasks "
+                                                   "or there may be a cyclic dependency."
+                                                   .format(max_recursion_depth,
+                                                           ExternalTaskMarker.__name__, ti.task_id))
+                        ti.render_templates()
+                        external_tis = session.query(TI).filter(TI.dag_id == task.external_dag_id,
+                                                                TI.task_id == task.external_task_id,
+                                                                TI.execution_date ==
+                                                                pendulum.parse(task.execution_date))
+
+                        for tii in external_tis:
+                            if not dag_bag:
+                                dag_bag = DagBag()

Review comment:
       Pass `read_dags_from_db=True` to `DagBag` here, so that the DAGs are loaded from the database rather than re-parsed from the DAG files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org