You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/02/25 16:59:16 UTC

[airflow] 03/05: make UI and tree work with mapped tasks

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 911aaeb4716a0c8964e63212247a41b945381dea
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500

    make UI and tree work with mapped tasks
---
 airflow/www/static/js/tree/InstanceTooltip.jsx | 64 ++++++++++++++++++------
 airflow/www/static/js/tree/renderTaskRows.jsx  |  7 ++-
 airflow/www/utils.py                           | 68 +++++++++++++++++++++++++-
 airflow/www/views.py                           | 28 ++++++-----
 4 files changed, 138 insertions(+), 29 deletions(-)

diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index a1ef192..612905b 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -24,30 +24,33 @@ import { Box, Text } from '@chakra-ui/react';
 
 import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
 
+const STATES = [
+  ['success', 0],
+  ['failed', 0],
+  ['upstream_failed', 0],
+  ['up_for_retry', 0],
+  ['up_for_reschedule', 0],
+  ['running', 0],
+  ['deferred', 0],
+  ['sensing', 0],
+  ['queued', 0],
+  ['scheduled', 0],
+  ['skipped', 0],
+  ['no_status', 0],
+];
+
 const InstanceTooltip = ({
   group,
   instance: {
-    duration, operator, startDate, endDate, state, taskId, runId,
+    duration, operator, startDate, endDate, state, taskId, runId, mappedStates,
   },
 }) => {
   const isGroup = !!group.children;
   const groupSummary = [];
+  const mapSummary = [];
 
   if (isGroup) {
-    const numMap = new Map([
-      ['success', 0],
-      ['failed', 0],
-      ['upstream_failed', 0],
-      ['up_for_retry', 0],
-      ['up_for_reschedule', 0],
-      ['running', 0],
-      ['deferred', 0],
-      ['sensing', 0],
-      ['queued', 0],
-      ['scheduled', 0],
-      ['skipped', 0],
-      ['no_status', 0],
-    ]);
+    const numMap = new Map(STATES);
     group.children.forEach((child) => {
       const taskInstance = child.instances.find((ti) => ti.runId === runId);
       if (taskInstance) {
@@ -69,6 +72,26 @@ const InstanceTooltip = ({
     });
   }
 
+  if (group.isMapped && mappedStates) {
+    const numMap = new Map(STATES);
+    mappedStates.forEach((s) => {
+      const stateKey = s || 'no_status';
+      if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1);
+    });
+    numMap.forEach((count, stateName) => { // Map.forEach passes (value, key)
+      if (count > 0) {
+        mapSummary.push(
+          // eslint-disable-next-line react/no-array-index-key
+          <Text key={stateName} ml="10px">
+            {stateName}
+            {': '}
+            {count}
+          </Text>,
+        );
+      }
+    });
+  }
+
   const taskIdTitle = isGroup ? 'Task Group Id: ' : 'Task Id: ';
 
   return (
@@ -88,6 +111,17 @@ const InstanceTooltip = ({
           {groupSummary}
         </>
       )}
+      {group.isMapped && mappedStates && ( // mappedStates can be absent; .length would throw
+        <>
+          <br />
+          <Text as="strong">
+            {mappedStates.length}
+            {' '}
+            Tasks Mapped
+          </Text>
+          {mapSummary}
+        </>
+      )}
       <br />
       <Text>
         {taskIdTitle}
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 8dcd88d..224885b 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -53,7 +53,7 @@ const renderTaskRows = ({
 ));
 
 const TaskName = ({
-  isGroup, onToggle, isOpen, level, taskName,
+  isGroup = false, isMapped = false, onToggle, isOpen, level, taskName,
 }) => (
   <Box _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s">
     <Flex
@@ -74,6 +74,9 @@ const TaskName = ({
         isTruncated
       >
         {taskName}
+        {isMapped && (
+          ' [ ]'
+        )}
       </Text>
       {isGroup && (
         isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" />
@@ -107,6 +110,7 @@ const Row = ({
 }) => {
   const { data: { dagRuns = [] } } = useTreeData();
   const isGroup = !!task.children;
+
   const taskName = prevTaskId ? task.id.replace(`${prevTaskId}.`, '') : task.id;
 
   const storageKey = `${dagId}-open-groups`;
@@ -148,6 +152,7 @@ const Row = ({
             <TaskName
               onToggle={onToggle}
               isGroup={isGroup}
+              isMapped={task.isMapped}
               taskName={taskName}
               isOpen={isOpen}
               level={level}
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index d9ec10f..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -36,9 +36,11 @@ from pendulum.datetime import DateTime
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
 from sqlalchemy.ext.associationproxy import AssociationProxy
+from sqlalchemy.orm import Session
 
 from airflow import models
 from airflow.models import errors
+from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.json import AirflowJsonEncoder
@@ -53,11 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
-def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str, Any]]:
+def encode_ti(
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    return {
+    summary = {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -70,6 +74,66 @@ def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str
         'try_number': task_instance.try_number,
     }
 
+    def get_mapped_summary(task_instances):
+        """Summarise all mapped instances of one task in one run as a single entry."""
+        priority = [
+            'failed',
+            'upstream_failed',
+            'up_for_retry',
+            'up_for_reschedule',
+            'queued',
+            'scheduled',
+            'deferred',
+            'sensing',
+            'running',
+            'shutdown',
+            'restarting',
+            'removed',
+            'no_status',
+            'success',
+            'skipped',
+        ]
+
+        mapped_states = [ti.state for ti in task_instances]
+        # None never equals 'no_status'; normalise so unset instances are ranked too.
+        seen = {state or 'no_status' for state in mapped_states}
+        group_state = next((state for state in priority if state in seen), None)
+        if group_state == 'no_status':
+            group_state = None  # keep reporting "no status" as null, as before
+
+        group_start_date = datetime_to_string(
+            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+        )
+        group_end_date = datetime_to_string(
+            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+        )
+
+        return {
+            'task_id': task_instance.task_id,
+            'run_id': task_instance.run_id,
+            'state': group_state,
+            'start_date': group_start_date,
+            'end_date': group_end_date,
+            'mapped_states': mapped_states,
+            'operator': task_instance.operator,
+            'execution_date': datetime_to_string(task_instance.execution_date),
+            'try_number': task_instance.try_number,
+        }
+
+    if is_mapped:
+        return get_mapped_summary(
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == task_instance.dag_id,
+                TaskInstance.run_id == task_instance.run_id,
+                TaskInstance.task_id == task_instance.task_id,
+                TaskInstance.map_index >= 0,
+            )
+            .all()
+        )
+
+    return summary
+
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c0f9cb4..6d27f97 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -99,7 +99,7 @@ from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.jobs.triggerer_job import TriggererJob
 from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
 from airflow.models.serialized_dag import SerializedDagModel
@@ -223,23 +223,30 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     }
 
 
-def task_group_to_tree(task_item_or_group, dag, dag_runs, tis):
+def task_group_to_tree(task_item_or_group, dag, dag_runs, tis, session):
     """
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
     """
-    if isinstance(task_item_or_group, BaseOperator):
+    if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
-            'instances': [wwwutils.encode_ti(ti) for ti in tis if ti.task_id == task_item_or_group.task_id],
+            'instances': [
+                wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session)
+                for ti in tis
+                if ti.task_id == task_item_or_group.task_id
+            ],
             'label': task_item_or_group.label,
-            'extra_links': task_item_or_group.extra_links,
+            'extra_links': [],
+            'is_mapped': task_item_or_group.is_mapped,
         }
 
     # Task Group
     task_group = task_item_or_group
 
-    children = [task_group_to_tree(child, dag, dag_runs, tis) for child in task_group.children.values()]
+    children = [
+        task_group_to_tree(child, dag, dag_runs, tis, session) for child in task_group.children.values()
+    ]
 
     def get_summary(dag_run, children):
         priority = [
@@ -307,7 +314,7 @@ def task_group_to_dict(task_item_or_group):
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
     """
-    if isinstance(task_item_or_group, BaseOperator):
+    if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
             'value': {
@@ -423,7 +430,7 @@ def dag_edges(dag):
 
     def collect_edges(task_group):
         """Update edges_to_add and edges_to_skip according to TaskGroups."""
-        if isinstance(task_group, BaseOperator):
+        if isinstance(task_group, AbstractOperator):
             return
 
         for target_id in task_group.downstream_group_ids:
@@ -2465,7 +2472,7 @@ class Airflow(AirflowBaseView):
         tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
 
         data = {
-            'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+            'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
             'dag_runs': encoded_runs,
         }
 
@@ -3246,7 +3253,6 @@ class Airflow(AirflowBaseView):
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
-    @action_logging
     def tree_data(self):
         """Returns tree data"""
         dag_id = request.args.get('dag_id')
@@ -3284,7 +3290,7 @@ class Airflow(AirflowBaseView):
             min_date = min(dag_run_dates, default=None)
             tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
             data = {
-                'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+                'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
                 'dag_runs': encoded_runs,
             }