You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2022/04/05 18:11:26 UTC

[airflow] branch main updated: Fix state and try number for failed mapped tasks (#22757)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1fd82f2a5 Fix state and try number for failed mapped tasks (#22757)
a1fd82f2a5 is described below

commit a1fd82f2a5c9edc4817b04b4ccc257d6e394f886
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Tue Apr 5 19:11:17 2022 +0100

    Fix state and try number for failed mapped tasks (#22757)
    
    Without this change the details drawer thinks there are two attempts
    rather than one for failed tasks, and a mapped task in UPSTREAM_FAILED
    has no_status rather than upstream_failed.
---
 airflow/www/utils.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index fc4c391660..40b88bcd4d 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -63,7 +63,6 @@ def get_mapped_instances(task_instance, session):
             TaskInstance.dag_id == task_instance.dag_id,
             TaskInstance.run_id == task_instance.run_id,
             TaskInstance.task_id == task_instance.task_id,
-            TaskInstance.map_index >= 0,
         )
         .all()
     )
@@ -109,6 +108,11 @@ def get_mapped_summary(parent_instance, task_instances):
         max((ti.end_date for ti in task_instances if ti.end_date), default=None)
     )
 
+    try_count = (
+        parent_instance.prev_attempted_tries
+        if parent_instance.prev_attempted_tries != 0
+        else parent_instance.try_number
+    )
     return {
         'dag_id': parent_instance.dag_id,
         'task_id': parent_instance.task_id,
@@ -119,7 +123,7 @@ def get_mapped_summary(parent_instance, task_instances):
         'mapped_states': mapped_states,
         'operator': parent_instance.operator,
         'execution_date': datetime_to_string(parent_instance.execution_date),
-        'try_number': parent_instance.try_number,
+        'try_number': try_count,
     }
 
 
@@ -132,6 +136,11 @@ def encode_ti(
     if is_mapped:
         return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
 
+    try_count = (
+        task_instance.prev_attempted_tries
+        if task_instance.prev_attempted_tries != 0
+        else task_instance.try_number
+    )
     return {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
@@ -143,7 +152,7 @@ def encode_ti(
         'end_date': datetime_to_string(task_instance.end_date),
         'operator': task_instance.operator,
         'execution_date': datetime_to_string(task_instance.execution_date),
-        'try_number': task_instance.try_number,
+        'try_number': try_count,
     }