You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/03/08 16:27:23 UTC

[airflow] 01/09: make UI and tree work with mapped tasks

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit db7a659fbec0c74e5f5ea398d59e58f542778452
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500

    make UI and tree work with mapped tasks
---
 airflow/www/static/js/tree/InstanceTooltip.jsx |  15 +++
 airflow/www/utils.py                           | 136 ++++++++++++-------------
 2 files changed, 78 insertions(+), 73 deletions(-)

diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index e22189d..bc0d58c 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -25,6 +25,21 @@ import { Box, Text } from '@chakra-ui/react';
 import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
 import { finalStatesMap } from '../utils';
 
+const STATES = [
+  ['success', 0],
+  ['failed', 0],
+  ['upstream_failed', 0],
+  ['up_for_retry', 0],
+  ['up_for_reschedule', 0],
+  ['running', 0],
+  ['deferred', 0],
+  ['sensing', 0],
+  ['queued', 0],
+  ['scheduled', 0],
+  ['skipped', 0],
+  ['no_status', 0],
+];
+
 const InstanceTooltip = ({
   group,
   instance: {
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b1f5e0d..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,9 +43,8 @@ from airflow.models import errors
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
-from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.json import AirflowJsonEncoder
-from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.state import State
 from airflow.www.forms import DateTimeWithTimezoneField
 from airflow.www.widgets import AirflowDateTimePickerWidget
 
@@ -56,82 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
-def get_mapped_instances(task_instance, session):
-    return (
-        session.query(TaskInstance)
-        .filter(
-            TaskInstance.dag_id == task_instance.dag_id,
-            TaskInstance.run_id == task_instance.run_id,
-            TaskInstance.task_id == task_instance.task_id,
-            TaskInstance.map_index >= 0,
-        )
-        .all()
-    )
-
-
-def get_instance_with_map(task_instance, session):
-    if task_instance.map_index == -1:
-        return alchemy_to_dict(task_instance)
-    mapped_instances = get_mapped_instances(task_instance, session)
-    return get_mapped_summary(task_instance, mapped_instances)
-
-
-def get_mapped_summary(parent_instance, task_instances):
-    priority = [
-        TaskInstanceState.FAILED,
-        TaskInstanceState.UPSTREAM_FAILED,
-        TaskInstanceState.UP_FOR_RETRY,
-        TaskInstanceState.UP_FOR_RESCHEDULE,
-        TaskInstanceState.QUEUED,
-        TaskInstanceState.SCHEDULED,
-        TaskInstanceState.DEFERRED,
-        TaskInstanceState.SENSING,
-        TaskInstanceState.RUNNING,
-        TaskInstanceState.SHUTDOWN,
-        TaskInstanceState.RESTARTING,
-        TaskInstanceState.REMOVED,
-        TaskInstanceState.SUCCESS,
-        TaskInstanceState.SKIPPED,
-    ]
-
-    mapped_states = [ti.state for ti in task_instances]
-
-    group_state = None
-    for state in priority:
-        if state in mapped_states:
-            group_state = state
-            break
-
-    group_start_date = datetime_to_string(
-        min((ti.start_date for ti in task_instances if ti.start_date), default=None)
-    )
-    group_end_date = datetime_to_string(
-        max((ti.end_date for ti in task_instances if ti.end_date), default=None)
-    )
-
-    return {
-        'task_id': parent_instance.task_id,
-        'run_id': parent_instance.run_id,
-        'state': group_state,
-        'start_date': group_start_date,
-        'end_date': group_end_date,
-        'mapped_states': mapped_states,
-        'operator': parent_instance.operator,
-        'execution_date': datetime_to_string(parent_instance.execution_date),
-        'try_number': parent_instance.try_number,
-    }
-
-
 def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
 ) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    if is_mapped:
-        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
-
-    return {
+    summary = {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -144,6 +74,66 @@ def encode_ti(
         'try_number': task_instance.try_number,
     }
 
+    def get_mapped_summary(task_instances):
+        priority = [
+            'failed',
+            'upstream_failed',
+            'up_for_retry',
+            'up_for_reschedule',
+            'queued',
+            'scheduled',
+            'deferred',
+            'sensing',
+            'running',
+            'shutdown',
+            'restarting',
+            'removed',
+            'no_status',
+            'success',
+            'skipped',
+        ]
+
+        mapped_states = [ti.state for ti in task_instances]
+
+        group_state = None
+        for state in priority:
+            if state in mapped_states:
+                group_state = state
+                break
+
+        group_start_date = datetime_to_string(
+            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+        )
+        group_end_date = datetime_to_string(
+            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+        )
+
+        return {
+            'task_id': task_instance.task_id,
+            'run_id': task_instance.run_id,
+            'state': group_state,
+            'start_date': group_start_date,
+            'end_date': group_end_date,
+            'mapped_states': mapped_states,
+            'operator': task_instance.operator,
+            'execution_date': datetime_to_string(task_instance.execution_date),
+            'try_number': task_instance.try_number,
+        }
+
+    if is_mapped:
+        return get_mapped_summary(
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == task_instance.dag_id,
+                TaskInstance.run_id == task_instance.run_id,
+                TaskInstance.task_id == task_instance.task_id,
+                TaskInstance.map_index >= 0,
+            )
+            .all()
+        )
+
+    return summary
+
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run: