You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/03/08 20:57:18 UTC

[airflow] 09/11: make side panel collapsible, useTasks,

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e106d1721b9340ff031a4793d3b485265d33edba
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Mar 2 14:23:10 2022 -0500

    make side panel collapsible, useTasks,
---
 airflow/www/static/js/tree/StatusBox.jsx           |   3 +-
 airflow/www/static/js/tree/Tree.jsx                | 126 +++++++++++++------
 airflow/www/static/js/tree/api/useDag.js           |   2 +-
 airflow/www/static/js/tree/dagRuns/Bar.jsx         |   5 +-
 airflow/www/static/js/tree/details/Header.jsx      |   3 +-
 airflow/www/static/js/tree/details/content/Dag.jsx |  27 +++-
 airflow/www/static/js/tree/details/index.jsx       |   1 -
 airflow/www/static/js/tree/renderTaskRows.jsx      |   8 +-
 airflow/www/utils.py                               | 136 +++++++++++----------
 9 files changed, 196 insertions(+), 115 deletions(-)

diff --git a/airflow/www/static/js/tree/StatusBox.jsx b/airflow/www/static/js/tree/StatusBox.jsx
index 62762c5..0f5294f 100644
--- a/airflow/www/static/js/tree/StatusBox.jsx
+++ b/airflow/www/static/js/tree/StatusBox.jsx
@@ -48,8 +48,7 @@ const StatusBox = ({
       .forEach((e) => { e.style.backgroundColor = null; });
   };
 
-  const onClick = (e) => {
-    e.stopPropagation();
+  const onClick = () => {
     onMouseLeave();
     onSelect({
       taskId, runId, instance, task: group,
diff --git a/airflow/www/static/js/tree/Tree.jsx b/airflow/www/static/js/tree/Tree.jsx
index aeeaeb5..0d84639 100644
--- a/airflow/www/static/js/tree/Tree.jsx
+++ b/airflow/www/static/js/tree/Tree.jsx
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+/* global localStorage */
+
 import React, { useRef, useEffect, useState } from 'react';
 import {
   Table,
@@ -29,18 +31,32 @@ import {
   Text,
   Thead,
   Flex,
+  useDisclosure,
+  IconButton,
 } from '@chakra-ui/react';
+import { MdArrowForward, MdArrowBack } from 'react-icons/md';
 
 import useTreeData from './useTreeData';
 import renderTaskRows from './renderTaskRows';
 import DagRuns from './dagRuns';
 import Details from './details';
+import { callModal, callModalDag } from '../dag';
+
+const sidePanelKey = 'showSidePanel';
 
 const Tree = () => {
   const containerRef = useRef();
   const scrollRef = useRef();
   const { data: { groups = {}, dagRuns = [] }, isRefreshOn, onToggleRefresh } = useTreeData();
-  const [selected, setSelected] = useState({});
+  const [selected, setSelected] = useState({}); // selected task instance or dag run
+  const isPanelOpen = JSON.parse(localStorage.getItem(sidePanelKey));
+  const { isOpen, onToggle } = useDisclosure({ defaultIsOpen: isPanelOpen });
+
+  const toggleSidePanel = () => {
+    if (!isOpen) localStorage.setItem(sidePanelKey, true);
+    else localStorage.setItem(sidePanelKey, false);
+    onToggle();
+  };
 
   const dagRunIds = dagRuns.map((dr) => dr.runId);
 
@@ -48,46 +64,88 @@ const Tree = () => {
     // Set initial scroll to far right if it is scrollable
     const runsContainer = scrollRef.current;
     if (runsContainer && runsContainer.scrollWidth > runsContainer.clientWidth) {
-      runsContainer.scrollBy(runsContainer.clientWidth, 0);
+      runsContainer.scrollBy(runsContainer.clientWidth, 250);
     }
-  }, []);
+  }, [isOpen]); // isOpen is to redo the scroll when the side panel opens/closes
 
   const { runId, taskId } = selected;
-  const onSelect = (newInstance) => (
-    (newInstance.runId === runId && newInstance.taskId === taskId)
-      ? setSelected({})
-      : setSelected(newInstance)
-  );
+
+  // show task/run info in the side panel, or just call the regular action modal
+  const onSelect = (newSelected) => {
+    if (isOpen) {
+      const isSame = newSelected.runId === runId && newSelected.taskId === taskId;
+      setSelected(isSame ? {} : newSelected);
+    } else if (!isOpen) {
+      if (newSelected.dagRun) {
+        const { dagRun } = newSelected;
+        callModalDag({
+          execution_date: dagRun.executionDate,
+          dag_id: dagRun.dagId,
+          run_id: dagRun.runId,
+        });
+      } else if (newSelected.instance) {
+        const extraLinks = newSelected.task.extraLinks || [];
+        const { instance } = newSelected;
+        callModal(
+          taskId,
+          instance.executionDate,
+          extraLinks,
+          instance.tryNumber,
+          instance.operator === 'SubDagOperator' || undefined,
+          instance.runId,
+        );
+      }
+    }
+  };
 
   return (
-    <Flex pl="24px" position="relative" flexDirection="row" justifyContent="space-between" ref={containerRef}>
+    <Box pl="24px" position="relative" ref={containerRef}>
       <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="130px">Runs</Text>
       <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="190px">Tasks</Text>
-      <FormControl display="flex" position="absolute" left="220px">
-        {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />}
-        <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal">
-          Auto-refresh
-        </FormLabel>
-        <Switch id="auto-refresh" onChange={onToggleRefresh} isChecked={isRefreshOn} size="lg" />
-      </FormControl>
-      <Box mr="12px" pb="12px" overflowX="auto" ref={scrollRef} maxWidth="300px" minWidth="300px" position="relative" mt="24px">
-        <Table height={0}>
-          <Thead>
-            <DagRuns
-              containerRef={containerRef}
-              selected={selected}
-              onSelect={onSelect}
-            />
-          </Thead>
-          <Tbody>
-            {renderTaskRows({
-              task: groups, containerRef, onSelect, selected, dagRunIds,
-            })}
-          </Tbody>
-        </Table>
-      </Box>
-      <Details selected={selected} onSelect={onSelect} />
-    </Flex>
+      <Flex flexGrow={1} justifyContent="flex-end" alignItems="center">
+        <FormControl display="flex" width="auto" mr={2}>
+          {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />}
+          <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal">
+            Auto-refresh
+          </FormLabel>
+          <Switch id="auto-refresh" onChange={onToggleRefresh} isChecked={isRefreshOn} size="lg" />
+        </FormControl>
+        <IconButton onClick={toggleSidePanel}>
+          {isOpen
+            ? <MdArrowForward size="18px" aria-label="Collapse Details" title="Collapse Details" />
+            : <MdArrowBack size="18px" title="Expand Details" aria-label="Expand Details" />}
+        </IconButton>
+      </Flex>
+      <Flex flexDirection="row" justifyContent="space-between">
+        <Box
+          mr="12px"
+          mt="24px"
+          pb="12px"
+          overflowX="auto"
+          ref={scrollRef}
+          maxWidth={isOpen && '300px'}
+          minWidth={isOpen && '300px'}
+        >
+          <Table height={0}>
+            <Thead>
+              <DagRuns
+                containerRef={containerRef}
+                selected={selected}
+                onSelect={onSelect}
+              />
+            </Thead>
+            <Tbody>
+              {renderTaskRows({
+                task: groups, containerRef, onSelect, selected, dagRunIds,
+              })}
+            </Tbody>
+          </Table>
+        </Box>
+        {isOpen && (
+          <Details selected={selected} onSelect={onSelect} />
+        )}
+      </Flex>
+    </Box>
   );
 };
 
diff --git a/airflow/www/static/js/tree/api/useDag.js b/airflow/www/static/js/tree/api/useDag.js
index 6c19ee4..1343302 100644
--- a/airflow/www/static/js/tree/api/useDag.js
+++ b/airflow/www/static/js/tree/api/useDag.js
@@ -23,6 +23,6 @@ import { useQuery } from 'react-query';
 export default function useDag(dagId) {
   return useQuery(
     ['dag', dagId],
-    () => axios.get(`/dags/${dagId}`),
+    () => axios.get(`/dags/${dagId}/details`),
   );
 }
diff --git a/airflow/www/static/js/tree/dagRuns/Bar.jsx b/airflow/www/static/js/tree/dagRuns/Bar.jsx
index 94c84e7..14df756 100644
--- a/airflow/www/static/js/tree/dagRuns/Bar.jsx
+++ b/airflow/www/static/js/tree/dagRuns/Bar.jsx
@@ -57,10 +57,7 @@ const DagRunBar = ({
         cursor="pointer"
         width="14px"
         zIndex={1}
-        onClick={(e) => {
-          e.stopPropagation();
-          onSelect({ runId: run.runId, dagRun: run });
-        }}
+        onClick={() => onSelect({ runId: run.runId, dagRun: run })}
         position="relative"
         data-peer
       >
diff --git a/airflow/www/static/js/tree/details/Header.jsx b/airflow/www/static/js/tree/details/Header.jsx
index c362606..2982a4e 100644
--- a/airflow/www/static/js/tree/details/Header.jsx
+++ b/airflow/www/static/js/tree/details/Header.jsx
@@ -45,7 +45,6 @@ const Header = ({
   dagRuns,
 }) => {
   const dagRun = dagRuns.find((r) => r.runId === runId);
-  // console.log(dagRun);
   let runLabel = dagRun ? formatDateTime(dagRun.dataIntervalEnd) : '';
   if (dagRun && dagRun.runType === 'manual') {
     runLabel = (
@@ -65,7 +64,7 @@ const Header = ({
       </BreadcrumbItem>
       {runId && (
         <BreadcrumbItem isCurrentPage={runId && !taskId}>
-          <BreadcrumbLink onClick={() => onSelect({ runId })}>
+          <BreadcrumbLink onClick={() => onSelect({ runId, dagRun })}>
             <LabelValue label="Run" value={runLabel} />
           </BreadcrumbLink>
         </BreadcrumbItem>
diff --git a/airflow/www/static/js/tree/details/content/Dag.jsx b/airflow/www/static/js/tree/details/content/Dag.jsx
index e71d9c2..7ee19ec 100644
--- a/airflow/www/static/js/tree/details/content/Dag.jsx
+++ b/airflow/www/static/js/tree/details/content/Dag.jsx
@@ -27,13 +27,23 @@ import {
 } from '@chakra-ui/react';
 
 import { getMetaValue } from '../../../utils';
-import { useDag } from '../../api';
+import { useDag, useTasks } from '../../api';
 
 const dagId = getMetaValue('dag_id');
 
 const Dag = () => {
   const { data: dag } = useDag(dagId);
-  if (!dag) return null;
+  const { data: taskData } = useTasks(dagId);
+  if (!dag || !taskData) return null;
+  const { tasks = [], totalEntries = '' } = taskData;
+  const operators = {};
+  tasks.forEach((t) => {
+    if (!operators[t.classRef.className]) {
+      operators[t.classRef.className] = 1;
+    } else {
+      operators[t.classRef.className] += 1;
+    }
+  });
   const {
     description, tags, fileloc, owners,
   } = dag;
@@ -50,6 +60,19 @@ const Dag = () => {
         <Text mr={2}>Owner:</Text>
         {owners.map((o) => <Text key={o}>{o}</Text>)}
       </Flex>
+      <Text>
+        {totalEntries}
+        {' '}
+        Tasks
+      </Text>
+      {Object.entries(operators).map(([key, value]) => (
+        <Text key={key}>
+          {value}
+          {' '}
+          {key}
+          {value > 1 && 's'}
+        </Text>
+      ))}
     </>
   );
 };
diff --git a/airflow/www/static/js/tree/details/index.jsx b/airflow/www/static/js/tree/details/index.jsx
index ee3b044..c932b05 100644
--- a/airflow/www/static/js/tree/details/index.jsx
+++ b/airflow/www/static/js/tree/details/index.jsx
@@ -35,7 +35,6 @@ const Details = ({
   onSelect,
 }) => {
   const { data: { dagRuns = [] } } = useTreeData();
-  console.log(selected);
   return (
     <Flex borderLeftWidth="1px" flexDirection="column" p={3} flexGrow={1}>
       <Header selected={selected} onSelect={onSelect} dagRuns={dagRuns} />
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 36e0b94..f5a2345 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -40,18 +40,14 @@ import { getMetaValue } from '../utils';
 const dagId = getMetaValue('dag_id');
 
 const renderTaskRows = ({
-  task, containerRef, level = 0, isParentOpen, onSelect, selected, dagRunIds,
+  task, level = 0, ...rest
 }) => task.children.map((t) => (
   <Row
+    {...rest}
     key={t.id}
     task={t}
     level={level}
-    containerRef={containerRef}
     prevTaskId={task.id}
-    isParentOpen={isParentOpen}
-    onSelect={onSelect}
-    selected={selected}
-    dagRunIds={dagRunIds}
   />
 ));
 
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ed765d4..b1f5e0d 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,8 +43,9 @@ from airflow.models import errors
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
+from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.json import AirflowJsonEncoder
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
 from airflow.www.forms import DateTimeWithTimezoneField
 from airflow.www.widgets import AirflowDateTimePickerWidget
 
@@ -55,13 +56,82 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
+def get_mapped_instances(task_instance, session):
+    return (
+        session.query(TaskInstance)
+        .filter(
+            TaskInstance.dag_id == task_instance.dag_id,
+            TaskInstance.run_id == task_instance.run_id,
+            TaskInstance.task_id == task_instance.task_id,
+            TaskInstance.map_index >= 0,
+        )
+        .all()
+    )
+
+
+def get_instance_with_map(task_instance, session):
+    if task_instance.map_index == -1:
+        return alchemy_to_dict(task_instance)
+    mapped_instances = get_mapped_instances(task_instance, session)
+    return get_mapped_summary(task_instance, mapped_instances)
+
+
+def get_mapped_summary(parent_instance, task_instances):
+    priority = [
+        TaskInstanceState.FAILED,
+        TaskInstanceState.UPSTREAM_FAILED,
+        TaskInstanceState.UP_FOR_RETRY,
+        TaskInstanceState.UP_FOR_RESCHEDULE,
+        TaskInstanceState.QUEUED,
+        TaskInstanceState.SCHEDULED,
+        TaskInstanceState.DEFERRED,
+        TaskInstanceState.SENSING,
+        TaskInstanceState.RUNNING,
+        TaskInstanceState.SHUTDOWN,
+        TaskInstanceState.RESTARTING,
+        TaskInstanceState.REMOVED,
+        TaskInstanceState.SUCCESS,
+        TaskInstanceState.SKIPPED,
+    ]
+
+    mapped_states = [ti.state for ti in task_instances]
+
+    group_state = None
+    for state in priority:
+        if state in mapped_states:
+            group_state = state
+            break
+
+    group_start_date = datetime_to_string(
+        min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+    )
+    group_end_date = datetime_to_string(
+        max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+    )
+
+    return {
+        'task_id': parent_instance.task_id,
+        'run_id': parent_instance.run_id,
+        'state': group_state,
+        'start_date': group_start_date,
+        'end_date': group_end_date,
+        'mapped_states': mapped_states,
+        'operator': parent_instance.operator,
+        'execution_date': datetime_to_string(parent_instance.execution_date),
+        'try_number': parent_instance.try_number,
+    }
+
+
 def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
 ) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    summary = {
+    if is_mapped:
+        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
+
+    return {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -74,66 +144,6 @@ def encode_ti(
         'try_number': task_instance.try_number,
     }
 
-    def get_mapped_summary(task_instances):
-        priority = [
-            'failed',
-            'upstream_failed',
-            'up_for_retry',
-            'up_for_reschedule',
-            'queued',
-            'scheduled',
-            'deferred',
-            'sensing',
-            'running',
-            'shutdown',
-            'restarting',
-            'removed',
-            'no_status',
-            'success',
-            'skipped',
-        ]
-
-        mapped_states = [ti.state for ti in task_instances]
-
-        group_state = None
-        for state in priority:
-            if state in mapped_states:
-                group_state = state
-                break
-
-        group_start_date = datetime_to_string(
-            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
-        )
-        group_end_date = datetime_to_string(
-            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
-        )
-
-        return {
-            'task_id': task_instance.task_id,
-            'run_id': task_instance.run_id,
-            'state': group_state,
-            'start_date': group_start_date,
-            'end_date': group_end_date,
-            'mapped_states': mapped_states,
-            'operator': task_instance.operator,
-            'execution_date': datetime_to_string(task_instance.execution_date),
-            'try_number': task_instance.try_number,
-        }
-
-    if is_mapped:
-        return get_mapped_summary(
-            session.query(TaskInstance)
-            .filter(
-                TaskInstance.dag_id == task_instance.dag_id,
-                TaskInstance.run_id == task_instance.run_id,
-                TaskInstance.task_id == task_instance.task_id,
-                TaskInstance.map_index >= 0,
-            )
-            .all()
-        )
-
-    return summary
-
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run: