You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/03/08 20:57:18 UTC
[airflow] 09/11: make side panel collapsible, useTasks,
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e106d1721b9340ff031a4793d3b485265d33edba
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Mar 2 14:23:10 2022 -0500
make side panel collapsible, useTasks,
---
airflow/www/static/js/tree/StatusBox.jsx | 3 +-
airflow/www/static/js/tree/Tree.jsx | 126 +++++++++++++------
airflow/www/static/js/tree/api/useDag.js | 2 +-
airflow/www/static/js/tree/dagRuns/Bar.jsx | 5 +-
airflow/www/static/js/tree/details/Header.jsx | 3 +-
airflow/www/static/js/tree/details/content/Dag.jsx | 27 +++-
airflow/www/static/js/tree/details/index.jsx | 1 -
airflow/www/static/js/tree/renderTaskRows.jsx | 8 +-
airflow/www/utils.py | 136 +++++++++++----------
9 files changed, 196 insertions(+), 115 deletions(-)
diff --git a/airflow/www/static/js/tree/StatusBox.jsx b/airflow/www/static/js/tree/StatusBox.jsx
index 62762c5..0f5294f 100644
--- a/airflow/www/static/js/tree/StatusBox.jsx
+++ b/airflow/www/static/js/tree/StatusBox.jsx
@@ -48,8 +48,7 @@ const StatusBox = ({
.forEach((e) => { e.style.backgroundColor = null; });
};
- const onClick = (e) => {
- e.stopPropagation();
+ const onClick = () => {
onMouseLeave();
onSelect({
taskId, runId, instance, task: group,
diff --git a/airflow/www/static/js/tree/Tree.jsx b/airflow/www/static/js/tree/Tree.jsx
index aeeaeb5..0d84639 100644
--- a/airflow/www/static/js/tree/Tree.jsx
+++ b/airflow/www/static/js/tree/Tree.jsx
@@ -17,6 +17,8 @@
* under the License.
*/
+/* global localStorage */
+
import React, { useRef, useEffect, useState } from 'react';
import {
Table,
@@ -29,18 +31,32 @@ import {
Text,
Thead,
Flex,
+ useDisclosure,
+ IconButton,
} from '@chakra-ui/react';
+import { MdArrowForward, MdArrowBack } from 'react-icons/md';
import useTreeData from './useTreeData';
import renderTaskRows from './renderTaskRows';
import DagRuns from './dagRuns';
import Details from './details';
+import { callModal, callModalDag } from '../dag';
+
+const sidePanelKey = 'showSidePanel';
const Tree = () => {
const containerRef = useRef();
const scrollRef = useRef();
const { data: { groups = {}, dagRuns = [] }, isRefreshOn, onToggleRefresh } = useTreeData();
- const [selected, setSelected] = useState({});
+ const [selected, setSelected] = useState({}); // selected task instance or dag run
+ const isPanelOpen = JSON.parse(localStorage.getItem(sidePanelKey));
+ const { isOpen, onToggle } = useDisclosure({ defaultIsOpen: isPanelOpen });
+
+ const toggleSidePanel = () => {
+ if (!isOpen) localStorage.setItem(sidePanelKey, true);
+ else localStorage.setItem(sidePanelKey, false);
+ onToggle();
+ };
const dagRunIds = dagRuns.map((dr) => dr.runId);
@@ -48,46 +64,88 @@ const Tree = () => {
// Set initial scroll to far right if it is scrollable
const runsContainer = scrollRef.current;
if (runsContainer && runsContainer.scrollWidth > runsContainer.clientWidth) {
- runsContainer.scrollBy(runsContainer.clientWidth, 0);
+ runsContainer.scrollBy(runsContainer.clientWidth, 250);
}
- }, []);
+ }, [isOpen]); // isOpen is to redo the scroll when the side panel opens/closes
const { runId, taskId } = selected;
- const onSelect = (newInstance) => (
- (newInstance.runId === runId && newInstance.taskId === taskId)
- ? setSelected({})
- : setSelected(newInstance)
- );
+
+ // show task/run info in the side panel, or just call the regular action modal
+ const onSelect = (newSelected) => {
+ if (isOpen) {
+ const isSame = newSelected.runId === runId && newSelected.taskId === taskId;
+ setSelected(isSame ? {} : newSelected);
+ } else if (!isOpen) {
+ if (newSelected.dagRun) {
+ const { dagRun } = newSelected;
+ callModalDag({
+ execution_date: dagRun.executionDate,
+ dag_id: dagRun.dagId,
+ run_id: dagRun.runId,
+ });
+ } else if (newSelected.instance) {
+ const extraLinks = newSelected.task.extraLinks || [];
+ const { instance } = newSelected;
+ callModal(
+ taskId,
+ instance.executionDate,
+ extraLinks,
+ instance.tryNumber,
+ instance.operator === 'SubDagOperator' || undefined,
+ instance.runId,
+ );
+ }
+ }
+ };
return (
- <Flex pl="24px" position="relative" flexDirection="row" justifyContent="space-between" ref={containerRef}>
+ <Box pl="24px" position="relative" ref={containerRef}>
<Text transform="rotate(-90deg)" position="absolute" left="-6px" top="130px">Runs</Text>
<Text transform="rotate(-90deg)" position="absolute" left="-6px" top="190px">Tasks</Text>
- <FormControl display="flex" position="absolute" left="220px">
- {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />}
- <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal">
- Auto-refresh
- </FormLabel>
- <Switch id="auto-refresh" onChange={onToggleRefresh} isChecked={isRefreshOn} size="lg" />
- </FormControl>
- <Box mr="12px" pb="12px" overflowX="auto" ref={scrollRef} maxWidth="300px" minWidth="300px" position="relative" mt="24px">
- <Table height={0}>
- <Thead>
- <DagRuns
- containerRef={containerRef}
- selected={selected}
- onSelect={onSelect}
- />
- </Thead>
- <Tbody>
- {renderTaskRows({
- task: groups, containerRef, onSelect, selected, dagRunIds,
- })}
- </Tbody>
- </Table>
- </Box>
- <Details selected={selected} onSelect={onSelect} />
- </Flex>
+ <Flex flexGrow={1} justifyContent="flex-end" alignItems="center">
+ <FormControl display="flex" width="auto" mr={2}>
+ {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />}
+ <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal">
+ Auto-refresh
+ </FormLabel>
+ <Switch id="auto-refresh" onChange={onToggleRefresh} isChecked={isRefreshOn} size="lg" />
+ </FormControl>
+ <IconButton onClick={toggleSidePanel}>
+ {isOpen
+ ? <MdArrowForward size="18px" aria-label="Collapse Details" title="Collapse Details" />
+ : <MdArrowBack size="18px" title="Expand Details" aria-label="Expand Details" />}
+ </IconButton>
+ </Flex>
+ <Flex flexDirection="row" justifyContent="space-between">
+ <Box
+ mr="12px"
+ mt="24px"
+ pb="12px"
+ overflowX="auto"
+ ref={scrollRef}
+ maxWidth={isOpen && '300px'}
+ minWidth={isOpen && '300px'}
+ >
+ <Table height={0}>
+ <Thead>
+ <DagRuns
+ containerRef={containerRef}
+ selected={selected}
+ onSelect={onSelect}
+ />
+ </Thead>
+ <Tbody>
+ {renderTaskRows({
+ task: groups, containerRef, onSelect, selected, dagRunIds,
+ })}
+ </Tbody>
+ </Table>
+ </Box>
+ {isOpen && (
+ <Details selected={selected} onSelect={onSelect} />
+ )}
+ </Flex>
+ </Box>
);
};
diff --git a/airflow/www/static/js/tree/api/useDag.js b/airflow/www/static/js/tree/api/useDag.js
index 6c19ee4..1343302 100644
--- a/airflow/www/static/js/tree/api/useDag.js
+++ b/airflow/www/static/js/tree/api/useDag.js
@@ -23,6 +23,6 @@ import { useQuery } from 'react-query';
export default function useDag(dagId) {
return useQuery(
['dag', dagId],
- () => axios.get(`/dags/${dagId}`),
+ () => axios.get(`/dags/${dagId}/details`),
);
}
diff --git a/airflow/www/static/js/tree/dagRuns/Bar.jsx b/airflow/www/static/js/tree/dagRuns/Bar.jsx
index 94c84e7..14df756 100644
--- a/airflow/www/static/js/tree/dagRuns/Bar.jsx
+++ b/airflow/www/static/js/tree/dagRuns/Bar.jsx
@@ -57,10 +57,7 @@ const DagRunBar = ({
cursor="pointer"
width="14px"
zIndex={1}
- onClick={(e) => {
- e.stopPropagation();
- onSelect({ runId: run.runId, dagRun: run });
- }}
+ onClick={() => onSelect({ runId: run.runId, dagRun: run })}
position="relative"
data-peer
>
diff --git a/airflow/www/static/js/tree/details/Header.jsx b/airflow/www/static/js/tree/details/Header.jsx
index c362606..2982a4e 100644
--- a/airflow/www/static/js/tree/details/Header.jsx
+++ b/airflow/www/static/js/tree/details/Header.jsx
@@ -45,7 +45,6 @@ const Header = ({
dagRuns,
}) => {
const dagRun = dagRuns.find((r) => r.runId === runId);
- // console.log(dagRun);
let runLabel = dagRun ? formatDateTime(dagRun.dataIntervalEnd) : '';
if (dagRun && dagRun.runType === 'manual') {
runLabel = (
@@ -65,7 +64,7 @@ const Header = ({
</BreadcrumbItem>
{runId && (
<BreadcrumbItem isCurrentPage={runId && !taskId}>
- <BreadcrumbLink onClick={() => onSelect({ runId })}>
+ <BreadcrumbLink onClick={() => onSelect({ runId, dagRun })}>
<LabelValue label="Run" value={runLabel} />
</BreadcrumbLink>
</BreadcrumbItem>
diff --git a/airflow/www/static/js/tree/details/content/Dag.jsx b/airflow/www/static/js/tree/details/content/Dag.jsx
index e71d9c2..7ee19ec 100644
--- a/airflow/www/static/js/tree/details/content/Dag.jsx
+++ b/airflow/www/static/js/tree/details/content/Dag.jsx
@@ -27,13 +27,23 @@ import {
} from '@chakra-ui/react';
import { getMetaValue } from '../../../utils';
-import { useDag } from '../../api';
+import { useDag, useTasks } from '../../api';
const dagId = getMetaValue('dag_id');
const Dag = () => {
const { data: dag } = useDag(dagId);
- if (!dag) return null;
+ const { data: taskData } = useTasks(dagId);
+ if (!dag || !taskData) return null;
+ const { tasks = [], totalEntries = '' } = taskData;
+ const operators = {};
+ tasks.forEach((t) => {
+ if (!operators[t.classRef.className]) {
+ operators[t.classRef.className] = 1;
+ } else {
+ operators[t.classRef.className] += 1;
+ }
+ });
const {
description, tags, fileloc, owners,
} = dag;
@@ -50,6 +60,19 @@ const Dag = () => {
<Text mr={2}>Owner:</Text>
{owners.map((o) => <Text key={o}>{o}</Text>)}
</Flex>
+ <Text>
+ {totalEntries}
+ {' '}
+ Tasks
+ </Text>
+ {Object.entries(operators).map(([key, value]) => (
+ <Text key={key}>
+ {value}
+ {' '}
+ {key}
+ {value > 1 && 's'}
+ </Text>
+ ))}
</>
);
};
diff --git a/airflow/www/static/js/tree/details/index.jsx b/airflow/www/static/js/tree/details/index.jsx
index ee3b044..c932b05 100644
--- a/airflow/www/static/js/tree/details/index.jsx
+++ b/airflow/www/static/js/tree/details/index.jsx
@@ -35,7 +35,6 @@ const Details = ({
onSelect,
}) => {
const { data: { dagRuns = [] } } = useTreeData();
- console.log(selected);
return (
<Flex borderLeftWidth="1px" flexDirection="column" p={3} flexGrow={1}>
<Header selected={selected} onSelect={onSelect} dagRuns={dagRuns} />
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 36e0b94..f5a2345 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -40,18 +40,14 @@ import { getMetaValue } from '../utils';
const dagId = getMetaValue('dag_id');
const renderTaskRows = ({
- task, containerRef, level = 0, isParentOpen, onSelect, selected, dagRunIds,
+ task, level = 0, ...rest
}) => task.children.map((t) => (
<Row
+ {...rest}
key={t.id}
task={t}
level={level}
- containerRef={containerRef}
prevTaskId={task.id}
- isParentOpen={isParentOpen}
- onSelect={onSelect}
- selected={selected}
- dagRunIds={dagRunIds}
/>
));
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ed765d4..b1f5e0d 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,8 +43,9 @@ from airflow.models import errors
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.code_utils import get_python_source
+from airflow.utils.helpers import alchemy_to_dict
from airflow.utils.json import AirflowJsonEncoder
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
from airflow.www.forms import DateTimeWithTimezoneField
from airflow.www.widgets import AirflowDateTimePickerWidget
@@ -55,13 +56,82 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
return value.isoformat()
+def get_mapped_instances(task_instance, session):
+ return (
+ session.query(TaskInstance)
+ .filter(
+ TaskInstance.dag_id == task_instance.dag_id,
+ TaskInstance.run_id == task_instance.run_id,
+ TaskInstance.task_id == task_instance.task_id,
+ TaskInstance.map_index >= 0,
+ )
+ .all()
+ )
+
+
+def get_instance_with_map(task_instance, session):
+ if task_instance.map_index == -1:
+ return alchemy_to_dict(task_instance)
+ mapped_instances = get_mapped_instances(task_instance, session)
+ return get_mapped_summary(task_instance, mapped_instances)
+
+
+def get_mapped_summary(parent_instance, task_instances):
+ priority = [
+ TaskInstanceState.FAILED,
+ TaskInstanceState.UPSTREAM_FAILED,
+ TaskInstanceState.UP_FOR_RETRY,
+ TaskInstanceState.UP_FOR_RESCHEDULE,
+ TaskInstanceState.QUEUED,
+ TaskInstanceState.SCHEDULED,
+ TaskInstanceState.DEFERRED,
+ TaskInstanceState.SENSING,
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.SHUTDOWN,
+ TaskInstanceState.RESTARTING,
+ TaskInstanceState.REMOVED,
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.SKIPPED,
+ ]
+
+ mapped_states = [ti.state for ti in task_instances]
+
+ group_state = None
+ for state in priority:
+ if state in mapped_states:
+ group_state = state
+ break
+
+ group_start_date = datetime_to_string(
+ min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+ )
+ group_end_date = datetime_to_string(
+ max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+ )
+
+ return {
+ 'task_id': parent_instance.task_id,
+ 'run_id': parent_instance.run_id,
+ 'state': group_state,
+ 'start_date': group_start_date,
+ 'end_date': group_end_date,
+ 'mapped_states': mapped_states,
+ 'operator': parent_instance.operator,
+ 'execution_date': datetime_to_string(parent_instance.execution_date),
+ 'try_number': parent_instance.try_number,
+ }
+
+
def encode_ti(
- task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+ task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
) -> Optional[Dict[str, Any]]:
if not task_instance:
return None
- summary = {
+ if is_mapped:
+ return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
+
+ return {
'task_id': task_instance.task_id,
'dag_id': task_instance.dag_id,
'run_id': task_instance.run_id,
@@ -74,66 +144,6 @@ def encode_ti(
'try_number': task_instance.try_number,
}
- def get_mapped_summary(task_instances):
- priority = [
- 'failed',
- 'upstream_failed',
- 'up_for_retry',
- 'up_for_reschedule',
- 'queued',
- 'scheduled',
- 'deferred',
- 'sensing',
- 'running',
- 'shutdown',
- 'restarting',
- 'removed',
- 'no_status',
- 'success',
- 'skipped',
- ]
-
- mapped_states = [ti.state for ti in task_instances]
-
- group_state = None
- for state in priority:
- if state in mapped_states:
- group_state = state
- break
-
- group_start_date = datetime_to_string(
- min((ti.start_date for ti in task_instances if ti.start_date), default=None)
- )
- group_end_date = datetime_to_string(
- max((ti.end_date for ti in task_instances if ti.end_date), default=None)
- )
-
- return {
- 'task_id': task_instance.task_id,
- 'run_id': task_instance.run_id,
- 'state': group_state,
- 'start_date': group_start_date,
- 'end_date': group_end_date,
- 'mapped_states': mapped_states,
- 'operator': task_instance.operator,
- 'execution_date': datetime_to_string(task_instance.execution_date),
- 'try_number': task_instance.try_number,
- }
-
- if is_mapped:
- return get_mapped_summary(
- session.query(TaskInstance)
- .filter(
- TaskInstance.dag_id == task_instance.dag_id,
- TaskInstance.run_id == task_instance.run_id,
- TaskInstance.task_id == task_instance.task_id,
- TaskInstance.map_index >= 0,
- )
- .all()
- )
-
- return summary
-
def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
if not dag_run: