You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/02/25 16:59:13 UTC

[airflow] branch mapped-task-drawer created (now fbd49c6)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a change to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git.


      at fbd49c6  reformat grid background colors

This branch includes the following new commits:

     new b7ea8f8  Create an end-to-end test for running a DAG via the scheduler.
     new c6da3ea  Expand mapped tasks in the Scheduler
     new 911aaeb  make UI and tree work with mapped tasks
     new d98af9a  basic slide drawer
     new fbd49c6  reformat grid background colors

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[airflow] 05/05: reformat grid background colors

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fbd49c6d79d9ac21dd0a404500152a4c49db6c38
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Fri Feb 25 11:58:23 2022 -0500

    reformat grid background colors
---
 airflow/www/static/js/tree/InstanceTooltip.jsx | 16 ++---
 airflow/www/static/js/tree/SidePanel.jsx       | 45 +++++++++++--
 airflow/www/static/js/tree/StatusBox.jsx       | 19 +++---
 airflow/www/static/js/tree/Tree.jsx            | 28 ++++----
 airflow/www/static/js/tree/dagRuns/Bar.jsx     |  9 ++-
 airflow/www/static/js/tree/dagRuns/index.jsx   |  3 +-
 airflow/www/static/js/tree/renderTaskRows.jsx  | 88 ++++++++++++++------------
 7 files changed, 128 insertions(+), 80 deletions(-)

diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index 612905b..6abe7dc 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -123,29 +123,29 @@ const InstanceTooltip = ({
         </>
       )}
       <br />
-      <Text>
+      {/* <Text>
         {taskIdTitle}
         {taskId}
-      </Text>
-      <Text whiteSpace="nowrap">
+      </Text> */}
+      {/* <Text whiteSpace="nowrap">
         Run Id:
         {' '}
         {runId}
-      </Text>
-      {operator && (
+      </Text> */}
+      {/* {operator && (
       <Text>
         Operator:
         {' '}
         {operator}
       </Text>
-      )}
+      )} */}
       <Text>
         Duration:
         {' '}
         {formatDuration(duration || getDuration(startDate, endDate))}
       </Text>
       <br />
-      <Text as="strong">UTC</Text>
+      {/* <Text as="strong">UTC</Text>
       <Text>
         Started:
         {' '}
@@ -171,7 +171,7 @@ const InstanceTooltip = ({
         Ended:
         {' '}
         {endDate && formatDateTime(endDate)}
-      </Text>
+      </Text> */}
     </Box>
   );
 };
diff --git a/airflow/www/static/js/tree/SidePanel.jsx b/airflow/www/static/js/tree/SidePanel.jsx
index 3515f1c..7a21eba 100644
--- a/airflow/www/static/js/tree/SidePanel.jsx
+++ b/airflow/www/static/js/tree/SidePanel.jsx
@@ -21,29 +21,60 @@
 
 import React from 'react';
 import {
-  Box,
   chakra,
   Flex,
   Text,
+  useDisclosure,
+  CloseButton,
+  Button,
+  IconButton,
 } from '@chakra-ui/react';
+import { MdKeyboardArrowLeft, MdKeyboardArrowRight } from 'react-icons/md';
 
 import { formatDateTime } from '../datetime_utils';
 
-const SidePanel = ({ instance: { runId, taskId, executionDate }, isOpen }) => (
-  <Box bg="gray.200" maxWidth={isOpen ? 300 : 0} minWidth={isOpen ? 300 : 0} transition="all 0.5s" position="relative" overflow="hidden">
-    <Flex right={isOpen ? 0 : -300} top={0} transition="right 0.5s, max-width 0.5s" width={300} flexDirection="column" m={2}>
-      <Text as="h4">
+const SidePanel = ({ instance: { runId, taskId, executionDate } }) => {
+  const { isOpen, onOpen, onClose } = useDisclosure();
+  if (!isOpen) {
+    return (
+      <IconButton
+        ml={2}
+        icon={<MdKeyboardArrowLeft size={18} />}
+        aria-label="Open Details Panel"
+        onClick={onOpen}
+      />
+    );
+  }
+  const title = runId && taskId
+    ? (
+      <>
         <chakra.span>Task Instance: </chakra.span>
         <chakra.b>{taskId}</chakra.b>
         <chakra.span> at </chakra.span>
         <chakra.b>{formatDateTime(moment.utc(executionDate))}</chakra.b>
+      </>
+    )
+    : (
+      <chakra.span>Dag Details: </chakra.span>
+    );
+
+  return (
+    <Flex bg="gray.200" maxWidth={300} minWidth={300} flexDirection="column" p={3}>
+      <IconButton
+        ml={2}
+        icon={<MdKeyboardArrowRight size={18} />}
+        aria-label="Close Details Panel"
+        onClick={onClose}
+      />
+      <Text as="h4">
+        {title}
       </Text>
       <Text>
         {/* eslint-disable-next-line max-len */}
         Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Nunc vel risus commodo viverra maecenas accumsan. Ut porttitor leo a diam sollicitudin tempor id eu. Molestie at elementum eu facilisis sed odio morbi quis commodo. Facilisis leo vel fringilla est ullamcorper eget nulla facilisi etiam. Est sit amet facilisis magna etiam tempor orci eu. Id semper risus in hendrerit gravida rutrum. Ac odio tempor orci dapibus  [...]
       </Text>
     </Flex>
-  </Box>
-);
+  );
+};
 
 export default SidePanel;
