Posted to commits@airflow.apache.org by bb...@apache.org on 2022/02/22 17:20:56 UTC

[airflow] branch show-mapped-task-in-tree-view created (now c646171)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a change to branch show-mapped-task-in-tree-view
in repository https://gitbox.apache.org/repos/asf/airflow.git.


      at c646171  add graph tooltip and map count

This branch includes the following new commits:

     new 0a315d8  Expand mapped tasks in the Scheduler
     new 9616be4  make UI and tree work with mapped tasks
     new c646171  add graph tooltip and map count

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[airflow] 03/03: add graph tooltip and map count

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch show-mapped-task-in-tree-view
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c646171b7bcbfde1e45dfdc88087b0e9113df2f2
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Feb 22 12:17:29 2022 -0500

    add graph tooltip and map count
---
 airflow/models/dagrun.py                |  48 ++++++------
 airflow/www/static/js/graph.js          |  42 ++++++++--
 airflow/www/static/js/task_instances.js |  32 ++++++++
 airflow/www/utils.py                    | 135 +++++++++++++++++---------------
 airflow/www/views.py                    |  12 ++-
 5 files changed, 176 insertions(+), 93 deletions(-)

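The headline change in airflow/www/utils.py below rolls the states of a
mapped task's expanded instances up into a single display state by scanning
a priority list. A minimal sketch of that roll-up logic, using a hypothetical
FakeTI stand-in rather than the real TaskInstance model:

    from dataclasses import dataclass
    from typing import List, Optional

    STATE_PRIORITY = [
        'failed', 'upstream_failed', 'up_for_retry', 'up_for_reschedule',
        'queued', 'scheduled', 'deferred', 'sensing', 'running',
        'shutdown', 'restarting', 'removed', 'no_status', 'success', 'skipped',
    ]

    @dataclass
    class FakeTI:
        state: Optional[str]

    def summarize_mapped_states(mapped_tis: List[FakeTI]) -> Optional[str]:
        # The group takes the highest-priority state present among its instances.
        states = {ti.state for ti in mapped_tis}
        return next((s for s in STATE_PRIORITY if s in states), None)

    # One running instance outranks any number of successes.
    assert summarize_mapped_states(
        [FakeTI('success'), FakeTI('running'), FakeTI('success')]
    ) == 'running'
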
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 69b003f..5170ad3 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -43,6 +43,7 @@ from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import false, select, true
 
 from airflow import settings
+from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
@@ -52,7 +53,7 @@ from airflow.models.tasklog import LogTemplate
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
-from airflow.utils import callback_requests, timezone
+from airflow.utils import timezone
 from airflow.utils.helpers import is_container
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -487,7 +488,7 @@ class DagRun(Base, LoggingMixin):
     @provide_session
     def update_state(
         self, session: Session = NEW_SESSION, execute_callbacks: bool = True
-    ) -> Tuple[List[TI], Optional[callback_requests.DagCallbackRequest]]:
+    ) -> Tuple[List[TI], Optional[DagCallbackRequest]]:
         """
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
@@ -499,7 +500,7 @@ class DagRun(Base, LoggingMixin):
             needs to be executed
         """
         # Callback to execute in case of Task Failures
-        callback: Optional[callback_requests.DagCallbackRequest] = None
+        callback: Optional[DagCallbackRequest] = None
 
         start_dttm = timezone.utcnow()
         self.last_scheduling_decision = start_dttm
@@ -535,7 +536,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=False, reason='task_failure', session=session)
             elif dag.has_on_failure_callback:
-                callback = callback_requests.DagCallbackRequest(
+                callback = DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -550,7 +551,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=True, reason='success', session=session)
             elif dag.has_on_success_callback:
-                callback = callback_requests.DagCallbackRequest(
+                callback = DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -571,7 +572,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session)
             elif dag.has_on_failure_callback:
-                callback = callback_requests.DagCallbackRequest(
+                callback = DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -651,7 +652,7 @@ class DagRun(Base, LoggingMixin):
 
     def _get_ready_tis(
         self,
-        scheduleable_tis: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
@@ -659,41 +660,40 @@ class DagRun(Base, LoggingMixin):
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tis:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
         # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
-        # `scheduleable_tis` in place and have the `for` loop pick them up
+        # `schedulable_tis` in place and have the `for` loop pick them up)
         expanded_tis: List[TI] = []
 
         # Check dependencies
-        for st in itertools.chain(scheduleable_tis, expanded_tis):
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
 
             # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
             # for any reason it wasn't, we need to expand it now
-            if st.map_index < 0 and st.task.is_mapped:
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:
                 # HACK. This needs a better way, one that copes with multiple upstreams!
                 for ti in finished_tis:
-                    if st.task_id in ti.task.downstream_task_ids:
-                        upstream = ti
-
-                        assert isinstance(st.task, MappedOperator)
-                        new_tis = st.task.expand_mapped_task(upstream, session=session)
-                        assert new_tis[0] is st
-                        # Add the new TIs to the list to be checked
-                        for new_ti in new_tis[1:]:
-                            new_ti.task = st.task
+                    if schedulable.task_id in ti.task.downstream_task_ids:
+
+                        assert isinstance(schedulable.task, MappedOperator)
+                        new_tis = schedulable.task.expand_mapped_task(self.run_id, session=session)
+                        if schedulable.state == TaskInstanceState.SKIPPED:
+                            # Task is now skipped (likely because upstream returned 0 tasks)
+                            continue
+                        assert new_tis[0] is schedulable
                         expanded_tis.extend(new_tis[1:])
                         break
 
-            old_state = st.state
-            if st.are_dependencies_met(
+            old_state = schedulable.state
+            if schedulable.are_dependencies_met(
                 dep_context=DepContext(flag_upstream_failed=True, finished_tis=finished_tis),
                 session=session,
             ):
-                ready_tis.append(st)
+                ready_tis.append(schedulable)
             else:
-                old_states[st.key] = old_state
+                old_states[schedulable.key] = old_state
 
         # Check if any ti changed state
         tis_filter = TI.filter_for_tis(old_states.keys())
diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 254db5a..258004a 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -82,6 +82,33 @@ const render = dagreD3.render();
 const svg = d3.select('#graph-svg');
 let innerSvg = d3.select('#graph-svg g');
 
+const updateNodes = (node, instances) => {
+  const value = {
+    ...node.value,
+    label: tasks[node.id] && tasks[node.id].is_mapped
+      ? `${node.value.label} [${(instances[node.id].mapped_states && instances[node.id].mapped_states.length) || ' '}]`
+      : node.value.label,
+  };
+
+  if (g.node(node.id)) {
+    g.node(node.id).label = value.label;
+  }
+
+  if (node.children) {
+    return {
+      ...node,
+      value,
+      children: node.children.map((n) => updateNodes(n, instances)),
+    };
+  }
+  return {
+    ...node,
+    value,
+  };
+};
+
+let updatedNodes = updateNodes(nodes, taskInstances);
+
 // Remove the node with this nodeId from g.
 function removeNode(nodeId) {
   if (g.hasNode(nodeId)) {
@@ -377,6 +404,9 @@ function handleRefresh() {
           if (isFinal) {
             $('#auto_refresh').prop('checked', false);
             clearInterval(refreshInterval);
+            if (JSON.stringify(nodes) !== JSON.stringify(updatedNodes)) {
+              draw();
+            }
           }
         }
         prevTis = tis;
@@ -476,15 +506,17 @@ function groupTooltip(node, tis) {
 // Assigning css classes based on state to nodes
 // Initiating the tooltips
 function updateNodesStates(tis) {
+  updatedNodes = updateNodes(nodes, tis);
   g.nodes().forEach((nodeId) => {
-    const { elem } = g.node(nodeId);
+    const node = g.node(nodeId);
+    const { elem } = node;
+    const taskId = nodeId;
+
     if (elem) {
       const classes = `node enter ${getNodeState(nodeId, tis)}`;
       elem.setAttribute('class', classes);
       elem.setAttribute('data-toggle', 'tooltip');
 
-      const taskId = nodeId;
-      const node = g.node(nodeId);
       elem.onmouseover = (evt) => {
         let tt;
         if (taskId in tis) {
@@ -713,10 +745,10 @@ const focusNodeId = localStorage.getItem(focusedGroupKey(dagId));
 const expandedGroups = getSavedGroups(dagId);
 
 // Always expand the root node
-expandGroup(null, nodes);
+expandGroup(null, updatedNodes);
 
 // Expand the nodes that were previously expanded
-expandSavedGroups(expandedGroups, nodes);
+expandSavedGroups(expandedGroups, updatedNodes);
 
 // Draw once after all groups have been expanded
 draw();
diff --git a/airflow/www/static/js/task_instances.js b/airflow/www/static/js/task_instances.js
index 13f3a9d..e123bd5 100644
--- a/airflow/www/static/js/task_instances.js
+++ b/airflow/www/static/js/task_instances.js
@@ -69,6 +69,34 @@ export default function tiTooltip(ti, { includeTryNumber = false } = {}) {
   if (ti.state !== undefined) {
     tt += `<strong>Status:</strong> ${escapeHtml(ti.state)}<br><br>`;
   }
+  if (ti.mapped_states) {
+    const STATES = [
+      ['success', 0],
+      ['failed', 0],
+      ['upstream_failed', 0],
+      ['up_for_retry', 0],
+      ['up_for_reschedule', 0],
+      ['running', 0],
+      ['deferred', 0],
+      ['sensing', 0],
+      ['queued', 0],
+      ['scheduled', 0],
+      ['skipped', 0],
+      ['no_status', 0],
+    ];
+    const numMap = new Map(STATES);
+    ti.mapped_states.forEach((s) => {
+      const stateKey = s || 'no_status';
+      if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1);
+    });
+    tt += `<strong>${escapeHtml(ti.mapped_states.length)} Tasks Mapped</strong><br />`;
+  numMap.forEach((count, state) => {
+    if (count > 0) {
+      tt += `<span style="margin-left: 15px">${escapeHtml(state)}: ${escapeHtml(count)}</span><br />`;
+      }
+    });
+    tt += '<br />';
+  }
   if (ti.task_id !== undefined) {
     tt += `Task_id: ${escapeHtml(ti.task_id)}<br>`;
   }
@@ -76,6 +104,10 @@ export default function tiTooltip(ti, { includeTryNumber = false } = {}) {
   if (ti.run_id !== undefined) {
     tt += `Run Id: <nobr>${escapeHtml(ti.run_id)}</nobr><br>`;
   }
+  // Show the map index for a specific child instance, but not for a summary instance
+  if (ti.map_index >= 0 && !ti.mapped_states) {
+    tt += `Map Index: ${escapeHtml(ti.map_index)}<br>`;
+  }
   if (ti.operator !== undefined) {
     tt += `Operator: ${escapeHtml(ti.operator)}<br>`;
   }
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ed765d4..ef08f8e 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,6 +43,7 @@ from airflow.models import errors
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
+from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.json import AirflowJsonEncoder
 from airflow.utils.state import State
 from airflow.www.forms import DateTimeWithTimezoneField
@@ -55,13 +56,83 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
+def get_mapped_instances(task_instance, session):
+    return (
+        session.query(TaskInstance)
+        .filter(
+            TaskInstance.dag_id == task_instance.dag_id,
+            TaskInstance.run_id == task_instance.run_id,
+            TaskInstance.task_id == task_instance.task_id,
+            TaskInstance.map_index >= 0,
+        )
+        .all()
+    )
+
+
+def get_instance_with_map(task_instance, session):
+    if task_instance.map_index == -1:
+        return alchemy_to_dict(task_instance)
+    mapped_instances = get_mapped_instances(task_instance, session)
+    return get_mapped_summary(task_instance, mapped_instances)
+
+
+def get_mapped_summary(parent_instance, task_instances):
+    priority = [
+        'failed',
+        'upstream_failed',
+        'up_for_retry',
+        'up_for_reschedule',
+        'queued',
+        'scheduled',
+        'deferred',
+        'sensing',
+        'running',
+        'shutdown',
+        'restarting',
+        'removed',
+        'no_status',
+        'success',
+        'skipped',
+    ]
+
+    mapped_states = [ti.state for ti in task_instances]
+
+    group_state = None
+    for state in priority:
+        if state in mapped_states:
+            group_state = state
+            break
+
+    group_start_date = datetime_to_string(
+        min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+    )
+    group_end_date = datetime_to_string(
+        max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+    )
+
+    return {
+        'task_id': parent_instance.task_id,
+        'run_id': parent_instance.run_id,
+        'state': group_state,
+        'start_date': group_start_date,
+        'end_date': group_end_date,
+        'mapped_states': mapped_states,
+        'operator': parent_instance.operator,
+        'execution_date': datetime_to_string(parent_instance.execution_date),
+        'try_number': parent_instance.try_number,
+    }
+
+
 def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
 ) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    summary = {
+    if is_mapped:
+        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
+
+    return {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -74,66 +145,6 @@ def encode_ti(
         'try_number': task_instance.try_number,
     }
 
-    def get_mapped_summary(task_instances):
-        priority = [
-            'failed',
-            'upstream_failed',
-            'up_for_retry',
-            'up_for_reschedule',
-            'queued',
-            'scheduled',
-            'deferred',
-            'sensing',
-            'running',
-            'shutdown',
-            'restarting',
-            'removed',
-            'no_status',
-            'success',
-            'skipped',
-        ]
-
-        mapped_states = [ti.state for ti in task_instances]
-
-        group_state = None
-        for state in priority:
-            if state in mapped_states:
-                group_state = state
-                break
-
-        group_start_date = datetime_to_string(
-            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
-        )
-        group_end_date = datetime_to_string(
-            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
-        )
-
-        return {
-            'task_id': task_instance.task_id,
-            'run_id': task_instance.run_id,
-            'state': group_state,
-            'start_date': group_start_date,
-            'end_date': group_end_date,
-            'mapped_states': mapped_states,
-            'operator': task_instance.operator,
-            'execution_date': datetime_to_string(task_instance.execution_date),
-            'try_number': task_instance.try_number,
-        }
-
-    if is_mapped:
-        return get_mapped_summary(
-            session.query(TaskInstance)
-            .filter(
-                TaskInstance.dag_id == task_instance.dag_id,
-                TaskInstance.run_id == task_instance.run_id,
-                TaskInstance.task_id == task_instance.task_id,
-                TaskInstance.map_index >= 0,
-            )
-            .all()
-        )
-
-    return summary
-
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 63c9fb1..a61727f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2648,12 +2648,16 @@ class Airflow(AirflowBaseView):
         form = GraphForm(data=dt_nr_dr_data)
         form.execution_date.choices = dt_nr_dr_data['dr_choices']
 
-        task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)}
+        task_instances = {
+            ti.task_id: wwwutils.get_instance_with_map(ti, session)
+            for ti in dag.get_task_instances(dttm, dttm)
+        }
         tasks = {
             t.task_id: {
                 'dag_id': t.dag_id,
                 'task_type': t.task_type,
                 'extra_links': t.extra_links,
+                'is_mapped': t.is_mapped,
             }
             for t in dag.tasks
         }
@@ -3244,7 +3248,11 @@ class Airflow(AirflowBaseView):
         else:
             return "Error: Invalid execution_date"
 
-        task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)}
+        with create_session() as session:
+            task_instances = {
+                ti.task_id: wwwutils.get_instance_with_map(ti, session)
+                for ti in dag.get_task_instances(dttm, dttm)
+            }
 
         return json.dumps(task_instances, cls=utils_json.AirflowJsonEncoder)
 

[airflow] 02/03: make UI and tree work with mapped tasks

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch show-mapped-task-in-tree-view
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9616be41fdfe984fd38604a6d423cdd0d7beb3fc
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Feb 16 11:27:29 2022 -0500

    make UI and tree work with mapped tasks
---
 airflow/www/static/js/tree/InstanceTooltip.jsx | 64 ++++++++++++++++++------
 airflow/www/static/js/tree/renderTaskRows.jsx  |  7 ++-
 airflow/www/utils.py                           | 68 +++++++++++++++++++++++++-
 airflow/www/views.py                           | 28 ++++++-----
 4 files changed, 138 insertions(+), 29 deletions(-)

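The InstanceTooltip.jsx change below tallies mapped instances per state and
renders only the non-zero counts. In Python terms the aggregation amounts to
this sketch, with collections.Counter standing in for the JS Map used in the
diff:

    from collections import Counter
    from typing import List, Optional

    def tally_mapped_states(mapped_states: List[Optional[str]]) -> Counter:
        # A null state is displayed as 'no_status', matching the JSX fallback.
        return Counter(state or 'no_status' for state in mapped_states)

    counts = tally_mapped_states(['success', 'success', None, 'failed'])
    assert counts == Counter({'success': 2, 'no_status': 1, 'failed': 1})
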
diff --git a/airflow/www/static/js/tree/InstanceTooltip.jsx b/airflow/www/static/js/tree/InstanceTooltip.jsx
index a1ef192..612905b 100644
--- a/airflow/www/static/js/tree/InstanceTooltip.jsx
+++ b/airflow/www/static/js/tree/InstanceTooltip.jsx
@@ -24,30 +24,33 @@ import { Box, Text } from '@chakra-ui/react';
 
 import { formatDateTime, getDuration, formatDuration } from '../datetime_utils';
 
+const STATES = [
+  ['success', 0],
+  ['failed', 0],
+  ['upstream_failed', 0],
+  ['up_for_retry', 0],
+  ['up_for_reschedule', 0],
+  ['running', 0],
+  ['deferred', 0],
+  ['sensing', 0],
+  ['queued', 0],
+  ['scheduled', 0],
+  ['skipped', 0],
+  ['no_status', 0],
+];
+
 const InstanceTooltip = ({
   group,
   instance: {
-    duration, operator, startDate, endDate, state, taskId, runId,
+    duration, operator, startDate, endDate, state, taskId, runId, mappedStates,
   },
 }) => {
   const isGroup = !!group.children;
   const groupSummary = [];
+  const mapSummary = [];
 
   if (isGroup) {
-    const numMap = new Map([
-      ['success', 0],
-      ['failed', 0],
-      ['upstream_failed', 0],
-      ['up_for_retry', 0],
-      ['up_for_reschedule', 0],
-      ['running', 0],
-      ['deferred', 0],
-      ['sensing', 0],
-      ['queued', 0],
-      ['scheduled', 0],
-      ['skipped', 0],
-      ['no_status', 0],
-    ]);
+    const numMap = new Map(STATES);
     group.children.forEach((child) => {
       const taskInstance = child.instances.find((ti) => ti.runId === runId);
       if (taskInstance) {
@@ -69,6 +72,26 @@ const InstanceTooltip = ({
     });
   }
 
+  if (group.isMapped && mappedStates) {
+    const numMap = new Map(STATES);
+    mappedStates.forEach((s) => {
+      const stateKey = s || 'no_status';
+      if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1);
+    });
+    numMap.forEach((count, state) => {
+      if (count > 0) {
+        mapSummary.push(
+          // eslint-disable-next-line react/no-array-index-key
+          <Text key={state} ml="10px">
+            {state}
+            {': '}
+            {count}
+          </Text>,
+        );
+      }
+    });
+  }
+
   const taskIdTitle = isGroup ? 'Task Group Id: ' : 'Task Id: ';
 
   return (
@@ -88,6 +111,17 @@ const InstanceTooltip = ({
           {groupSummary}
         </>
       )}
+      {group.isMapped && (
+        <>
+          <br />
+          <Text as="strong">
+            {mappedStates.length}
+            {' '}
+            Tasks Mapped
+          </Text>
+          {mapSummary}
+        </>
+      )}
       <br />
       <Text>
         {taskIdTitle}
diff --git a/airflow/www/static/js/tree/renderTaskRows.jsx b/airflow/www/static/js/tree/renderTaskRows.jsx
index 8dcd88d..224885b 100644
--- a/airflow/www/static/js/tree/renderTaskRows.jsx
+++ b/airflow/www/static/js/tree/renderTaskRows.jsx
@@ -53,7 +53,7 @@ const renderTaskRows = ({
 ));
 
 const TaskName = ({
-  isGroup, onToggle, isOpen, level, taskName,
+  isGroup = false, isMapped = false, onToggle, isOpen, level, taskName,
 }) => (
   <Box _groupHover={{ backgroundColor: 'rgba(113, 128, 150, 0.1)' }} transition="background-color 0.2s">
     <Flex
@@ -74,6 +74,9 @@ const TaskName = ({
         isTruncated
       >
         {taskName}
+        {isMapped && (
+          ' [ ]'
+        )}
       </Text>
       {isGroup && (
         isOpen ? <FiChevronDown data-testid="open-group" /> : <FiChevronUp data-testid="closed-group" />
@@ -107,6 +110,7 @@ const Row = ({
 }) => {
   const { data: { dagRuns = [] } } = useTreeData();
   const isGroup = !!task.children;
+
   const taskName = prevTaskId ? task.id.replace(`${prevTaskId}.`, '') : task.id;
 
   const storageKey = `${dagId}-open-groups`;
@@ -148,6 +152,7 @@ const Row = ({
             <TaskName
               onToggle={onToggle}
               isGroup={isGroup}
+              isMapped={task.isMapped}
               taskName={taskName}
               isOpen={isOpen}
               level={level}
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index d9ec10f..ed765d4 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -36,9 +36,11 @@ from pendulum.datetime import DateTime
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
 from sqlalchemy.ext.associationproxy import AssociationProxy
+from sqlalchemy.orm import Session
 
 from airflow import models
 from airflow.models import errors
+from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
 from airflow.utils.json import AirflowJsonEncoder
@@ -53,11 +55,13 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
-def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str, Any]]:
+def encode_ti(
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    return {
+    summary = {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -70,6 +74,66 @@ def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[Dict[str
         'try_number': task_instance.try_number,
     }
 
+    def get_mapped_summary(task_instances):
+        priority = [
+            'failed',
+            'upstream_failed',
+            'up_for_retry',
+            'up_for_reschedule',
+            'queued',
+            'scheduled',
+            'deferred',
+            'sensing',
+            'running',
+            'shutdown',
+            'restarting',
+            'removed',
+            'no_status',
+            'success',
+            'skipped',
+        ]
+
+        mapped_states = [ti.state for ti in task_instances]
+
+        group_state = None
+        for state in priority:
+            if state in mapped_states:
+                group_state = state
+                break
+
+        group_start_date = datetime_to_string(
+            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+        )
+        group_end_date = datetime_to_string(
+            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+        )
+
+        return {
+            'task_id': task_instance.task_id,
+            'run_id': task_instance.run_id,
+            'state': group_state,
+            'start_date': group_start_date,
+            'end_date': group_end_date,
+            'mapped_states': mapped_states,
+            'operator': task_instance.operator,
+            'execution_date': datetime_to_string(task_instance.execution_date),
+            'try_number': task_instance.try_number,
+        }
+
+    if is_mapped:
+        return get_mapped_summary(
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == task_instance.dag_id,
+                TaskInstance.run_id == task_instance.run_id,
+                TaskInstance.task_id == task_instance.task_id,
+                TaskInstance.map_index >= 0,
+            )
+            .all()
+        )
+
+    return summary
+
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index cd45952..63c9fb1 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -99,7 +99,7 @@ from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.jobs.triggerer_job import TriggererJob
 from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
-from airflow.models.baseoperator import BaseOperator
+from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
 from airflow.models.serialized_dag import SerializedDagModel
@@ -223,23 +223,30 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     }
 
 
-def task_group_to_tree(task_item_or_group, dag, dag_runs, tis):
+def task_group_to_tree(task_item_or_group, dag, dag_runs, tis, session):
     """
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
     """
-    if isinstance(task_item_or_group, BaseOperator):
+    if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
-            'instances': [wwwutils.encode_ti(ti) for ti in tis if ti.task_id == task_item_or_group.task_id],
+            'instances': [
+                wwwutils.encode_ti(ti, task_item_or_group.is_mapped, session)
+                for ti in tis
+                if ti.task_id == task_item_or_group.task_id
+            ],
             'label': task_item_or_group.label,
-            'extra_links': task_item_or_group.extra_links,
+            'extra_links': [],
+            'is_mapped': task_item_or_group.is_mapped,
         }
 
     # Task Group
     task_group = task_item_or_group
 
-    children = [task_group_to_tree(child, dag, dag_runs, tis) for child in task_group.children.values()]
+    children = [
+        task_group_to_tree(child, dag, dag_runs, tis, session) for child in task_group.children.values()
+    ]
 
     def get_summary(dag_run, children):
         priority = [
@@ -307,7 +314,7 @@ def task_group_to_dict(task_item_or_group):
     Create a nested dict representation of this TaskGroup and its children used to construct
     the Graph.
     """
-    if isinstance(task_item_or_group, BaseOperator):
+    if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
             'value': {
@@ -423,7 +430,7 @@ def dag_edges(dag):
 
     def collect_edges(task_group):
         """Update edges_to_add and edges_to_skip according to TaskGroups."""
-        if isinstance(task_group, BaseOperator):
+        if isinstance(task_group, AbstractOperator):
             return
 
         for target_id in task_group.downstream_group_ids:
@@ -2467,7 +2474,7 @@ class Airflow(AirflowBaseView):
         tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
 
         data = {
-            'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+            'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
             'dag_runs': encoded_runs,
         }
 
@@ -3248,7 +3255,6 @@ class Airflow(AirflowBaseView):
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
-    @action_logging
     def tree_data(self):
         """Returns tree data"""
         dag_id = request.args.get('dag_id')
@@ -3286,7 +3292,7 @@ class Airflow(AirflowBaseView):
             min_date = min(dag_run_dates, default=None)
             tis = dag.get_task_instances(start_date=min_date, end_date=base_date, session=session)
             data = {
-                'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis),
+                'groups': task_group_to_tree(dag.task_group, dag, dag_runs, tis, session),
                 'dag_runs': encoded_runs,
             }
 

[airflow] 01/03: Expand mapped tasks in the Scheduler

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch show-mapped-task-in-tree-view
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0a315d88c8e35a2e6f832e6fb938086cbee7a025
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Tue Feb 15 23:18:50 2022 +0000

    Expand mapped tasks in the Scheduler
    
    Technically this is done inside
    DagRun.task_instance_scheduling_decisions, but the only place that
    currently calls it is the Scheduler.
    
    The way we are getting `upstream_ti` to pass to expand_mapped_task is
    all sorts of wrong and will need fixing; I think the interface for that
    method is wrong, and the mapped task should be responsible for finding
    the right upstream TI itself.
---
 airflow/models/dagrun.py | 48 ++++++++++++++++++++++++------------------------
 1 file changed, 24 insertions(+), 24 deletions(-)

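As the message above concedes, the upstream lookup is a stopgap. The shape of
the "expansion of last resort" handling in _get_ready_tis is roughly the
following sketch; TI and fanout here are hypothetical stand-ins, not the real
TaskInstance model or MappedOperator.expand_mapped_task interface:

    import itertools
    from dataclasses import dataclass
    from typing import Callable, List

    @dataclass
    class TI:
        task_id: str
        map_index: int = -1
        is_mapped: bool = False

    def expand_if_needed(
        schedulable_tis: List[TI], expand: Callable[[TI], List[TI]]
    ) -> List[TI]:
        expanded: List[TI] = []
        # chain() drains schedulable_tis first, and list iteration sees appends,
        # so TIs created mid-loop are visited too; they are never re-expanded,
        # because their map_index is no longer negative.
        for ti in itertools.chain(schedulable_tis, expanded):
            if ti.map_index < 0 and ti.is_mapped:
                new_tis = expand(ti)  # element 0 plays the role of the original TI
                expanded.extend(new_tis[1:])
        return expanded

    def fanout(ti: TI) -> List[TI]:
        # Hypothetical expansion into three mapped instances.
        return [TI(ti.task_id, i, True) for i in range(3)]

    assert [t.map_index for t in expand_if_needed([TI('t', -1, True)], fanout)] == [1, 2]
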
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 5170ad3..69b003f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -43,7 +43,6 @@ from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import false, select, true
 
 from airflow import settings
-from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
@@ -53,7 +52,7 @@ from airflow.models.tasklog import LogTemplate
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
-from airflow.utils import timezone
+from airflow.utils import callback_requests, timezone
 from airflow.utils.helpers import is_container
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -488,7 +487,7 @@ class DagRun(Base, LoggingMixin):
     @provide_session
     def update_state(
         self, session: Session = NEW_SESSION, execute_callbacks: bool = True
-    ) -> Tuple[List[TI], Optional[DagCallbackRequest]]:
+    ) -> Tuple[List[TI], Optional[callback_requests.DagCallbackRequest]]:
         """
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
@@ -500,7 +499,7 @@ class DagRun(Base, LoggingMixin):
             needs to be executed
         """
         # Callback to execute in case of Task Failures
-        callback: Optional[DagCallbackRequest] = None
+        callback: Optional[callback_requests.DagCallbackRequest] = None
 
         start_dttm = timezone.utcnow()
         self.last_scheduling_decision = start_dttm
@@ -536,7 +535,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=False, reason='task_failure', session=session)
             elif dag.has_on_failure_callback:
-                callback = DagCallbackRequest(
+                callback = callback_requests.DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -551,7 +550,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=True, reason='success', session=session)
             elif dag.has_on_success_callback:
-                callback = DagCallbackRequest(
+                callback = callback_requests.DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -572,7 +571,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session)
             elif dag.has_on_failure_callback:
-                callback = DagCallbackRequest(
+                callback = callback_requests.DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -652,7 +651,7 @@ class DagRun(Base, LoggingMixin):
 
     def _get_ready_tis(
         self,
-        schedulable_tis: List[TI],
+        scheduleable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
@@ -660,40 +659,41 @@ class DagRun(Base, LoggingMixin):
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not schedulable_tis:
+        if not scheduleable_tis:
             return ready_tis, changed_tis
 
         # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
-        # `schedulable_tis` in place and have the `for` loop pick them up
+        # `scheduleable_tis` in place and have the `for` loop pick them up)
         expanded_tis: List[TI] = []
 
         # Check dependencies
-        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
+        for st in itertools.chain(scheduleable_tis, expanded_tis):
 
             # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
             # for any reason it wasn't, we need to expand it now
-            if schedulable.map_index < 0 and schedulable.task.is_mapped:
+            if st.map_index < 0 and st.task.is_mapped:
                 # HACK. This needs a better way, one that copes with multiple upstreams!
                 for ti in finished_tis:
-                    if schedulable.task_id in ti.task.downstream_task_ids:
-
-                        assert isinstance(schedulable.task, MappedOperator)
-                        new_tis = schedulable.task.expand_mapped_task(self.run_id, session=session)
-                        if schedulable.state == TaskInstanceState.SKIPPED:
-                            # Task is now skipped (likely cos upstream returned 0 tasks
-                            continue
-                        assert new_tis[0] is schedulable
+                    if st.task_id in ti.task.downstream_task_ids:
+                        upstream = ti
+
+                        assert isinstance(st.task, MappedOperator)
+                        new_tis = st.task.expand_mapped_task(upstream, session=session)
+                        assert new_tis[0] is st
+                        # Add the new TIs to the list to be checked
+                        for new_ti in new_tis[1:]:
+                            new_ti.task = st.task
                         expanded_tis.extend(new_tis[1:])
                         break
 
-            old_state = schedulable.state
-            if schedulable.are_dependencies_met(
+            old_state = st.state
+            if st.are_dependencies_met(
                 dep_context=DepContext(flag_upstream_failed=True, finished_tis=finished_tis),
                 session=session,
             ):
-                ready_tis.append(schedulable)
+                ready_tis.append(st)
             else:
-                old_states[schedulable.key] = old_state
+                old_states[st.key] = old_state
 
         # Check if any ti changed state
         tis_filter = TI.filter_for_tis(old_states.keys())