You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/04/05 18:11:26 UTC
[airflow] branch main updated: Fix state and try number for failed mapped tasks (#22757)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1fd82f2a5 Fix state and try number for failed mapped tasks (#22757)
a1fd82f2a5 is described below
commit a1fd82f2a5c9edc4817b04b4ccc257d6e394f886
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Tue Apr 5 19:11:17 2022 +0100
Fix state and try number for failed mapped tasks (#22757)
Without this change, the details drawer reports two attempts rather than
one for failed tasks, and a mapped task in the UPSTREAM_FAILED state is
shown as no_status rather than upstream_failed.
---
airflow/www/utils.py | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index fc4c391660..40b88bcd4d 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -63,7 +63,6 @@ def get_mapped_instances(task_instance, session):
TaskInstance.dag_id == task_instance.dag_id,
TaskInstance.run_id == task_instance.run_id,
TaskInstance.task_id == task_instance.task_id,
- TaskInstance.map_index >= 0,
)
.all()
)
@@ -109,6 +108,11 @@ def get_mapped_summary(parent_instance, task_instances):
max((ti.end_date for ti in task_instances if ti.end_date), default=None)
)
+ try_count = (
+ parent_instance.prev_attempted_tries
+ if parent_instance.prev_attempted_tries != 0
+ else parent_instance.try_number
+ )
return {
'dag_id': parent_instance.dag_id,
'task_id': parent_instance.task_id,
@@ -119,7 +123,7 @@ def get_mapped_summary(parent_instance, task_instances):
'mapped_states': mapped_states,
'operator': parent_instance.operator,
'execution_date': datetime_to_string(parent_instance.execution_date),
- 'try_number': parent_instance.try_number,
+ 'try_number': try_count,
}
@@ -132,6 +136,11 @@ def encode_ti(
if is_mapped:
return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
+ try_count = (
+ task_instance.prev_attempted_tries
+ if task_instance.prev_attempted_tries != 0
+ else task_instance.try_number
+ )
return {
'task_id': task_instance.task_id,
'dag_id': task_instance.dag_id,
@@ -143,7 +152,7 @@ def encode_ti(
'end_date': datetime_to_string(task_instance.end_date),
'operator': task_instance.operator,
'execution_date': datetime_to_string(task_instance.execution_date),
- 'try_number': task_instance.try_number,
+ 'try_number': try_count,
}