You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/20 14:31:34 UTC
[airflow] 03/09: fixup! Accept multiple map_index param from front end
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6c7f1dfb16a758466c3425a917952dd740b08779
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Apr 20 10:35:00 2022 +0800
fixup! Accept multiple map_index param from front end
---
airflow/www/views.py | 44 ++++++++++++++++++++++++--------------------
1 file changed, 24 insertions(+), 20 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index aac30f64ff..ae0186e493 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1960,7 +1960,7 @@ class Airflow(AirflowBaseView):
def _clear_dag_tis(
self,
- dag,
+ dag: DAG,
start_date,
end_date,
origin,
@@ -1995,24 +1995,19 @@ class Airflow(AirflowBaseView):
except AirflowException as ex:
return redirect_or_json(origin, msg=str(ex), status="error")
- if not tis:
- msg = "No task instances to clear"
- return redirect_or_json(origin, msg, status="error")
- elif request.headers.get('Accept') == 'application/json':
- details = [str(t) for t in tis]
+ assert isinstance(tis, collections.abc.Iterable)
+ details = [str(t) for t in tis]
+ if not details:
+ return redirect_or_json(origin, "No task instances to clear", status="error")
+ elif request.headers.get('Accept') == 'application/json':
return htmlsafe_json_dumps(details, separators=(',', ':'))
- else:
- details = "\n".join(str(t) for t in tis)
-
- response = self.render_template(
- 'airflow/confirm.html',
- endpoint=None,
- message="Task instances you are about to clear:",
- details=details,
- )
-
- return response
+ return self.render_template(
+ 'airflow/confirm.html',
+ endpoint=None,
+ message="Task instances you are about to clear:",
+ details="\n".join(details),
+ )
@expose('/clear', methods=['POST'])
@auth.has_access(
@@ -2028,7 +2023,11 @@ class Airflow(AirflowBaseView):
task_id = request.form.get('task_id')
origin = get_safe_url(request.form.get('origin'))
dag = current_app.dag_bag.get_dag(dag_id)
- map_index = request.form.get('map_index')
+
+ if 'map_index' not in request.form:
+ map_indexes: Optional[List[int]] = None
+ else:
+ map_indexes = request.form.getlist('map_index', type=int)
execution_date = request.form.get('execution_date')
execution_date = timezone.parse(execution_date)
@@ -2047,7 +2046,12 @@ class Airflow(AirflowBaseView):
)
end_date = execution_date if not future else None
start_date = execution_date if not past else None
- task_ids = [(task_id, map_index)] if map_index else [task_id]
+
+ if map_indexes is None:
+ task_ids: Union[List[str], List[Tuple[str, int]]] = [task_id]
+ else:
+ task_ids = [(task_id, map_index) for map_index in map_indexes]
+
return self._clear_dag_tis(
dag,
start_date,
@@ -2298,7 +2302,7 @@ class Airflow(AirflowBaseView):
past: bool,
state: TaskInstanceState,
):
- dag = current_app.dag_bag.get_dag(dag_id)
+ dag: DAG = current_app.dag_bag.get_dag(dag_id)
if not run_id:
flash(f"Cannot mark tasks as {state}, seem that DAG {dag_id} has never run", "error")