You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/20 14:31:34 UTC

[airflow] 03/09: fixup! Accept multiple map_index param from front end

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6c7f1dfb16a758466c3425a917952dd740b08779
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Apr 20 10:35:00 2022 +0800

    fixup! Accept multiple map_index param from front end
---
 airflow/www/views.py | 44 ++++++++++++++++++++++++--------------------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index aac30f64ff..ae0186e493 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1960,7 +1960,7 @@ class Airflow(AirflowBaseView):
 
     def _clear_dag_tis(
         self,
-        dag,
+        dag: DAG,
         start_date,
         end_date,
         origin,
@@ -1995,24 +1995,19 @@ class Airflow(AirflowBaseView):
         except AirflowException as ex:
             return redirect_or_json(origin, msg=str(ex), status="error")
 
-        if not tis:
-            msg = "No task instances to clear"
-            return redirect_or_json(origin, msg, status="error")
-        elif request.headers.get('Accept') == 'application/json':
-            details = [str(t) for t in tis]
+        assert isinstance(tis, collections.abc.Iterable)
+        details = [str(t) for t in tis]
 
+        if not details:
+            return redirect_or_json(origin, "No task instances to clear", status="error")
+        elif request.headers.get('Accept') == 'application/json':
             return htmlsafe_json_dumps(details, separators=(',', ':'))
-        else:
-            details = "\n".join(str(t) for t in tis)
-
-            response = self.render_template(
-                'airflow/confirm.html',
-                endpoint=None,
-                message="Task instances you are about to clear:",
-                details=details,
-            )
-
-        return response
+        return self.render_template(
+            'airflow/confirm.html',
+            endpoint=None,
+            message="Task instances you are about to clear:",
+            details="\n".join(details),
+        )
 
     @expose('/clear', methods=['POST'])
     @auth.has_access(
@@ -2028,7 +2023,11 @@ class Airflow(AirflowBaseView):
         task_id = request.form.get('task_id')
         origin = get_safe_url(request.form.get('origin'))
         dag = current_app.dag_bag.get_dag(dag_id)
-        map_index = request.form.get('map_index')
+
+        if 'map_index' not in request.form:
+            map_indexes: Optional[List[int]] = None
+        else:
+            map_indexes = request.form.getlist('map_index', type=int)
 
         execution_date = request.form.get('execution_date')
         execution_date = timezone.parse(execution_date)
@@ -2047,7 +2046,12 @@ class Airflow(AirflowBaseView):
         )
         end_date = execution_date if not future else None
         start_date = execution_date if not past else None
-        task_ids = [(task_id, map_index)] if map_index else [task_id]
+
+        if map_indexes is None:
+            task_ids: Union[List[str], List[Tuple[str, int]]] = [task_id]
+        else:
+            task_ids = [(task_id, map_index) for map_index in map_indexes]
+
         return self._clear_dag_tis(
             dag,
             start_date,
@@ -2298,7 +2302,7 @@ class Airflow(AirflowBaseView):
         past: bool,
         state: TaskInstanceState,
     ):
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
 
         if not run_id:
             flash(f"Cannot mark tasks as {state}, seem that DAG {dag_id} has never run", "error")