diff --git a/airflow/www/static/js/tree/StatusBox.jsx b/airflow/www/static/js/tree/StatusBox.jsx
index dce0fda..b4abc93 100644
--- a/airflow/www/static/js/tree/StatusBox.jsx
+++ b/airflow/www/static/js/tree/StatusBox.jsx
@@ -24,9 +24,10 @@ import {
   Flex,
   Box,
   Tooltip,
+  useTheme,
 } from '@chakra-ui/react';
 
-import { callModal } from '../dag';
+// import { callModal } from '../dag';
 import InstanceTooltip from './InstanceTooltip';
 
 const StatusBox = ({
@@ -35,21 +36,19 @@ const StatusBox = ({
   const {
     executionDate, taskId, tryNumber = 0, operator, runId,
   } = instance;
+  const { colors } = useTheme();
+  const hoverBlue = `${colors.blue[100]}50`;
 
-  const onOpenModal = () => executionDate && callModal(taskId, executionDate, extraLinks, tryNumber, operator === 'SubDagOperator' || undefined, runId);
-  const onClick = () => {
-    if (group.isMapped) {
-      onSelectInstance(instance);
-    } else {
-      onSelectInstance({});
-      onOpenModal();
-    }
+  // const onOpenModal = () => executionDate && callModal(taskId, executionDate, extraLinks, tryNumber, operator === 'SubDagOperator' || undefined, runId);
+  const onClick = (e) => {
+    e.stopPropagation();
+    onSelectInstance(instance);
   };
 
   // Fetch the corresponding column element and set its background color when hovering
   const onMouseOver = () => {
     [...containerRef.current.getElementsByClassName(`js-${runId}`)]
-      .forEach((e) => { e.style.backgroundColor = 'rgba(113, 128, 150, 0.1)'; });
+      .forEach((e) => { e.style.backgroundColor = hoverBlue; });
   };
   const onMouseLeave = () => {
     [...containerRef.current.getElementsByClassName(`js-${runId}`)]
diff --git a/airflow/www/static/js/tree/Tree.jsx b/airflow/www/static/js/tree/Tree.jsx
index 095adbd..ff1402f 100644
--- a/airflow/www/static/js/tree/Tree.jsx
+++ b/airflow/www/static/js/tree/Tree.jsx
@@ -59,7 +59,7 @@ const Tree = () => {
 
   return (
     <Box position="relative" ref={containerRef}>
-      <FormControl display="flex" alignItems="center" justifyContent="flex-end" width="100%">
+      <FormControl display="flex" alignItems="center" justifyContent="flex-end" width="100%" mb={2}>
         {isRefreshOn && <Spinner color="blue.500" speed="1s" mr="4px" />}
         <FormLabel htmlFor="auto-refresh" mb={0} fontSize="12px" fontWeight="normal">
           Auto-refresh
@@ -68,19 +68,21 @@ const Tree = () => {
       </FormControl>
       <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="130px">Runs</Text>
       <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="190px">Tasks</Text>
-      <Box pl="24px">
+      <Box pl="24px" height="100%" onClick={() => setSelectedInstance({})}>
         <Flex position="relative" flexDirection="row" justifyContent="space-between" overflow="hidden">
-          <Table mr="24px" overflowX="auto" ref={scrollRef} height={0}>
-            <Thead>
-              <DagRuns containerRef={containerRef} />
-            </Thead>
-            <Tbody>
-              {renderTaskRows({
-                task: groups, containerRef, onSelectInstance,
-              })}
-            </Tbody>
-          </Table>
-          <SidePanel isOpen={!!runId} instance={selectedInstance} />
+          <Box mr="12px" pb="12px" overflowX="auto" ref={scrollRef} maxWidth="60vw">
+            <Table height={0}>
+              <Thead>
+                <DagRuns containerRef={containerRef} selectedInstance={selectedInstance} />
+              </Thead>
+              <Tbody>
+                {renderTaskRows({
+                  task: groups, containerRef, onSelectInstance, selectedInstance,
+                })}
+              </Tbody>
+            </Table>
+          </Box>
+          <SidePanel instance={selectedInstance} />
         </Flex>
       </Box>
     </Box>
diff --git a/airflow/www/static/js/tree/dagRuns/Bar.jsx b/airflow/www/static/js/tree/dagRuns/Bar.jsx
index a0fa5b1..4dbe20d 100644
--- a/airflow/www/static/js/tree/dagRuns/Bar.jsx
+++ b/airflow/www/static/js/tree/dagRuns/Bar.jsx
@@ -26,6 +26,7 @@ import {
   Tooltip,
   Text,
   VStack,
+  useTheme,
 } from '@chakra-ui/react';
 import { MdPlayArrow } from 'react-icons/md';
 
@@ -35,13 +36,16 @@ import { callModalDag } from '../../dag';
 const BAR_HEIGHT = 100;
 
 const DagRunBar = ({
-  run, max, index, totalRuns, containerRef,
+  run, max, index, totalRuns, containerRef, selectedInstance,
 }) => {
+  const { colors } = useTheme();
+  const hoverBlue = `${colors.blue[100]}50`;
   let highlightHeight = '100%';
   if (containerRef && containerRef.current) {
     const table = containerRef.current.getElementsByTagName('tbody')[0];
     highlightHeight = table.offsetHeight + BAR_HEIGHT;
   }
+  const isSelected = run.runId === selectedInstance.runId;
   return (
     <Box position="relative">
       <Flex
@@ -93,7 +97,8 @@ const DagRunBar = ({
         top="1px"
         height={highlightHeight}
         className={`js-${run.runId}`}
-        _peerHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }}
+        bg={isSelected ? 'blue.100' : undefined}
+        _peerHover={!isSelected && { backgroundColor: hoverBlue }}
         zIndex={0}
         transition="background-color 0.2s"
       />
diff --git a/airflow/www/static/js/tree/dagRuns/index.jsx b/airflow/www/static/js/tree/dagRuns/index.jsx
index a0a4931..7de949f 100644
--- a/airflow/www/static/js/tree/dagRuns/index.jsx
+++ b/airflow/www/static/js/tree/dagRuns/index.jsx
@@ -36,7 +36,7 @@ const DurationTick = ({ children, ...rest }) => (
   </Text>
 );
 
-const DagRuns = ({ containerRef }) => {
+const DagRuns = ({ containerRef, selectedInstance }) => {
   const { data: { dagRuns = [] } } = useTreeData();
   const durations = [];
   const runs = dagRuns.map((dagRun) => {
@@ -97,6 +97,7 @@ const DagRuns = ({ containerRef }) => {
               index={i}
               totalRuns={runs.length}
               containerRef={containerRef}
+              selectedInstance={selectedInstance}
             />
           ))}
         </Flex>
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 23e52dc..f66d2e0 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -28,6 +28,7 @@ import {
   Flex,
   useDisclosure,
   Collapse,
+  useTheme,
 } from '@chakra-ui/react';
 import { FiChevronUp, FiChevronDown } from 'react-icons/fi';
 
@@ -40,7 +41,7 @@ import getMetaValue from '../meta_value';
 const dagId = getMetaValue('dag_id');
 
 const renderTaskRows = ({
-  task, containerRef, level = 0, isParentOpen, onSelectInstance,
+  task, containerRef, level = 0, isParentOpen, onSelectInstance, selectedInstance,
 }) => task.children.map((t) => (
   <Row
     key={t.id}
@@ -50,40 +51,37 @@ const renderTaskRows = ({
     prevTaskId={task.id}
     isParentOpen={isParentOpen}
     onSelectInstance={onSelectInstance}
+    selectedInstance={selectedInstance}
   />
 ));
 
 const TaskName = ({
   isGroup = false, isMapped = false, onToggle, isOpen, level, taskName,
 }) => (
-  <Box _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s">
-    <Flex
-      as={isGroup ? 'button' : 'div'}
-      onClick={() => isGroup && onToggle()}
-      color={level > 4 && 'white'}
-      aria-label={taskName}
-      title={taskName}
-      mr={4}
-      width="100%"
-      backgroundColor={`rgba(203, 213, 224, ${0.25 * level})`}
-      alignItems="center"
+  <Flex
+    as={isGroup ? 'button' : 'div'}
+    onClick={() => isGroup && onToggle()}
+    aria-label={taskName}
+    title={taskName}
+    mr={4}
+    width="100%"
+    alignItems="center"
+  >
+    <Text
+      display="inline"
+      fontSize="12px"
+      ml={level * 4 + 4}
+      isTruncated
     >
-      <Text
-        display="inline"
-        fontSize="12px"
-        ml={level * 4 + 4}
-        isTruncated
-      >
-        {taskName}
-        {isMapped && (
-          ' [ ]'
-        )}
-      </Text>
-      {isGroup && (
-        isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" />
+      {taskName}
+      {isMapped && (
+        ' [ ]'
       )}
-    </Flex>
-  </Box>
+    </Text>
+    {isGroup && (
+      isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" />
+    )}
+  </Flex>
 );
 
 const TaskInstances = ({
@@ -104,16 +102,20 @@ const TaskInstances = ({
             onSelectInstance={onSelectInstance}
           />
         )
-        : <Box key={`${run.runId}-${task.id}`} width="18px" data-testid="blank-task" />;
+        : <Box key={`${run.runId}-${task.id}`} width="16px" data-testid="blank-task" />;
     })}
   </Flex>
 );
 
-const Row = ({
-  task, containerRef, level, prevTaskId, isParentOpen = true, onSelectInstance,
-}) => {
+const Row = (props) => {
+  const {
+    task, containerRef, level, prevTaskId, isParentOpen = true, onSelectInstance, selectedInstance,
+  } = props;
   const { data: { dagRuns = [] } } = useTreeData();
+  const { colors } = useTheme();
+  const hoverBlue = `${colors.blue[100]}50`;
   const isGroup = !!task.children;
+  const isSelected = selectedInstance.taskId === task.id;
 
   const taskName = prevTaskId ? task.id.replace(`${prevTaskId}.`, '') : task.id;
 
@@ -136,21 +138,23 @@ const Row = ({
   return (
     <>
       <Tr
-        backgroundColor={`rgba(203, 213, 224, ${0.25 * level})`}
+        bg={isSelected ? 'blue.100' : undefined}
         borderBottomWidth={isFullyOpen ? 1 : 0}
-        borderBottomColor={level > 1 ? 'white' : 'gray.200'}
+        borderBottomColor="gray.200"
         role="group"
+        _hover={!isSelected && { bg: hoverBlue }}
+        transition="background-color 0.2s"
       >
         <Td
-          _groupHover={level > 3 && {
-            color: 'white',
-          }}
+          bg={isSelected ? 'blue.100' : 'white'}
+          _groupHover={!isSelected && ({ bg: 'blue.50' })}
           p={0}
+          transition="background-color 0.2s"
           lineHeight="18px"
           position="sticky"
           left={0}
-          backgroundColor="white"
           borderBottom={0}
+          zIndex={2}
         >
           <Collapse in={isFullyOpen}>
             <TaskName
@@ -164,7 +168,11 @@ const Row = ({
           </Collapse>
         </Td>
         <Td width={0} p={0} borderBottom={0} />
-        <Td p={0} align="right" _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s" borderBottom={0}>
+        <Td
+          p={0}
+          align="right"
+          borderBottom={0}
+        >
           <Collapse in={isFullyOpen}>
             <TaskInstances
               dagRuns={dagRuns}
@@ -177,7 +185,9 @@ const Row = ({
       </Tr>
       {isGroup && (
         renderTaskRows({
-          task, containerRef, level: level + 1, isParentOpen: isOpen, onSelectInstance,
+          ...props,
+          level: level + 1,
+          isParentOpen: isOpen,
         })
       )}
     </>

[airflow] 01/05: Create an end-to-end test for running a DAG via the scheduler.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b7ea8f86dcd0f5421eb8d36f6f1654e9dc877fa2
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Tue Feb 15 21:58:55 2022 +0000

    Create an end-to-end test for running a DAG via the scheduler.
    
    This is important as there are a lot of moving parts in mapped DAGs and
    we want to make sure that, somewhere, the DAG runs to completion.
    
    This is marked as a long-running test as it could be :) It is certainly
    more than a "unit" test at any rate!
---
 tests/jobs/test_scheduler_job.py | 97 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 93 insertions(+), 4 deletions(-)

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 845ffda..b803c2d 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -17,13 +17,14 @@
 # under the License.
 #
 
+import collections
 import datetime
 import logging
 import os
 import shutil
 from datetime import timedelta
 from tempfile import mkdtemp
-from typing import Generator, Optional
+from typing import Deque, Generator, Optional
 from unittest import mock
 from unittest.mock import MagicMock, patch
 
@@ -55,6 +56,7 @@ from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunType
+from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.config import conf_vars, env_vars
 from tests.test_utils.db import (
@@ -127,7 +129,7 @@ class TestSchedulerJob:
 
     @pytest.fixture(autouse=True)
     def set_instance_attrs(self, dagbag) -> Generator:
-        self.dagbag = dagbag
+        self.dagbag: DagBag = dagbag
         # Speed up some tests by not running the tasks, just look at what we
         # enqueue!
         self.null_exec: Optional[MockExecutor] = MockExecutor()
@@ -139,7 +141,7 @@ class TestSchedulerJob:
             yield
 
         self.null_exec = None
-        self.dagbag = None
+        del self.dagbag
 
     def test_is_alive(self):
         self.scheduler_job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
@@ -1637,8 +1639,10 @@ class TestSchedulerJob:
 
         try:
             dag = DagBag().get_dag(dag.dag_id)
-            assert not isinstance(dag, SerializedDAG)
             # This needs a _REAL_ dag, not the serialized version
+            assert not isinstance(dag, SerializedDAG)
+            # TODO: Can this be replaced with `self.run_scheduler_until_dagrun_terminal. `dag.run` isn't
+            # great to use here as it uses BackfillJob!
             dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs)
         except AirflowException:
             pass
@@ -3589,6 +3593,91 @@ class TestSchedulerJob:
             result.simple_task_instance.key for result in callback_requests
         }
 
+    @mock.patch.object(settings, 'USE_JOB_SCHEDULE', False)
+    def run_scheduler_until_dagrun_terminal(self, job: SchedulerJob):
+        """
+        Run a scheduler until any dag run reaches a terminal state, or the scheduler becomes "idle".
+
+        This needs a DagRun to be pre-created (it can be in running or queued state) as no more will be
+        created as we turn off creating new DagRuns via setting USE_JOB_SCHEDULE to false
+        """
+        # Spy on _do_scheduling and _process_executor_events so we can notice
+        # if nothing happened, and abort early! Given we are using
+        # SequentialExecutor this shouldn't be possible -- if there is nothing
+        # to schedule and no events, it means we have stalled.
+        def spy_on_return(orig, result):
+            def spy(*args, **kwargs):
+                ret = orig(*args, **kwargs)
+                result.append(ret)
+                return ret
+
+            return spy
+
+        num_queued_tis: Deque[int] = collections.deque([], 3)
+        num_finished_events: Deque[int] = collections.deque([], 3)
+
+        do_scheduling_spy = mock.patch.object(
+            job,
+            '_do_scheduling',
+            side_effect=spy_on_return(job._do_scheduling, num_queued_tis),
+        )
+        executor_events_spy = mock.patch.object(
+            job,
+            '_process_executor_events',
+            side_effect=spy_on_return(job._process_executor_events, num_finished_events),
+        )
+
+        orig_set_state = DagRun.set_state
+
+        def watch_set_state(self: DagRun, state, **kwargs):
+            if state in (DagRunState.SUCCESS, DagRunState.FAILED):
+                # Stop the scheduler
+                job.num_runs = 1
+            orig_set_state(self, state, **kwargs)  # type: ignore[call-arg]
+
+        def watch_heartbeat(*args, **kwargs):
+            if len(num_queued_tis) < 3 or len(num_finished_events) < 3:
+                return
+            queued_any_tis = any(val > 0 for val in num_queued_tis)
+            finished_any_events = any(val > 0 for val in num_finished_events)
+            assert (
+                queued_any_tis or finished_any_events
+            ), "Scheduler has stalled without setting the DagRun state!"
+
+        set_state_spy = mock.patch.object(DagRun, 'set_state', new=watch_set_state)
+        heartbeat_spy = mock.patch.object(job, 'heartbeat', new=watch_heartbeat)
+
+        with heartbeat_spy, set_state_spy, do_scheduling_spy, executor_events_spy:
+            job.run()
+
+    @pytest.mark.long_running
+    @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow"])
+    def test_mapped_dag(self, dag_id, session):
+        """End-to-end test of a simple mapped dag"""
+        # Use SequentialExecutor for more predictable test behaviour
+        from airflow.executors.sequential_executor import SequentialExecutor
+        from airflow.utils.dates import days_ago
+
+        self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
+        dag = self.dagbag.get_dag(dag_id)
+        assert dag
+        dr = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING,
+            execution_date=days_ago(2),
+            session=session,
+        )
+
+        executor = SequentialExecutor()
+
+        job = SchedulerJob(subdir=dag.fileloc, executor=executor)
+
+        self.run_scheduler_until_dagrun_terminal(job)
+
+        dr.refresh_from_db(session)
+        assert dr.state == DagRunState.SUCCESS
+
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():

[airflow] 02/05: Expand mapped tasks in the Scheduler

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c6da3ea1edc6057d54ea49178142c4a13d435ca5
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Tue Feb 15 23:18:50 2022 +0000

    Expand mapped tasks in the Scheduler
    
    Technically this is done inside
    DagRun.task_instance_scheduling_decisions, but the only place that is
    currently called is the Scheduler
    
    The way we are getting `upstream_ti` to pass to expand_mapped_task is
    all sorts of wrong and will need fixing, I think the interface for that
    method is wrong and the mapped task should be responsible for finding
    the right upstream TI itself.
---
 airflow/models/dagrun.py         | 30 +++++++++++++++++++++++++++---
 tests/jobs/test_scheduler_job.py |  3 +++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index beb9b7c..69b003f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import itertools
 import os
 import warnings
 from collections import defaultdict
@@ -45,6 +46,7 @@ from airflow import settings
 from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.models.mappedoperator import MappedOperator
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.models.tasklog import LogTemplate
 from airflow.stats import Stats
@@ -649,7 +651,7 @@ class DagRun(Base, LoggingMixin):
 
     def _get_ready_tis(
         self,
-        scheduleable_tasks: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
@@ -657,11 +659,33 @@ class DagRun(Base, LoggingMixin):
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tasks:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
+        # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
+        # `scheduleable_tis` in place and have the `for` loop pick them up
+        expanded_tis: List[TI] = []
+
         # Check dependencies
-        for st in scheduleable_tasks:
+        for st in itertools.chain(scheduleable_tis, expanded_tis):
+
+            # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
+            # for any reason it wasn't, we need to expand it now
+            if st.map_index < 0 and st.task.is_mapped:
+                # HACK. This needs a better way, one that copes with multiple upstreams!
+                for ti in finished_tis:
+                    if st.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(st.task, MappedOperator)
+                        new_tis = st.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is st
+                        # Add the new TIs to the list to be checked
+                        for new_ti in new_tis[1:]:
+                            new_ti.task = st.task
+                        expanded_tis.extend(new_tis[1:])
+                        break
+
             old_state = st.state
             if st.are_dependencies_met(
                 dep_context=DepContext(flag_upstream_failed=True, finished_tis=finished_tis),
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b803c2d..0284ead 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3600,6 +3600,9 @@ class TestSchedulerJob:
 
         This needs a DagRun to be pre-created (it can be in running or queued state) as no more will be
         created as we turn off creating new DagRuns via setting USE_JOB_SCHEDULE to false
+
+        Note: This doesn't currently account for tasks that go into retry -- the scheduler would be detected
+        as idle in that circumstance
         """
         # Spy on _do_scheduling and _process_executor_events so we can notice
         # if nothing happened, and abort early! Given we are using

[airflow] 04/05: basic slide drawer

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d98af9addcf52077cdf4d94cbd41349c29e6bdfd
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 13:57:33 2022 -0500

    basic slide drawer
---
 airflow/www/static/js/tree/SidePanel.jsx      | 49 +++++++++++++++++
 airflow/www/static/js/tree/StatusBox.jsx      | 75 +++++++++++++++------------
 airflow/www/static/js/tree/Tree.jsx           | 25 ++++++---
 airflow/www/static/js/tree/renderTaskRows.jsx | 19 +++++--
 4 files changed, 125 insertions(+), 43 deletions(-)

diff --git a/airflow/www/static/js/tree/SidePanel.jsx b/airflow/www/static/js/tree/SidePanel.jsx
new file mode 100644
index 0000000..3515f1c
--- /dev/null
+++ b/airflow/www/static/js/tree/SidePanel.jsx
@@ -0,0 +1,49 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* global moment */
+
+import React from 'react';
+import {
+  Box,
+  chakra,
+  Flex,
+  Text,
+} from '@chakra-ui/react';
+
+import { formatDateTime } from '../datetime_utils';
+
+const SidePanel = ({ instance: { runId, taskId, executionDate }, isOpen }) => (
+  <Box bg="gray.200" maxWidth={isOpen ? 300 : 0} minWidth={isOpen ? 300 : 0} transition="all 0.5s" position="relative" overflow="hidden">
+    <Flex right={isOpen ? 0 : -300} top={0} transition="right 0.5s, max-width 0.5s" width={300} flexDirection="column" m={2}>
+      <Text as="h4">
+        <chakra.span>Task Instance: </chakra.span>
+        <chakra.b>{taskId}</chakra.b>
+        <chakra.span> at </chakra.span>
+        <chakra.b>{formatDateTime(moment.utc(executionDate))}</chakra.b>
+      </Text>
+      <Text>
+        {/* eslint-disable-next-line max-len */}
+        Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Nunc vel risus commodo viverra maecenas accumsan. Ut porttitor leo a diam sollicitudin tempor id eu. Molestie at elementum eu facilisis sed odio morbi quis commodo. Facilisis leo vel fringilla est ullamcorper eget nulla facilisi etiam. Est sit amet facilisis magna etiam tempor orci eu. Id semper risus in hendrerit gravida rutrum. Ac odio tempor orci dapibus  [...]
+      </Text>
+    </Flex>
+  </Box>
+);
+
+export default SidePanel;
diff --git a/airflow/www/static/js/tree/StatusBox.jsx b/airflow/www/static/js/tree/StatusBox.jsx
index c22f0b2..dce0fda 100644
--- a/airflow/www/static/js/tree/StatusBox.jsx
+++ b/airflow/www/static/js/tree/StatusBox.jsx
@@ -30,12 +30,21 @@ import { callModal } from '../dag';
 import InstanceTooltip from './InstanceTooltip';
 
 const StatusBox = ({
-  group, instance, containerRef, extraLinks = [], ...rest
+  group, instance, containerRef, extraLinks = [], onSelectInstance, ...rest
 }) => {
   const {
     executionDate, taskId, tryNumber = 0, operator, runId,
   } = instance;
-  const onClick = () => executionDate && callModal(taskId, executionDate, extraLinks, tryNumber, operator === 'SubDagOperator' || undefined, runId);
+
+  const onOpenModal = () => executionDate && callModal(taskId, executionDate, extraLinks, tryNumber, operator === 'SubDagOperator' || undefined, runId);
+  const onClick = () => {
+    if (group.isMapped) {
+      onSelectInstance(instance);
+    } else {
+      onSelectInstance({});
+      onOpenModal();
+    }
+  };
 
   // Fetch the corresponding column element and set its background color when hovering
   const onMouseOver = () => {
@@ -48,37 +57,39 @@ const StatusBox = ({
   };
 
   return (
-    <Tooltip
-      label={<InstanceTooltip instance={instance} group={group} />}
-      fontSize="md"
-      portalProps={{ containerRef }}
-      hasArrow
-      placement="top"
-      openDelay={400}
-    >
-      <Flex
-        p="1px"
-        my="1px"
-        mx="2px"
-        justifyContent="center"
-        alignItems="center"
-        onClick={onClick}
-        cursor={!group.children && 'pointer'}
-        data-testid="task-instance"
-        zIndex={1}
-        onMouseEnter={onMouseOver}
-        onMouseLeave={onMouseLeave}
-        {...rest}
+    <>
+      <Tooltip
+        label={<InstanceTooltip instance={instance} group={group} />}
+        fontSize="md"
+        portalProps={{ containerRef }}
+        hasArrow
+        placement="top"
+        openDelay={400}
       >
-        <Box
-          width="10px"
-          height="10px"
-          backgroundColor={stateColors[instance.state] || 'white'}
-          borderRadius="2px"
-          borderWidth={instance.state ? 0 : 1}
-        />
-      </Flex>
-    </Tooltip>
+        <Flex
+          p="1px"
+          my="1px"
+          mx="2px"
+          justifyContent="center"
+          alignItems="center"
+          onClick={onClick}
+          cursor={!group.children && 'pointer'}
+          data-testid="task-instance"
+          zIndex={1}
+          onMouseEnter={onMouseOver}
+          onMouseLeave={onMouseLeave}
+          {...rest}
+        >
+          <Box
+            width="10px"
+            height="10px"
+            backgroundColor={stateColors[instance.state] || 'white'}
+            borderRadius="2px"
+            borderWidth={instance.state ? 0 : 1}
+          />
+        </Flex>
+      </Tooltip>
+    </>
   );
 };
 
diff --git a/airflow/www/static/js/tree/Tree.jsx b/airflow/www/static/js/tree/Tree.jsx
index 450e002..095adbd 100644
--- a/airflow/www/static/js/tree/Tree.jsx
+++ b/airflow/www/static/js/tree/Tree.jsx
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-import React, { useRef, useEffect } from 'react';
+import React, { useRef, useEffect, useState } from 'react';
 import {
   Table,
   Tbody,
@@ -28,16 +28,19 @@ import {
   Spinner,
   Text,
   Thead,
+  Flex,
 } from '@chakra-ui/react';
 
 import useTreeData from './useTreeData';
 import renderTaskRows from './renderTaskRows';
 import DagRuns from './dagRuns';
+import SidePanel from './SidePanel';
 
 const Tree = () => {
   const containerRef = useRef();
   const scrollRef = useRef();
   const { data: { groups = {} }, isRefreshOn, onToggleRefresh } = useTreeData();
+  const [selectedInstance, setSelectedInstance] = useState({});
 
   useEffect(() => {
     // Set initial scroll to far right if it is scrollable
@@ -47,6 +50,13 @@ const Tree = () => {
     }
   }, []);
 
+  const { runId, taskId } = selectedInstance;
+  const onSelectInstance = (newInstance) => (
+    (newInstance.runId === runId && newInstance.taskId === taskId)
+      ? setSelectedInstance({})
+      : setSelectedInstance(newInstance)
+  );
+
   return (
     <Box position="relative" ref={containerRef}>
       <FormControl display="flex" alignItems="center" justifyContent="flex-end" width="100%">
@@ -58,17 +68,20 @@ const Tree = () => {
       </FormControl>
       <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="130px">Runs</Text>
       <Text transform="rotate(-90deg)" position="absolute" left="-6px" top="190px">Tasks</Text>
-      <Box px="24px">
-        <Box position="relative" width="100%" overflowX="auto" ref={scrollRef}>
-          <Table>
+      <Box pl="24px">
+        <Flex position="relative" flexDirection="row" justifyContent="space-between" overflow="hidden">
+          <Table mr="24px" overflowX="auto" ref={scrollRef} height={0}>
             <Thead>
               <DagRuns containerRef={containerRef} />
             </Thead>
             <Tbody>
-              {renderTaskRows({ task: groups, containerRef })}
+              {renderTaskRows({
+                task: groups, containerRef, onSelectInstance,
+              })}
             </Tbody>
           </Table>
-        </Box>
+          <SidePanel isOpen={!!runId} instance={selectedInstance} />
+        </Flex>
       </Box>
     </Box>
   );
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 224885b..23e52dc 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -40,7 +40,7 @@ import getMetaValue from '../meta_value';
 const dagId = getMetaValue('dag_id');
 
 const renderTaskRows = ({
-  task, containerRef, level = 0, isParentOpen,
+  task, containerRef, level = 0, isParentOpen, onSelectInstance,
 }) => task.children.map((t) => (
   <Row
     key={t.id}
@@ -49,6 +49,7 @@ const renderTaskRows = ({
     containerRef={containerRef}
     prevTaskId={task.id}
     isParentOpen={isParentOpen}
+    onSelectInstance={onSelectInstance}
   />
 ));
 
@@ -85,7 +86,9 @@ const TaskName = ({
   </Box>
 );
 
-const TaskInstances = ({ task, containerRef, dagRuns }) => (
+const TaskInstances = ({
+  task, containerRef, dagRuns, onSelectInstance,
+}) => (
   <Flex justifyContent="flex-end">
     {dagRuns.map((run) => {
       // Check if an instance exists for the run, or return an empty box
@@ -98,6 +101,7 @@ const TaskInstances = ({ task, containerRef, dagRuns }) => (
             containerRef={containerRef}
             extraLinks={task.extraLinks}
             group={task}
+            onSelectInstance={onSelectInstance}
           />
         )
         : <Box key={`${run.runId}-${task.id}`} width="18px" data-testid="blank-task" />;
@@ -106,7 +110,7 @@ const TaskInstances = ({ task, containerRef, dagRuns }) => (
 );
 
 const Row = ({
-  task, containerRef, level, prevTaskId, isParentOpen = true,
+  task, containerRef, level, prevTaskId, isParentOpen = true, onSelectInstance,
 }) => {
   const { data: { dagRuns = [] } } = useTreeData();
   const isGroup = !!task.children;
@@ -162,13 +166,18 @@ const Row = ({
         <Td width={0} p={0} borderBottom={0} />
         <Td p={0} align="right" _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s" borderBottom={0}>
           <Collapse in={isFullyOpen}>
-            <TaskInstances dagRuns={dagRuns} task={task} containerRef={containerRef} />
+            <TaskInstances
+              dagRuns={dagRuns}
+              task={task}
+              containerRef={containerRef}
+              onSelectInstance={onSelectInstance}
+            />
           </Collapse>
         </Td>
       </Tr>
       {isGroup && (
         renderTaskRows({
-          task, containerRef, level: level + 1, isParentOpen: isOpen,
+          task, containerRef, level: level + 1, isParentOpen: isOpen, onSelectInstance,
         })
       )}
     </>

[airflow] 03/05: make UI and tree work with mapped tasks

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch mapped-task-drawer
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 911aaeb4716a0c8964e63212247a41b945381dea
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500

    make UI and tree work with mapped tasks
---
 airflow/www/static/js/tree/InstanceTooltip.jsx | 64 ++++++++++++++++++------
 airflow/www/static/js/tree/renderTaskRows.jsx  |  7 ++-
 airflow/www/utils.py                           | 68 +++++++++++++++++++++++++-
 airflow/www/views.py                           | 28 ++++++-----
 4 files changed, 138 insertions(+), 29 deletions(-)

diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index a1ef192..612905b 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -24,30 +24,33 @@ import { Box, Text } from '@chakra-ui/react';
 
 import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
 
+const STATES = [
+  ['success', 0],
+  ['failed', 0],
+  ['upstream_failed', 0],
+  ['up_for_retry', 0],
+  ['up_for_reschedule', 0],
+  ['running', 0],
+  ['deferred', 0],
+  ['sensing', 0],
+  ['queued', 0],
+  ['scheduled', 0],
+  ['skipped', 0],
+  ['no_status', 0],
+];
+
 const InstanceTooltip = ({
   group,
   instance: {
-    duration, operator, startDate, endDate, state, taskId, runId,
+    duration, operator, startDate, endDate, state, taskId, runId, mappedStates,
   },
 }) => {
   const isGroup = !!group.children;
   const groupSummary = [];
+  const mapSummary = [];
 
   if (isGroup) {
-    const numMap = new Map([
-      ['success', 0],
-      ['failed', 0],
-      ['upstream_failed', 0],
-      ['up_for_retry', 0],
-      ['up_for_reschedule', 0],
-      ['running', 0],
-      ['deferred', 0],
-      ['sensing', 0],
-      ['queued', 0],
-      ['scheduled', 0],
-      ['skipped', 0],
-      ['no_status', 0],
-    ]);
+    const numMap = new Map(STATES);
     group.children.forEach((child) => {
       const taskInstance = child.instances.find((ti) => ti.runId === runId);
       if (taskInstance) {
@@ -69,6 +72,26 @@ const InstanceTooltip = ({
     });
   }
 
+  if (group.isMapped && mappedStates) {
+    const numMap = new Map(STATES);
+    mappedStates.forEach((s) => {
+      const stateKey = s || 'no_status';
+      if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1);
+    });
+    numMap.forEach((key, val) => {
+      if (key > 0) {
+        mapSummary.push(
+          // eslint-disable-next-line react/no-array-index-key
+          <Text key={val} ml="10px">
+            {val}
+            {': '}
+            {key}
+          </Text>,
+        );
+      }
+    });
+  }
+
   const taskIdTitle = isGroup ? 'Task Group Id: ' : 'Task Id: ';
 
   return (
@@ -88,6 +111,17 @@ const InstanceTooltip = ({
           {groupSummary}
         </>
       )}
+      {group.isMapped && (
+        <>
+          <br />
+          <Text as="strong">
+            {mappedStates.length}
+            {' '}
+            Tasks Mapped
+          </Text>
+          {mapSummary}
+        </>
+      )}
       <br />
       <Text>
         {taskIdTitle}
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 8dcd88d..224885b 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -53,7 +53,7 @@ const renderTaskRows = ({
 ));
 
 const TaskName = ({
-  isGroup, onToggle, isOpen, level, taskName,
+  isGroup = false, isMapped = false, onToggle, isOpen, level, taskName,
 }) => (
   <Box _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s">
     <Flex
@@ -74,6 +74,9 @@ const TaskName = ({
         isTruncated
       >
         {taskName}
+        {isMapped && (
+          ' [ ]'
+        )}
       </Text>
       {isGroup && (
         isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" />
@@ -107,6 +110,7 @@ const Row = ({
 }) => {
   const { data: { dagRuns = [] } } = useTreeData();
   const isGroup = !!task.children;
+
   const taskName = prevTaskId ? task.id.replace(`${prevTaskId}.`, '') : task.id;
 
   const storageKey = `${dagId}-open-groups`;
@@ -148,6 +152,7 @@ const Row = ({
             <TaskName
               onToggle={onToggle}
               isGroup={isGroup}
+              isMapped={task.isMapped}
               taskName={taskName}
               isOpen={isOpen}
               level={level}
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index d9ec10f..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -36,9 +36,11 @@ from pendulum.datetime import DateTime
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
 from sqlalchemy.ext.associationproxy import AssociationProxy
+from sqlalchemy.orm import Session
 
 from airflow import models
 from airflow.models import errors
+from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.json import AirflowJsonEncoder
@@ -53,11 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
-def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str, Any]]:
+def encode_ti(
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    return {
+    summary = {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -70,6 +74,66 @@ def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str
         'try_number': task_instance.try_number,
     }
 
+    def get_mapped_summary(task_instances):
+        priority = [
+            'failed',
+            'upstream_failed',
+            'up_for_retry',
+            'up_for_reschedule',
+            'queued',
+            'scheduled',
+            'deferred',
+            'sensing',
+            'running',
+            'shutdown',
+            'restarting',
+            'removed',
+            'no_status',
+            'success',
+            'skipped',
+        ]
+
+        mapped_states = [ti.state for ti in task_instances]
+
+        group_state = None
+        for state in priority:
+            if state in mapped_states:
+                group_state = state
+                break
+
+        group_start_date = datetime_to_string(
+            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+        )
+        group_end_date = datetime_to_string(
+            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+        )
+
+        return {
+            'task_id': task_instance.task_id,
+            'run_id': task_instance.run_id,
+            'state': group_state,
+            'start_date': group_start_date,
+            'end_date': group_end_date,
+            'mapped_states': mapped_states,
+            'operator': task_instance.operator,
+            'execution_date': datetime_to_string(task_instance.execution_date),
+            'try_number': task_instance.try_number,
+        }
+
+    if is_mapped:
+        return get_mapped_summary(
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == task_instance.dag_id,
+                TaskInstance.run_id == task_instance.run_id,
+                TaskInstance.task_id == task_instance.task_id,
+                TaskInstance.map_index >= 0,
+            )
+            .all()
+        )
+
+    return summary
+
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c0f9cb4..6d27f97 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -99,7 +99,7 @@ from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.jobs.triggerer_job import TriggererJob
 from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
 from airflow.models.serialized_dag import SerializedDagModel
@@ -223,23 +223,30 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     }
 
 
-def task_group_to_tree(task_item_or_group, dag, dag_runs, tis):
+def task_group_to_tree(task_item_or_group, dag, dag_runs, tis, session):
     """
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
     """
-    if isinstance(task_item_or_group, BaseOperator):
+    if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
-            'instances': [wwwutils.encode_ti(ti) for ti in tis if ti.task_id == task_item_or_group.task_id],
+            'instances': [
+                wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session)
+                for ti in tis
+                if ti.task_id == task_item_or_group.task_id
+            ],
             'label': task_item_or_group.label,
-            'extra_links': task_item_or_group.extra_links,
+            'extra_links': [],
+            'is_mapped': task_item_or_group.is_mapped,
         }
 
     # Task Group
     task_group = task_item_or_group
 
-    children = [task_group_to_tree(child, dag, dag_runs, tis) for child in task_group.children.values()]
+    children = [
+        task_group_to_tree(child, dag, dag_runs, tis, session) for child in task_group.children.values()
+    ]
 
     def get_summary(dag_run, children):
         priority = [
@@ -307,7 +314,7 @@ def task_group_to_dict(task_item_or_group):
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
     """
-    if isinstance(task_item_or_group, BaseOperator):
+    if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
             'value': {
@@ -423,7 +430,7 @@ def dag_edges(dag):
 
     def collect_edges(task_group):
         """Update edges_to_add and edges_to_skip according to TaskGroups."""
-        if isinstance(task_group, BaseOperator):
+        if isinstance(task_group, AbstractOperator):
             return
 
         for target_id in task_group.downstream_group_ids:
@@ -2465,7 +2472,7 @@ class Airflow(AirflowBaseView):
         tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
 
         data = {
-            'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+            'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
             'dag_runs': encoded_runs,
         }
 
@@ -3246,7 +3253,6 @@ class Airflow(AirflowBaseView):
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
-    @action_logging
     def tree_data(self):
         """Returns tree data"""
         dag_id = request.args.get('dag_id')
@@ -3284,7 +3290,7 @@ class Airflow(AirflowBaseView):
             min_date = min(dag_run_dates, default=None)
             tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
             data = {
-                'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+                'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
                 'dag_runs': encoded_runs,
             }