You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "pierrejeambrun (via GitHub)" <gi...@apache.org> on 2023/09/22 13:34:03 UTC

[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #34529: Add ability to clear downstream tis in "List Task Instances" view

pierrejeambrun commented on code in PR #34529:
URL: https://github.com/apache/airflow/pull/34529#discussion_r1334371070


##########
airflow/www/views.py:
##########
@@ -5803,21 +5859,42 @@ def duration_f(self):
     @provide_session
     @action_logging
     def action_clear(self, task_instances, session: Session = NEW_SESSION):
-        """Clears the action."""
+        """Clears an arbitrary number of task instances."""
         try:
-            dag_to_tis = collections.defaultdict(list)
-
-            for ti in task_instances:
-                dag = get_airflow_app().dag_bag.get_dag(ti.dag_id)
-                dag_to_tis[dag].append(ti)
+            count = self._clear_task_instances(
+                task_instances=task_instances, session=session, clear_downstream=False
+            )
+            session.commit()
+            flash(f"{count} task instances have been cleared")
+        except Exception as e:
+            flash(f'Failed to clear task instances: "{e}"', "error")
 
-            for dag, task_instances_list in dag_to_tis.items():
-                models.clear_task_instances(task_instances_list, session, dag=dag)
+        self.update_redirect()
+        return redirect(self.get_redirect())
 
+    @action(
+        "clear_downstream",
+        lazy_gettext("Clear (including downstream tasks)"),
+        lazy_gettext(
+            "Are you sure you want to clear the state of the selected task"
+            " instance(s) and all their downstream dependencie(s), and set their dagruns to the QUEUED state?"

Review Comment:
   The singular should be `dependency`. Also, avoid adding `dependencie` to the spelling wordlist, since it is really just a typo. I wouldn't bother with the `(s)` form and would simply always write `dependencies`: a single task can have multiple dependencies, so the plural reads fine.



##########
tests/www/views/test_views_tasks.py:
##########
@@ -857,6 +859,63 @@ def test_task_instance_clear(session, request, client_fixture, should_succeed):
     assert state == (State.NONE if should_succeed else initial_state)
 
 
+def test_task_instance_clear_downstream(session, admin_client, dag_maker):
+    """Ensures clearing a task instance clears its downstream dependencies exclusively"""
+    with dag_maker(
+        dag_id="test_dag_id",
+        serialized=True,
+        session=session,
+        start_date=pendulum.DateTime(2023, 1, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+    ):
+        EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+        EmptyOperator(task_id="task_3")
+
+    run1 = dag_maker.create_dagrun(
+        run_id="run_1",
+        state=DagRunState.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+        execution_date=dag_maker.dag.start_date,
+        start_date=dag_maker.dag.start_date,
+        session=session,
+    )
+
+    run2 = dag_maker.create_dagrun(
+        run_id="run_2",
+        state=DagRunState.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+        execution_date=dag_maker.dag.start_date.add(days=1),
+        start_date=dag_maker.dag.start_date.add(days=1),
+        session=session,
+    )
+
+    for run in (run1, run2):
+        for ti in run.task_instances:
+            ti.state = State.SUCCESS
+
+    # Clear task_1 from dag run 1
+    run1_ti1 = run1.get_task_instance(task_id="task_1")
+    rowid = _get_appbuilder_pk_string(TaskInstanceModelView, run1_ti1)
+    resp = admin_client.post(
+        "/taskinstance/action_post",
+        data={"action": "clear_downstream", "rowid": rowid},
+        follow_redirects=True,
+    )
+    assert resp.status_code == 200
+
+    # Assert that task_2 of dag run 1 is cleared, but task_3 is left untouched

Review Comment:
   ```suggestion
       # Assert that task_1 and task_2 of dag run 1 are cleared, but task_3 is left untouched
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org