You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/02/25 16:59:16 UTC
[airflow] 03/05: make UI and tree work with mapped tasks
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 911aaeb4716a0c8964e63212247a41b945381dea
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500
make UI and tree work with mapped tasks
---
airflow/www/static/js/tree/InstanceTooltip.jsx | 64 ++++++++++++++++++------
airflow/www/static/js/tree/renderTaskRows.jsx | 7 ++-
airflow/www/utils.py | 68 +++++++++++++++++++++++++-
airflow/www/views.py | 28 ++++++-----
4 files changed, 138 insertions(+), 29 deletions(-)
diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index a1ef192..612905b 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -24,30 +24,33 @@ import { Box, Text } from '@chakra-ui/react';
import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
+const STATES = [
+ ['success', 0],
+ ['failed', 0],
+ ['upstream_failed', 0],
+ ['up_for_retry', 0],
+ ['up_for_reschedule', 0],
+ ['running', 0],
+ ['deferred', 0],
+ ['sensing', 0],
+ ['queued', 0],
+ ['scheduled', 0],
+ ['skipped', 0],
+ ['no_status', 0],
+];
+
const InstanceTooltip = ({
group,
instance: {
- duration, operator, startDate, endDate, state, taskId, runId,
+ duration, operator, startDate, endDate, state, taskId, runId, mappedStates,
},
}) => {
const isGroup = !!group.children;
const groupSummary = [];
+ const mapSummary = [];
if (isGroup) {
- const numMap = new Map([
- ['success', 0],
- ['failed', 0],
- ['upstream_failed', 0],
- ['up_for_retry', 0],
- ['up_for_reschedule', 0],
- ['running', 0],
- ['deferred', 0],
- ['sensing', 0],
- ['queued', 0],
- ['scheduled', 0],
- ['skipped', 0],
- ['no_status', 0],
- ]);
+ const numMap = new Map(STATES);
group.children.forEach((child) => {
const taskInstance = child.instances.find((ti) => ti.runId === runId);
if (taskInstance) {
@@ -69,6 +72,26 @@ const InstanceTooltip = ({
});
}
+ if (group.isMapped && mappedStates) {
+ const numMap = new Map(STATES);
+ mappedStates.forEach((s) => {
+ const stateKey = s || 'no_status';
+ if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1);
+ });
+ numMap.forEach((key, val) => {
+ if (key > 0) {
+ mapSummary.push(
+ // eslint-disable-next-line react/no-array-index-key
+ <Text key={val} ml="10px">
+ {val}
+ {': '}
+ {key}
+ </Text>,
+ );
+ }
+ });
+ }
+
const taskIdTitle = isGroup ? 'Task Group Id: ' : 'Task Id: ';
return (
@@ -88,6 +111,17 @@ const InstanceTooltip = ({
{groupSummary}
</>
)}
+ {group.isMapped && (
+ <>
+ <br />
+ <Text as="strong">
+ {mappedStates.length}
+ {' '}
+ Tasks Mapped
+ </Text>
+ {mapSummary}
+ </>
+ )}
<br />
<Text>
{taskIdTitle}
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 8dcd88d..224885b 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -53,7 +53,7 @@ const renderTaskRows = ({
));
const TaskName = ({
- isGroup, onToggle, isOpen, level, taskName,
+ isGroup = false, isMapped = false, onToggle, isOpen, level, taskName,
}) => (
<Box _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s">
<Flex
@@ -74,6 +74,9 @@ const TaskName = ({
isTruncated
>
{taskName}
+ {isMapped && (
+ ' [ ]'
+ )}
</Text>
{isGroup && (
isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" />
@@ -107,6 +110,7 @@ const Row = ({
}) => {
const { data: { dagRuns = [] } } = useTreeData();
const isGroup = !!task.children;
+
const taskName = prevTaskId ? task.id.replace(`${prevTaskId}.`, '') : task.id;
const storageKey = `${dagId}-open-groups`;
@@ -148,6 +152,7 @@ const Row = ({
<TaskName
onToggle={onToggle}
isGroup={isGroup}
+ isMapped={task.isMapped}
taskName={taskName}
isOpen={isOpen}
level={level}
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index d9ec10f..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -36,9 +36,11 @@ from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy.ext.associationproxy import AssociationProxy
+from sqlalchemy.orm import Session
from airflow import models
from airflow.models import errors
+from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
from airflow.utils.json import AirflowJsonEncoder
@@ -53,11 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
return value.isoformat()
-def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str, Any]]:
+def encode_ti(
+ task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+) -> Optional[Dict[str, Any]]:
if not task_instance:
return None
- return {
+ summary = {
'task_id': task_instance.task_id,
'dag_id': task_instance.dag_id,
'run_id': task_instance.run_id,
@@ -70,6 +74,66 @@ def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str
'try_number': task_instance.try_number,
}
+ def get_mapped_summary(task_instances):
+ priority = [
+ 'failed',
+ 'upstream_failed',
+ 'up_for_retry',
+ 'up_for_reschedule',
+ 'queued',
+ 'scheduled',
+ 'deferred',
+ 'sensing',
+ 'running',
+ 'shutdown',
+ 'restarting',
+ 'removed',
+ 'no_status',
+ 'success',
+ 'skipped',
+ ]
+
+ mapped_states = [ti.state for ti in task_instances]
+
+ group_state = None
+ for state in priority:
+ if state in mapped_states:
+ group_state = state
+ break
+
+ group_start_date = datetime_to_string(
+ min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+ )
+ group_end_date = datetime_to_string(
+ max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+ )
+
+ return {
+ 'task_id': task_instance.task_id,
+ 'run_id': task_instance.run_id,
+ 'state': group_state,
+ 'start_date': group_start_date,
+ 'end_date': group_end_date,
+ 'mapped_states': mapped_states,
+ 'operator': task_instance.operator,
+ 'execution_date': datetime_to_string(task_instance.execution_date),
+ 'try_number': task_instance.try_number,
+ }
+
+ if is_mapped:
+ return get_mapped_summary(
+ session.query(TaskInstance)
+ .filter(
+ TaskInstance.dag_id == task_instance.dag_id,
+ TaskInstance.run_id == task_instance.run_id,
+ TaskInstance.task_id == task_instance.task_id,
+ TaskInstance.map_index >= 0,
+ )
+ .all()
+ )
+
+ return summary
+
def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
if not dag_run:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c0f9cb4..6d27f97 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -99,7 +99,7 @@ from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.triggerer_job import TriggererJob
from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun, DagRunType
from airflow.models.serialized_dag import SerializedDagModel
@@ -223,23 +223,30 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
}
-def task_group_to_tree(task_item_or_group, dag, dag_runs, tis):
+def task_group_to_tree(task_item_or_group, dag, dag_runs, tis, session):
"""
Create a nested dict representation of this TaskGroup and its children used to construct
the Graph.
"""
- if isinstance(task_item_or_group, BaseOperator):
+ if isinstance(task_item_or_group, AbstractOperator):
return {
'id': task_item_or_group.task_id,
- 'instances': [wwwutils.encode_ti(ti) for ti in tis if ti.task_id == task_item_or_group.task_id],
+ 'instances': [
+ wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session)
+ for ti in tis
+ if ti.task_id == task_item_or_group.task_id
+ ],
'label': task_item_or_group.label,
- 'extra_links': task_item_or_group.extra_links,
+ 'extra_links': [],
+ 'is_mapped': task_item_or_group.is_mapped,
}
# Task Group
task_group = task_item_or_group
- children = [task_group_to_tree(child, dag, dag_runs, tis) for child in task_group.children.values()]
+ children = [
+ task_group_to_tree(child, dag, dag_runs, tis, session) for child in task_group.children.values()
+ ]
def get_summary(dag_run, children):
priority = [
@@ -307,7 +314,7 @@ def task_group_to_dict(task_item_or_group):
Create a nested dict representation of this TaskGroup and its children used to construct
the Graph.
"""
- if isinstance(task_item_or_group, BaseOperator):
+ if isinstance(task_item_or_group, AbstractOperator):
return {
'id': task_item_or_group.task_id,
'value': {
@@ -423,7 +430,7 @@ def dag_edges(dag):
def collect_edges(task_group):
"""Update edges_to_add and edges_to_skip according to TaskGroups."""
- if isinstance(task_group, BaseOperator):
+ if isinstance(task_group, AbstractOperator):
return
for target_id in task_group.downstream_group_ids:
@@ -2465,7 +2472,7 @@ class Airflow(AirflowBaseView):
tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
data = {
- 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+ 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
'dag_runs': encoded_runs,
}
@@ -3246,7 +3253,6 @@ class Airflow(AirflowBaseView):
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
]
)
- @action_logging
def tree_data(self):
"""Returns tree data"""
dag_id = request.args.get('dag_id')
@@ -3284,7 +3290,7 @@ class Airflow(AirflowBaseView):
min_date = min(dag_run_dates, default=None)
tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
data = {
- 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+ 'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
'dag_runs': encoded_runs,
}