You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/03/02 00:47:02 UTC
[airflow] 01/08: make UI and tree work with mapped tasks
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit adfc609476992318a2068dd20ab62f1a0591b806
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500
make UI and tree work with mapped tasks
---
airflow/www/static/js/tree/InstanceTooltip.jsx | 15 +++
airflow/www/utils.py | 136 ++++++++++++-------------
2 files changed, 78 insertions(+), 73 deletions(-)
diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index e22189d..bc0d58c 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -25,6 +25,21 @@ import { Box, Text } from '@chakra-ui/react';
import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
import { finalStatesMap } from '../utils';
+const STATES = [
+ ['success', 0],
+ ['failed', 0],
+ ['upstream_failed', 0],
+ ['up_for_retry', 0],
+ ['up_for_reschedule', 0],
+ ['running', 0],
+ ['deferred', 0],
+ ['sensing', 0],
+ ['queued', 0],
+ ['scheduled', 0],
+ ['skipped', 0],
+ ['no_status', 0],
+];
+
const InstanceTooltip = ({
group,
instance: {
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b1f5e0d..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,9 +43,8 @@ from airflow.models import errors
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
-from airflow.utils.helpers import alchemy_to_dict
from airflow.utils.json import AirflowJsonEncoder
-from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.state import State
from airflow.www.forms import DateTimeWithTimezoneField
from airflow.www.widgets import AirflowDateTimePickerWidget
@@ -56,82 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
return value.isoformat()
-def get_mapped_instances(task_instance, session):
- return (
- session.query(TaskInstance)
- .filter(
- TaskInstance.dag_id == task_instance.dag_id,
- TaskInstance.run_id == task_instance.run_id,
- TaskInstance.task_id == task_instance.task_id,
- TaskInstance.map_index >= 0,
- )
- .all()
- )
-
-
-def get_instance_with_map(task_instance, session):
- if task_instance.map_index == -1:
- return alchemy_to_dict(task_instance)
- mapped_instances = get_mapped_instances(task_instance, session)
- return get_mapped_summary(task_instance, mapped_instances)
-
-
-def get_mapped_summary(parent_instance, task_instances):
- priority = [
- TaskInstanceState.FAILED,
- TaskInstanceState.UPSTREAM_FAILED,
- TaskInstanceState.UP_FOR_RETRY,
- TaskInstanceState.UP_FOR_RESCHEDULE,
- TaskInstanceState.QUEUED,
- TaskInstanceState.SCHEDULED,
- TaskInstanceState.DEFERRED,
- TaskInstanceState.SENSING,
- TaskInstanceState.RUNNING,
- TaskInstanceState.SHUTDOWN,
- TaskInstanceState.RESTARTING,
- TaskInstanceState.REMOVED,
- TaskInstanceState.SUCCESS,
- TaskInstanceState.SKIPPED,
- ]
-
- mapped_states = [ti.state for ti in task_instances]
-
- group_state = None
- for state in priority:
- if state in mapped_states:
- group_state = state
- break
-
- group_start_date = datetime_to_string(
- min((ti.start_date for ti in task_instances if ti.start_date), default=None)
- )
- group_end_date = datetime_to_string(
- max((ti.end_date for ti in task_instances if ti.end_date), default=None)
- )
-
- return {
- 'task_id': parent_instance.task_id,
- 'run_id': parent_instance.run_id,
- 'state': group_state,
- 'start_date': group_start_date,
- 'end_date': group_end_date,
- 'mapped_states': mapped_states,
- 'operator': parent_instance.operator,
- 'execution_date': datetime_to_string(parent_instance.execution_date),
- 'try_number': parent_instance.try_number,
- }
-
-
def encode_ti(
- task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
+ task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
) -> Optional[Dict[str, Any]]:
if not task_instance:
return None
- if is_mapped:
- return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
-
- return {
+ summary = {
'task_id': task_instance.task_id,
'dag_id': task_instance.dag_id,
'run_id': task_instance.run_id,
@@ -144,6 +74,66 @@ def encode_ti(
'try_number': task_instance.try_number,
}
+ def get_mapped_summary(task_instances):
+ priority = [
+ 'failed',
+ 'upstream_failed',
+ 'up_for_retry',
+ 'up_for_reschedule',
+ 'queued',
+ 'scheduled',
+ 'deferred',
+ 'sensing',
+ 'running',
+ 'shutdown',
+ 'restarting',
+ 'removed',
+ 'no_status',
+ 'success',
+ 'skipped',
+ ]
+
+ mapped_states = [ti.state for ti in task_instances]
+
+ group_state = None
+ for state in priority:
+ if state in mapped_states:
+ group_state = state
+ break
+
+ group_start_date = datetime_to_string(
+ min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+ )
+ group_end_date = datetime_to_string(
+ max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+ )
+
+ return {
+ 'task_id': task_instance.task_id,
+ 'run_id': task_instance.run_id,
+ 'state': group_state,
+ 'start_date': group_start_date,
+ 'end_date': group_end_date,
+ 'mapped_states': mapped_states,
+ 'operator': task_instance.operator,
+ 'execution_date': datetime_to_string(task_instance.execution_date),
+ 'try_number': task_instance.try_number,
+ }
+
+ if is_mapped:
+ return get_mapped_summary(
+ session.query(TaskInstance)
+ .filter(
+ TaskInstance.dag_id == task_instance.dag_id,
+ TaskInstance.run_id == task_instance.run_id,
+ TaskInstance.task_id == task_instance.task_id,
+ TaskInstance.map_index >= 0,
+ )
+ .all()
+ )
+
+ return summary
+
def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
if not dag_run: