Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/24 21:26:08 UTC

[GitHub] [airflow] pierrejeambrun opened a new pull request, #26658: Clear TaskGroup

pierrejeambrun opened a new pull request, #26658:
URL: https://github.com/apache/airflow/pull/26658

   solves: https://github.com/apache/airflow/issues/14529
   
   The goal of this PR is to collect feedback before going too deep into the implementation (tests, UI, etc.). If you check the issue thread, you will notice that there are quite a few concerns and inherent complexities in properly achieving TaskGroup clearing (in particular, @potiuk raised fair concerns about DB deadlocks and topological-order resolution when clearing tasks).
   
   This implements a simple 'POC': I was able to successfully clear a simple task group, as well as nested task groups.
   
   The function clearing the tasks handles multiple tasks in one single transaction, which may address the locking concerns. The code simply finds the requested `TaskGroup` to clear, recursively collects all of its child tasks, extracts the partial_subset, and clears all those tasks.
   
   > Note: I also had to update the `check-init-decorator-arguments` static check, as `ast.unparse` was only introduced in Python 3.9. [doc](https://docs.python.org/3.10/library/ast.html#ast.unparse)
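   
   A minimal sketch of that flow (illustrative only; the helper names and arguments below are assumptions about the approach, not the exact code in this PR):
   
   ```python
   from airflow.utils.session import NEW_SESSION, provide_session
   
   @provide_session
   def clear_task_group(dag, group_id, start_date=None, end_date=None, session=NEW_SESSION):
       """Clear every task instance belonging to a TaskGroup, including nested groups."""
       task_group = dag.task_group.get_task_group_dict().get(group_id)
       if task_group is None:
           raise ValueError(f"TaskGroup {group_id} could not be found")
   
       # Recursively collect the task ids of the group and all nested groups.
       task_ids = [t.task_id for t in task_group.iter_tasks()]
   
       # Restrict the DAG to those tasks, then clear them in one transaction.
       sub_dag = dag.partial_subset(
           task_ids_or_regex=task_ids,
           include_downstream=True,
           include_upstream=False,
       )
       return sub_dag.clear(
           task_ids=task_ids,
           start_date=start_date,
           end_date=end_date,
           session=session,
       )
   ```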


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] bbovenzi commented on pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on PR #26658:
URL: https://github.com/apache/airflow/pull/26658#issuecomment-1286006932

   Testing locally and looking good. I haven't found any issues yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021029012


##########
airflow/www/views.py:
##########
@@ -2025,12 +2029,14 @@ def _clear_dag_tis(
         ]
     )
     @action_logging
-    def clear(self):
-        """Clears the Dag."""
+    @provide_session
+    def clear(self, session: Session = NEW_SESSION):

Review Comment:
   ```suggestion
       def clear(self, *, session: Session = NEW_SESSION):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic, and even smaller ones, because we cannot use skip-locked here. I don't have enough experience to assess how frequent those scenarios might be, or whether they can be harmful. Do you think this is enough for a 'first acceptable version' (with @uranusjr's suggested change)? Then I can start working on b), which will require quite a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to react-query).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun merged pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun merged PR #26658:
URL: https://github.com/apache/airflow/pull/26658


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985095699


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,21 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+
+        def build_map(dag_node):
+            if not isinstance(dag_node, TaskGroup):

Review Comment:
   Done



##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,21 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+
+        def build_map(dag_node):
+            if not isinstance(dag_node, TaskGroup):
+                task_map[dag_node.task_id] = dag_node
+                return
+
+            for child in dag_node.children.values():
+                build_map(child)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985101030


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,22 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):

Review Comment:
   Today, not being an `AbstractOperator` means that you are a `TaskGroup` (there are only 2 subclasses of DAGNode), but I am still doing an explicit check for `TaskGroup` in case that changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985101030


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,22 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):

Review Comment:
   Today, not being an `AbstractOperator` means that you are a `TaskGroup` (there are only 2 subclasses of DAGNode), but I am still doing an explicit check for `TaskGroup` in case that changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic, and even smaller ones, because we cannot use skip-locked here.
   
   I don't have enough experience to assess how frequent those scenarios might be, or whether they can be harmful (hard to recover from, etc.).
   
   Do you think this is enough for a **'first acceptable version'** (with @uranusjr's changes)? Then I can start working on **b)**, which will require quite a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to react-query).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021857903


##########
airflow/www/views.py:
##########
@@ -1966,6 +1966,7 @@ def trigger(self, session=None):
         flash(f"Triggered {dag_id}, it should start any moment now.")
         return redirect(origin)
 
+    @provide_session

Review Comment:
   Good idea, makes sense.
   
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic, and even smaller ones, because we cannot use skip-locked here.
   
   I don't have enough experience to assess how frequent those scenarios might be, or whether they can be harmful (hard to recover from, etc.).
   
   Do you think this is enough for a **'first acceptable version'** (with @uranusjr's changes)? Then I can start working on **b)**, which will require a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to react-query).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1015864927


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Yeah, I have a feeling that locking multiple DagRuns in particular will cause long-running queries (more than a few seconds) while those DagRuns stay locked, and secondly it might cause UI problems when such clearing becomes a long-running operation over many future/past DagRuns and multiple tasks belonging to a task group.
   
   I think we should get a good feeling for how long such a DagRun lock query can take - on a big installation, will it keep the locks for milliseconds? single seconds? 10-20 seconds? I am afraid (but this is just a guess) it might take longer. Also, when you lock multiple DagRuns and a scheduler already holds one of them because it is scheduling that particular DagRun, this query will run for as long as the scheduler (or maybe - depending on lock priorities - multiple schedulers) keeps it. I can easily imagine this query "hanging" for multiple seconds while waiting for a single DagRun lock to be released. This is not a problem for the schedulers because they use "skip locked", but it is a problem here.
   
   The locks are not applied "atomically"; they are likely applied one-by-one. And schedulers use "for update skip locked" in a tight loop. What happens if such a row lock is held by one scheduler and then another scheduler picks up the next one, the first releases its lock but then takes the next one, and so on? This might effectively turn the query into a multi-minute query.
   
   Let me just voice my concerns here again (not blocking, but something that should be thought through very well) - when you have such multi-entity operations interfering with running schedulers, I think it's inevitable that you will end up competing for DagRuns, and the operation has the potential to run for multiple minutes. When you run an operation from the UI that can potentially last more than a few seconds, there are two ways UI problems might appear.
   
   a) You run the operation "synchronously" - you lock everything for the duration of the operation and the UI gets "blocked" while it runs. This is difficult to achieve, and it is prone to the user pressing "back" and re-running the queries and other operations that might again lock/deadlock.
   
   b) You let it return quickly and keep it running in the background - but then you need to provide some kind of feedback mechanism, you need an entity that keeps running the query and reporting progress, and the UI needs to handle that.
   
   Not sure we are prepared for either of those scenarios.
   
   Maybe my concerns are unfounded and in the vast majority of cases these operations will be short, but I think we should also think about big use cases.
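   
   To illustrate the difference described above (a sketch only, not code from this PR; `lock_dag_runs` is a made-up helper): the scheduler acquires DagRun row locks with `skip_locked`, so it never waits on rows held by another transaction, while a plain `FOR UPDATE` - as used in this view - blocks until every matching row is free.
   
   ```python
   from airflow.models import DagRun
   from sqlalchemy.orm import Session
   
   def lock_dag_runs(session: Session, dag_id: str, blocking: bool) -> list[DagRun]:
       """Lock the DagRun rows of one DAG, in either of the two modes discussed."""
       query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
       if blocking:
           # Plain SELECT ... FOR UPDATE: waits until every matching DagRun row
           # is released, which is where the potential "hang" comes from.
           query = query.with_for_update()
       else:
           # Scheduler-style locking: rows already locked elsewhere are skipped,
           # so the query never waits on another transaction.
           query = query.with_for_update(skip_locked=True)
       return query.all()
   ```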
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021027532


##########
airflow/www/views.py:
##########
@@ -1966,6 +1966,7 @@ def trigger(self, session=None):
         flash(f"Triggered {dag_id}, it should start any moment now.")
         return redirect(origin)
 
+    @provide_session

Review Comment:
   Since this is an internal function, we should be able to control all the places where `session` is needed and pass it explicitly. This decorator is thus redundant and should be removed.
   
   Also, can we take this change as an opportunity to add type hints to all arguments?
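   
   A minimal sketch of the pattern being suggested (hypothetical helper names, not the actual `views.py` code): only the public entry point uses `@provide_session`, and every internal call receives the same session explicitly, with type hints on all arguments.
   
   ```python
   from __future__ import annotations
   
   from sqlalchemy.orm import Session
   
   from airflow.models import DagRun
   from airflow.utils.session import NEW_SESSION, provide_session
   
   def _count_dag_runs(dag_id: str, *, session: Session) -> int:
       # Internal helper: the session is a required, typed argument instead of
       # being injected by a decorator.
       return session.query(DagRun).filter(DagRun.dag_id == dag_id).count()
   
   @provide_session
   def public_entry_point(dag_id: str, session: Session = NEW_SESSION) -> int:
       # The public entry point owns the session and passes it down explicitly.
       return _count_dag_runs(dag_id, session=session)
   ```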



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021865946


##########
airflow/www/views.py:
##########
@@ -2025,12 +2029,14 @@ def _clear_dag_tis(
         ]
     )
     @action_logging
-    def clear(self):
-        """Clears the Dag."""
+    @provide_session
+    def clear(self, session: Session = NEW_SESSION):

Review Comment:
   Done, thanks.
   
   Maybe I should update similar functions in this file to be consistent? e.g. `extra_links`, `rendered_k8s`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic, and even smaller ones, because we cannot use skip-locked here. While incorporating @uranusjr's changes to not load the DagRun objects entirely, do you think this can be enough for a 'first acceptable version'? Then I can start working on b), which will require quite a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to react-query).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r979326927


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,21 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+
+        def build_map(dag_node):
+            if not isinstance(dag_node, TaskGroup):

Review Comment:
   You want to flip this check: check whether something is a task (`AbstractOperator`) instead of whether it is not a task group.
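   
   A minimal sketch of the flipped check in context (illustrative only; it mirrors the `get_task_dict` helper quoted above with just the condition inverted):
   
   ```python
   def get_task_dict(self) -> dict[str, AbstractOperator]:
       """Returns a flat dictionary of task_id: AbstractOperator."""
       task_map: dict[str, AbstractOperator] = {}
   
       def build_map(dag_node) -> None:
           # Check for the concrete task type instead of "not a TaskGroup".
           if isinstance(dag_node, AbstractOperator):
               task_map[dag_node.task_id] = dag_node
               return
           for child in dag_node.children.values():
               build_map(child)
   
       build_map(self)
       return task_map
   ```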



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985328519


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,22 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):

Review Comment:
   I’d add an additional `else` block below and raise a ValueError or something, to make sure a new class does not get ignored accidentally. Also, if you annotate `task_map`, this would be caught by Mypy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016506308


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Yes. I think it should be fine to start, but I think we should give some warning to the users. Maybe even do a pre-emptive check to see how many DagRuns/Tasks are affected and warn the user when they exceed some experimentally set threshold - I think maybe someone from Astronomer :D could get a "realistic" biggish installation, and we could test this and get some indication of when such clearing might become problematic?
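   
   A rough sketch of the kind of pre-emptive check being suggested (illustrative only; `AFFECTED_RUNS_WARNING_THRESHOLD` is a made-up name, not an existing Airflow setting):
   
   ```python
   from airflow.models import DagRun, TaskInstance
   
   # Hypothetical, experimentally tuned value.
   AFFECTED_RUNS_WARNING_THRESHOLD = 100
   
   def count_affected(session, dag_id, task_ids, start_date=None, end_date=None):
       """Rough count of the DagRuns and TaskInstances a TaskGroup clear would touch."""
       runs = session.query(DagRun).filter(DagRun.dag_id == dag_id)
       if start_date is not None:
           runs = runs.filter(DagRun.execution_date >= start_date)
       if end_date is not None:
           runs = runs.filter(DagRun.execution_date <= end_date)
   
       tis = session.query(TaskInstance).filter(
           TaskInstance.dag_id == dag_id,
           TaskInstance.task_id.in_(task_ids),
       )
       return runs.count(), tis.count()
   ```
   
   The view could then flash a warning (or ask for extra confirmation) when either count exceeds the threshold.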



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic, and even smaller ones, because we cannot use skip-locked here.
   
   I don't have enough experience to assess how frequent those scenarios might be, or whether they can be harmful (hard to recover from, etc.).
   
   Do you think this is enough for a **'first acceptable version'** (with @uranusjr's changes)? Then I can start working on **b)**, which will require a few front-end changes. I believe we have what is needed to achieve what you are describing (especially thanks to `react-query`).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021862512


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get("recursive") == "true"
         only_failed = request.form.get("only_failed") == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun.id).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Good idea. I believe we need to clear it after `_clear_dag_tis`, i.e. right before returning, so we cannot 'release early', but at least it is explicit now with a `del` statement.
   
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun closed pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun closed pull request #26658: Clear TaskGroup
URL: https://github.com/apache/airflow/pull/26658


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on PR #26658:
URL: https://github.com/apache/airflow/pull/26658#issuecomment-1315897249

   @potiuk I think this first version is ready, do you agree?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun closed pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun closed pull request #26658: Clear TaskGroup
URL: https://github.com/apache/airflow/pull/26658


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r979326972


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,21 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+
+        def build_map(dag_node):
+            if not isinstance(dag_node, TaskGroup):
+                task_map[dag_node.task_id] = dag_node
+                return
+
+            for child in dag_node.children.values():
+                build_map(child)

Review Comment:
   It’s probably a good idea to rewrite this as a loop instead of recursive calls; we have had some issues with very deep graphs in the past.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1002919415


##########
airflow/utils/task_group.py:
##########
@@ -454,6 +455,26 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map: dict[str, AbstractOperator] = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):
+                    groups_to_visit.append(child)
+                else:
+                    raise ValueError(
+                        f"Encountered a DAGNode that is not a TaskGroup or an AbstractOperator: {type(child)}"
+                    )
+
+        return task_map

Review Comment:
   Hmm, this feels like a pretty expensive way to get what is needed. Unless we actually need the dict, using an iterator is likely better (for this particular feature):
   
   ```python
   def iter_tasks(self) -> Iterator[AbstractOperator]:
       groups_to_visit = [self]
       while groups_to_visit:
           visiting = groups_to_visit.pop(0)
           for child in visiting.children.values():
               if isinstance(child, AbstractOperator):
                   yield child
               elif isinstance(child, TaskGroup):
                   groups_to_visit.append(child)
               else:
                   raise ValueError(f"Encountered unknown DAGNode type: {type(child)}")
   ```



##########
airflow/www/views.py:
##########
@@ -2046,14 +2047,26 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
+        task_ids: list[str | tuple[str, int]] = []
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            tasks = task_group.get_task_dict()
+            task_ids = list(tasks.keys())
+            task_ids_or_regex = task_ids

Review Comment:
   ```suggestion
               task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1015864927


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Yeah, I have a feeling that locking multiple DagRuns in particular will cause long-running queries (more than a few seconds) while those DagRuns stay locked, and secondly it might cause UI problems when such clearing becomes a long-running operation over many future/past DagRuns and multiple tasks belonging to a task group.
   
   I think we should get a good feeling for how long such a DagRun lock query can take - on a big installation, will it keep the locks for milliseconds? single seconds? 10-20 seconds? I am afraid (but this is just a guess) it might take longer. Also, when you lock multiple DagRuns and a scheduler already holds one of them because it is scheduling that particular DagRun, this query will run for as long as the scheduler (or maybe - depending on lock priorities - multiple schedulers) keeps it. I can easily imagine this query "hanging" for multiple seconds while waiting for a single DagRun lock to be released. This is not a problem for the schedulers because they use "skip locked", but it is a problem here.
   
   The locks are not applied "atomically"; they are likely applied one-by-one. And schedulers use "for update skip locked" in a tight loop. What happens if such a row lock is held by one scheduler and then another scheduler picks up the next one, the first releases its lock but then takes the next one, and so on? This might effectively turn the query into a multi-minute query.
   
   Let me just voice my concerns here again (not blocking, but something that should be thought through very well) - when you have such multi-entity operations interfering with running schedulers, I think it's inevitable that you will end up competing for DagRuns, and the operation has the potential to run for multiple minutes. When you run an operation from the UI that can potentially last more than a few seconds, there are two ways UI problems might appear.
   
   a) You run the operation "synchronously" - you lock everything for the duration of the operation and the UI gets "blocked" while it runs. This is difficult to achieve, and it is prone to the user pressing "back" and re-running the queries and other operations that might again lock/deadlock.
   
   b) You let it return quickly and keep it running in the background - but then you need to provide some kind of feedback mechanism, you need an entity that keeps running the query and reporting progress, and the UI needs to handle that.
   
   Not sure we are prepared for either of those scenarios.
   
   Maybe my concerns are unfounded and in the vast majority of cases these operations will be short, but I think we should also think about big use cases.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r996451511


##########
tests/utils/test_task_group.py:
##########
@@ -1263,3 +1263,41 @@ def test_mapped_task_group_id_prefix_task_id():
 
     dag.get_task("t1") == t1
     dag.get_task("g.t2") == t2
+
+
+def test_get_task_dict():
+    with DAG("test_dag", start_date=pendulum.parse("20200101")) as dag:
+        with TaskGroup("section_1") as tg1:
+            EmptyOperator(task_id='task1')
+
+        with TaskGroup("section_2") as tg2:
+            task2 = EmptyOperator(task_id="task2")
+            task3 = EmptyOperator(task_id="task3")
+            mapped_bash_operator = BashOperator.partial(task_id="get_tag_template_result").expand(

Review Comment:
   One super minor nit: This task_id feels a little specific/out of place compared to the others (looks like it might have come from the `example_datacatalog_tag_templates.py` system test?). Maybe just `task4` or `bash_task`? But this isn't a blocking suggestion, approving otherwise :smile: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on PR #26658:
URL: https://github.com/apache/airflow/pull/26658#issuecomment-1264369464

   For now, limiting the TaskGroup actions to 'Clear'. We might imagine adding others in the future (Run, Mark Fail, Mark Success).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985101030


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,22 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):

Review Comment:
   Today, not being an `AbstractOperator` means that you are a `TaskGroup` (there are only 2 subclasses of DAGNode), but I kept the 'elif' in case that changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1020905058


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   @uranusjr Fixed it, we are now requesting only the id so we don't load the entire object - much better, thanks.
   
   @potiuk I added a warning in the confirmation dialog to tell the user that this action could take a while (only when clearing a task group across multiple dag runs, i.e. with future or past activated). Then we can merge this and start collecting feedback on this feature, maybe with some input from Astronomer, so I can adjust for the **b)** solution you suggested.
   
   ![image](https://user-images.githubusercontent.com/14861206/201524754-9e993631-8a0c-4141-a70e-74b34b8f66e2.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016506308


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Yes. I think it should be fine to start, but we should give some warning to the users. Maybe even do some pre-emptive check to see how many DagRuns/Tasks are affected and warn the user when they exceed some experimentally set threshold. (I think maybe even someone from Astronomer :D could get some "realistic" big-ish installation, so we could test this and get some indication of when such clearing might become problematic?)
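   A minimal sketch of what such a pre-emptive check could look like (hypothetical, not part of the PR; `AFFECTED_RUNS_WARNING_THRESHOLD` and the surrounding `session`/`dag_id`/date variables are assumptions borrowed from the clear view):

   ```python
   from sqlalchemy import func

   from airflow.models.dagrun import DagRun

   AFFECTED_RUNS_WARNING_THRESHOLD = 500  # assumption: to be tuned experimentally

   # Count how many dag runs the clear would touch before locking anything.
   query = session.query(func.count(DagRun.id)).filter(DagRun.dag_id == dag_id)
   if start_date is not None:
       query = query.filter(DagRun.execution_date >= start_date)
   if end_date is not None:
       query = query.filter(DagRun.execution_date <= end_date)

   affected_runs = query.scalar()
   if affected_runs > AFFECTED_RUNS_WARNING_THRESHOLD:
       # ask the user for an extra confirmation instead of clearing right away
       ...
   ```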



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016506308


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Yes. I think it should be fine to start, but we should give some warning to the users (maybe even do some pre-emptive check to see how many DagRuns/Tasks are affected and warn the user when they exceed some experimentally set threshold - I think maybe even someone from Astronomer :D could get some "realistic" big-ish installation and we could test this and get some indication of when such clearing might become problematic)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021028895


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get("recursive") == "true"
         only_failed = request.form.get("only_failed") == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun.id).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   ```suggestion
               locked_dag_run_ids = dag_runs_query.all()
   ```
   
   And `del` the variable explicitly once we are ready to free up those rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26658:
URL: https://github.com/apache/airflow/pull/26658#issuecomment-1288211390

   I still think this has the potential to deadlock when you are clearing a subset of tasks while the scheduler might be scheduling them. Is it possible to hold a lock on the DagRun before attempting it? I think that should prevent any possibility of a deadlock?
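   A minimal sketch of the idea, assuming SQLAlchemy and Airflow's `DagRun` model; `session`, `dag_id` and `execution_date` stand in for the values available in the clear view, and the actual clearing call is only indicated by a comment:

   ```python
   from airflow.models.dagrun import DagRun

   # Take a row lock on the affected DagRun before touching its task instances,
   # so the scheduler cannot update the same run concurrently.
   session.query(DagRun.id).filter(
       DagRun.dag_id == dag_id,
       DagRun.execution_date == execution_date,
   ).with_for_update().all()

   # ... clear the task instances here, within the same session/transaction ...
   session.commit()  # the row lock is released when the transaction ends
   ```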


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1010331735


##########
airflow/utils/task_group.py:
##########
@@ -454,6 +455,26 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map: dict[str, AbstractOperator] = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):
+                    groups_to_visit.append(child)
+                else:
+                    raise ValueError(
+                        f"Encountered a DAGNode that is not a TaskGroup or an AbstractOperator: {type(child)}"
+                    )
+
+        return task_map

Review Comment:
   True, for this specific use case, this is much better. Thanks!
   
   Done 
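   For context, the later revision of this PR iterates the group's tasks directly via `task_group.iter_tasks()` (see the views.py hunks elsewhere in this thread); a minimal sketch of that usage, with `dag` and `group_id` assumed to come from the clear view:

   ```python
   # Resolve the group by its id, then collect the task ids of everything inside it.
   task_group = dag.task_group.get_task_group_dict().get(group_id)
   if task_group is not None:
       task_ids = [t.task_id for t in task_group.iter_tasks()]
   ```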



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic, even smaller ones, because we cannot use skip-locked here. While incorporating @uranusjr's changes to not load the DagRun objects entirely, do you think this can be enough for a 'first acceptable version'? Then I can start working on b), which will require quite a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to React Query).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1021923739


##########
airflow/www/views.py:
##########
@@ -2025,12 +2029,14 @@ def _clear_dag_tis(
         ]
     )
     @action_logging
-    def clear(self):
-        """Clears the Dag."""
+    @provide_session
+    def clear(self, session: Session = NEW_SESSION):

Review Comment:
   Maybe in a separate PR to not mix unrelated things.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985101030


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,22 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):

Review Comment:
   `elif` just in case `DAGNode` gets additional subclasses



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r996452968


##########
tests/utils/test_task_group.py:
##########
@@ -1263,3 +1263,41 @@ def test_mapped_task_group_id_prefix_task_id():
 
     dag.get_task("t1") == t1
     dag.get_task("g.t2") == t2
+
+
+def test_get_task_dict():
+    with DAG("test_dag", start_date=pendulum.parse("20200101")) as dag:
+        with TaskGroup("section_1") as tg1:
+            EmptyOperator(task_id='task1')
+
+        with TaskGroup("section_2") as tg2:
+            task2 = EmptyOperator(task_id="task2")
+            task3 = EmptyOperator(task_id="task3")
+            mapped_bash_operator = BashOperator.partial(task_id="get_tag_template_result").expand(

Review Comment:
   Yes, good idea, thanks! I'm going to update that :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r985101030


##########
airflow/utils/task_group.py:
##########
@@ -453,6 +454,22 @@ def topological_sort(self, _include_subdag_tasks: bool = False):
 
         return graph_sorted
 
+    def get_task_dict(self) -> dict[str, AbstractOperator]:
+        """Returns a flat dictionary of task_id: AbstractOperator"""
+        task_map = {}
+        groups_to_visit = [self]
+
+        while groups_to_visit:
+            visiting = groups_to_visit.pop(0)
+
+            for child in visiting.children.values():
+                if isinstance(child, AbstractOperator):
+                    task_map[child.task_id] = child
+                elif isinstance(child, TaskGroup):

Review Comment:
   Today, not being an AbstractOperator means that you are a TaskGroup (there are only 2 subclasses of DAGNode), but I used 'elif' in case that changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on PR #26658:
URL: https://github.com/apache/airflow/pull/26658#issuecomment-1298753105

   @potiuk I just updated the code to lock the related dag runs when we clear a task group. (We may have to lock multiple dag runs depending on the `past` and `future` parameters.)
   
   Let me know if this is what you had in mind :)
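   A simplified sketch of how `past` and `future` widen the set of runs that gets locked, mirroring the views.py diff; `session`, `dag_id`, `execution_date`, `past` and `future` are assumed to come from the clear view:

   ```python
   from airflow.models.dagrun import DagRun

   # Derive the date window from the form flags, then lock every run inside it.
   start_date = execution_date if not past else None   # `past` removes the lower bound
   end_date = execution_date if not future else None   # `future` removes the upper bound

   runs = session.query(DagRun.id).filter(DagRun.dag_id == dag_id).with_for_update()
   if start_date is not None:
       runs = runs.filter(DagRun.execution_date >= start_date)
   if end_date is not None:
       runs = runs.filter(DagRun.execution_date <= end_date)
   runs.all()  # acquires the row locks for the remainder of the transaction
   ```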


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1010331914


##########
airflow/www/views.py:
##########
@@ -2046,14 +2047,26 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
+        task_ids: list[str | tuple[str, int]] = []
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            tasks = task_group.get_task_dict()
+            task_ids = list(tasks.keys())
+            task_ids_or_regex = task_ids

Review Comment:
   Done, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1015817554


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   I feel we should not load the DagRun objects. Is there a better way to lock the rows? If not, at least we should avoid loading the objects (maybe with `session.query(true())` or `session.query(DagRun.id)`)
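   A minimal sketch of the lighter variant, assuming SQLAlchemy's ORM `Query` API; the filter is only a placeholder:

   ```python
   from airflow.models.dagrun import DagRun

   # Select only the primary key, so no full DagRun object is materialized,
   # while FOR UPDATE still locks the matching rows.
   locked_ids = (
       session.query(DagRun.id)
       .filter(DagRun.dag_id == dag_id)
       .with_for_update()
       .all()
   )
   ```

   The `session.query(true())` form mentioned above would additionally need a `.select_from(DagRun)` so the lock has a table to target.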



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1016002541


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting.
   
   I think you are right, some big use cases might be problematic. While incorporating @uranusjr's changes to not load the DagRun objects entirely, do you think this can be enough for a 'first acceptable version'? Then I can start working on b), which will require quite a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to React Query).
   
   What do you think @potiuk ?



##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   Really interesting, thanks for your answers.
   
   I think you are right, some big use cases might be problematic. While incorporating @uranusjr's changes to not load the DagRun objects entirely, do you think this can be enough for a 'first acceptable version'? Then I can start working on b), which will require quite a few front-end changes. That said, we should have what is needed to achieve what you are describing (especially thanks to React Query).
   
   What do you think @potiuk ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r997417608


##########
tests/utils/test_task_group.py:
##########
@@ -1263,3 +1263,41 @@ def test_mapped_task_group_id_prefix_task_id():
 
     dag.get_task("t1") == t1
     dag.get_task("g.t2") == t2
+
+
+def test_get_task_dict():
+    with DAG("test_dag", start_date=pendulum.parse("20200101")) as dag:
+        with TaskGroup("section_1") as tg1:
+            EmptyOperator(task_id='task1')
+
+        with TaskGroup("section_2") as tg2:
+            task2 = EmptyOperator(task_id="task2")
+            task3 = EmptyOperator(task_id="task3")
+            mapped_bash_operator = BashOperator.partial(task_id="get_tag_template_result").expand(

Review Comment:
   Done, `bash_task` looks nice, ty :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #26658: Clear TaskGroup

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on code in PR #26658:
URL: https://github.com/apache/airflow/pull/26658#discussion_r1020905058


##########
airflow/www/views.py:
##########
@@ -2047,14 +2053,42 @@ def clear(self):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
-        task_ids: list[str | tuple[str, int]]
-        if map_indexes is None:
-            task_ids = [task_id]
-        else:
-            task_ids = [(task_id, map_index) for map_index in map_indexes]
+        task_ids: list[str | tuple[str, int]] = []
+
+        end_date = execution_date if not future else None
+        start_date = execution_date if not past else None
+
+        if group_id is not None:
+            task_group_dict = dag.task_group.get_task_group_dict()
+            task_group = task_group_dict.get(group_id)
+            if task_group is None:
+                return redirect_or_json(
+                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
+                )
+            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]
+
+            # Lock the related dag runs to prevent from possible dead lock.
+            # https://github.com/apache/airflow/pull/26658
+            dag_runs_query = session.query(DagRun).filter(DagRun.dag_id == dag_id).with_for_update()
+            if start_date is None and end_date is None:
+                dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
+            else:
+                if start_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
+
+                if end_date is not None:
+                    dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
+
+            _ = dag_runs_query.all()

Review Comment:
   @uranusjr Fixed it, we now request only the id so we don't load the entire object, much better, thanks.

   @potiuk I added a warning in the confirmation dialog to tell the user that this action could take a while (only when clearing a task group across multiple dag runs, i.e. with future or past activated). Then we can merge this and start collecting feedback on this feature, maybe with some input from Astronomer so I can adjust for the **b)** solution you suggested.
   
   ![image](https://user-images.githubusercontent.com/14861206/201524754-9e993631-8a0c-4141-a70e-74b34b8f66e2.png)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org