You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/04/21 14:48:32 UTC

[airflow] branch main updated: Task actions UI for individual mapped instances (#23127)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e8ac47589 Task actions UI for individual mapped instances (#23127)
1e8ac47589 is described below

commit 1e8ac47589967f2a7284faeab0f65b01bfd8202d
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Thu Apr 21 10:48:24 2022 -0400

    Task actions UI for individual mapped instances (#23127)
---
 airflow/www/static/js/dag.js                       | 18 ++++-----
 airflow/www/static/js/tree/Table.jsx               | 44 ++++++++++++++++++++--
 airflow/www/static/js/tree/api/useClearTask.js     | 11 ++++--
 .../www/static/js/tree/api/useConfirmMarkTask.js   | 16 +++++---
 .../www/static/js/tree/api/useMarkFailedTask.js    | 11 ++++--
 .../www/static/js/tree/api/useMarkSuccessTask.js   | 12 ++++--
 airflow/www/static/js/tree/api/useRunTask.js       | 43 +++++++++++----------
 .../content/taskInstance/MappedInstances.jsx       |  3 +-
 .../js/tree/details/content/taskInstance/index.jsx | 22 ++++++++++-
 .../content/taskInstance/taskActions/Clear.jsx     |  3 ++
 .../taskInstance/taskActions/MarkFailed.jsx        |  3 ++
 .../taskInstance/taskActions/MarkSuccess.jsx       |  8 ++--
 .../content/taskInstance/taskActions/Run.jsx       |  2 +
 airflow/www/templates/airflow/dag.html             |  3 ++
 airflow/www/utils.py                               |  1 +
 15 files changed, 146 insertions(+), 54 deletions(-)

diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index ca8b7cc676..0445e0686c 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -171,14 +171,11 @@ export function callModal({
   $('#extra_links').prev('hr').hide();
   $('#extra_links').empty().hide();
   if (mi >= 0) {
-    // Marking state and clear are not yet supported for mapped instances
-    $('#success_action').hide();
-    $('#failed_action').hide();
-    $('#clear_action').hide();
+    $('#modal_map_index').show();
+    $('#modal_map_index .value').text(mi);
   } else {
-    $('#success_action').show();
-    $('#failed_action').show();
-    $('#clear_action').show();
+    $('#modal_map_index').hide();
+    $('#modal_map_index .value').text('');
   }
   if (isSubDag) {
     $('#div_btn_subdag').show();
@@ -339,7 +336,6 @@ $(document).on('click', '.map_index_item', function mapItem() {
 $('form[data-action]').on('submit', function submit(e) {
   e.preventDefault();
   const form = $(this).get(0);
-  // Somehow submit is fired twice. Only once is the executionDate/dagRunId valid
   if (dagRunId || executionDate) {
     if (form.dag_run_id) {
       form.dag_run_id.value = dagRunId;
@@ -351,8 +347,10 @@ $('form[data-action]').on('submit', function submit(e) {
     if (form.task_id) {
       form.task_id.value = taskId;
     }
-    if (form.map_index) {
-      form.map_index.value = mapIndex === undefined ? '' : mapIndex;
+    if (form.map_index && mapIndex >= 0) {
+      form.map_index.value = mapIndex;
+    } else if (form.map_index) {
+      form.map_index.remove();
     }
     form.action = $(this).data('action');
     form.submit();
diff --git a/airflow/www/static/js/tree/Table.jsx b/airflow/www/static/js/tree/Table.jsx
index aef91ce905..e500b08966 100644
--- a/airflow/www/static/js/tree/Table.jsx
+++ b/airflow/www/static/js/tree/Table.jsx
@@ -21,7 +21,7 @@
  * Custom wrapper of react-table using Chakra UI components
 */
 
-import React, { useEffect } from 'react';
+import React, { useEffect, useRef, forwardRef } from 'react';
 import {
   Flex,
   Table as ChakraTable,
@@ -33,9 +33,10 @@ import {
   IconButton,
   Text,
   useColorModeValue,
+  Checkbox,
 } from '@chakra-ui/react';
 import {
-  useTable, useSortBy, usePagination,
+  useTable, useSortBy, usePagination, useRowSelect,
 } from 'react-table';
 import {
   MdKeyboardArrowLeft, MdKeyboardArrowRight,
@@ -44,8 +45,23 @@ import {
   TiArrowUnsorted, TiArrowSortedDown, TiArrowSortedUp,
 } from 'react-icons/ti';
 
+const IndeterminateCheckbox = forwardRef(
+  ({ indeterminate, checked, ...rest }, ref) => {
+    const defaultRef = useRef();
+    const resolvedRef = ref || defaultRef;
+
+    useEffect(() => {
+      resolvedRef.current.indeterminate = indeterminate;
+    }, [resolvedRef, indeterminate]);
+
+    return (
+      <Checkbox ref={resolvedRef} isChecked={checked} {...rest} />
+    );
+  },
+);
+
 const Table = ({
-  data, columns, manualPagination, pageSize = 25, setSortBy, isLoading = false,
+  data, columns, manualPagination, pageSize = 25, setSortBy, isLoading = false, selectRows,
 }) => {
   const { totalEntries, offset, setOffset } = manualPagination || {};
   const oddColor = useColorModeValue('gray.50', 'gray.900');
@@ -66,7 +82,8 @@ const Table = ({
     canNextPage,
     nextPage,
     previousPage,
-    state: { pageIndex, sortBy },
+    selectedFlatRows,
+    state: { pageIndex, sortBy, selectedRowIds },
   } = useTable(
     {
       columns,
@@ -81,6 +98,20 @@ const Table = ({
     },
     useSortBy,
     usePagination,
+    useRowSelect,
+    (hooks) => {
+      hooks.visibleColumns.push((cols) => [
+        {
+          id: 'selection',
+          Cell: ({ row }) => (
+            <div>
+              <IndeterminateCheckbox {...row.getToggleRowSelectedProps()} />
+            </div>
+          ),
+        },
+        ...cols,
+      ]);
+    },
   );
 
   const handleNext = () => {
@@ -97,6 +128,11 @@ const Table = ({
     if (setSortBy) setSortBy(sortBy);
   }, [sortBy, setSortBy]);
 
+  useEffect(() => {
+    if (selectRows) selectRows(selectedFlatRows.map((row) => row.original.mapIndex));
+  // eslint-disable-next-line react-hooks/exhaustive-deps
+  }, [selectedRowIds, selectRows]);
+
   return (
     <>
       <ChakraTable {...getTableProps()}>
diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index bcf99bb250..2ea3eee486 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -36,7 +36,7 @@ export default function useClearTask({
   return useMutation(
     ['clearTask', dagId, runId, taskId],
     ({
-      past, future, upstream, downstream, recursive, failed, confirmed,
+      past, future, upstream, downstream, recursive, failed, confirmed, mapIndexes = [],
     }) => {
       const params = new URLSearchParams({
         csrf_token: csrfToken,
@@ -51,9 +51,13 @@ export default function useClearTask({
         downstream,
         recursive,
         only_failed: failed,
-      }).toString();
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
 
-      return axios.post(clearUrl, params, {
+      return axios.post(clearUrl, params.toString(), {
         headers: {
           'Content-Type': 'application/x-www-form-urlencoded',
         },
@@ -71,6 +75,7 @@ export default function useClearTask({
         }
         if (!status || status !== 'error') {
           queryClient.invalidateQueries('treeData');
+          queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
           startRefresh();
         }
       },
diff --git a/airflow/www/static/js/tree/api/useConfirmMarkTask.js b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
index 1450a15d3d..d1f8eef9d3 100644
--- a/airflow/www/static/js/tree/api/useConfirmMarkTask.js
+++ b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
@@ -29,9 +29,9 @@ export default function useConfirmMarkTask({
   return useMutation(
     ['confirmStateChange', dagId, runId, taskId, state],
     ({
-      past, future, upstream, downstream,
-    }) => axios.get(confirmUrl, {
-      params: {
+      past, future, upstream, downstream, mapIndexes = [],
+    }) => {
+      const params = new URLSearchParams({
         dag_id: dagId,
         dag_run_id: runId,
         task_id: taskId,
@@ -40,7 +40,13 @@ export default function useConfirmMarkTask({
         upstream,
         downstream,
         state,
-      },
-    }),
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
+
+      return axios.get(confirmUrl, { params });
+    },
   );
 }
diff --git a/airflow/www/static/js/tree/api/useMarkFailedTask.js b/airflow/www/static/js/tree/api/useMarkFailedTask.js
index f2fd28bdb2..a94ab22d0c 100644
--- a/airflow/www/static/js/tree/api/useMarkFailedTask.js
+++ b/airflow/www/static/js/tree/api/useMarkFailedTask.js
@@ -33,7 +33,7 @@ export default function useMarkFailedTask({
   return useMutation(
     ['markFailed', dagId, runId, taskId],
     ({
-      past, future, upstream, downstream,
+      past, future, upstream, downstream, mapIndexes = [],
     }) => {
       const params = new URLSearchParams({
         csrf_token: csrfToken,
@@ -45,9 +45,13 @@ export default function useMarkFailedTask({
         future,
         upstream,
         downstream,
-      }).toString();
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
 
-      return axios.post(failedUrl, params, {
+      return axios.post(failedUrl, params.toString(), {
         headers: {
           'Content-Type': 'application/x-www-form-urlencoded',
         },
@@ -56,6 +60,7 @@ export default function useMarkFailedTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('treeData');
+        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
         startRefresh();
       },
     },
diff --git a/airflow/www/static/js/tree/api/useMarkSuccessTask.js b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
index 92ba539de6..47fda2f0f8 100644
--- a/airflow/www/static/js/tree/api/useMarkSuccessTask.js
+++ b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
@@ -33,7 +33,7 @@ export default function useMarkSuccessTask({
   return useMutation(
     ['markSuccess', dagId, runId, taskId],
     ({
-      past, future, upstream, downstream,
+      past, future, upstream, downstream, mapIndexes = [],
     }) => {
       const params = new URLSearchParams({
         csrf_token: csrfToken,
@@ -45,9 +45,14 @@ export default function useMarkSuccessTask({
         future,
         upstream,
         downstream,
-      }).toString();
+        map_indexes: mapIndexes,
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
 
-      return axios.post(successUrl, params, {
+      return axios.post(successUrl, params.toString(), {
         headers: {
           'Content-Type': 'application/x-www-form-urlencoded',
         },
@@ -56,6 +61,7 @@ export default function useMarkSuccessTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('treeData');
+        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
         startRefresh();
       },
     },
diff --git a/airflow/www/static/js/tree/api/useRunTask.js b/airflow/www/static/js/tree/api/useRunTask.js
index 44a9e14bf4..9e45c42f59 100644
--- a/airflow/www/static/js/tree/api/useRunTask.js
+++ b/airflow/www/static/js/tree/api/useRunTask.js
@@ -32,32 +32,34 @@ export default function useRunTask(dagId, runId, taskId) {
   const { startRefresh } = useAutoRefresh();
   return useMutation(
     ['runTask', dagId, runId, taskId],
-    ({
+    async ({
       ignoreAllDeps,
       ignoreTaskState,
       ignoreTaskDeps,
-      mapIndex = -1,
-    }) => {
-      const params = new URLSearchParams({
-        csrf_token: csrfToken,
-        dag_id: dagId,
-        dag_run_id: runId,
-        task_id: taskId,
-        ignore_all_deps: ignoreAllDeps,
-        ignore_task_deps: ignoreTaskDeps,
-        ignore_ti_state: ignoreTaskState,
-        map_index: mapIndex,
-      }).toString();
+      mapIndexes,
+    }) => Promise.all(
+      (mapIndexes.length ? mapIndexes : [-1]).map((mi) => {
+        const params = new URLSearchParams({
+          csrf_token: csrfToken,
+          dag_id: dagId,
+          dag_run_id: runId,
+          task_id: taskId,
+          ignore_all_deps: ignoreAllDeps,
+          ignore_task_deps: ignoreTaskDeps,
+          ignore_ti_state: ignoreTaskState,
+          map_index: mi,
+        }).toString();
 
-      return axios.post(runUrl, params, {
-        headers: {
-          'Content-Type': 'application/x-www-form-urlencoded',
-        },
-      });
-    },
+        return axios.post(runUrl, params, {
+          headers: {
+            'Content-Type': 'application/x-www-form-urlencoded',
+          },
+        });
+      }),
+    ),
     {
       onSuccess: (data) => {
-        const { message, status } = data;
+        const { message, status } = data.length ? data[0] : data;
         if (message && status === 'error') {
           toast({
             description: message,
@@ -67,6 +69,7 @@ export default function useRunTask(dagId, runId, taskId) {
         }
         if (!status || status !== 'error') {
           queryClient.invalidateQueries('treeData');
+          queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
           startRefresh();
         }
       },
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx b/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
index 42bbdca66f..77c0713ab3 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
@@ -46,7 +46,7 @@ const IconLink = (props) => (
 );
 
 const MappedInstances = ({
-  dagId, runId, taskId,
+  dagId, runId, taskId, selectRows,
 }) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
@@ -147,6 +147,7 @@ const MappedInstances = ({
         pageSize={limit}
         setSortBy={setSortBy}
         isLoading={isLoading}
+        selectRows={selectRows}
       />
     </Box>
   );
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index d8b71cb128..0e4f441e24 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -17,12 +17,13 @@
  * under the License.
  */
 
-import React from 'react';
+import React, { useState } from 'react';
 import {
   Box,
   VStack,
   Divider,
   StackDivider,
+  Text,
 } from '@chakra-ui/react';
 
 import RunAction from './taskActions/Run';
@@ -54,6 +55,7 @@ const getTask = ({ taskId, runId, task }) => {
 };
 
 const TaskInstance = ({ taskId, runId }) => {
+  const [selectedRows, setSelectedRows] = useState([]);
   const { data: { groups = {}, dagRuns = [] } } = useTreeData();
   const group = getTask({ taskId, runId, task: groups });
   const run = dagRuns.find((r) => r.runId === runId);
@@ -68,6 +70,11 @@ const TaskInstance = ({ taskId, runId }) => {
 
   const instance = group.instances.find((ti) => ti.runId === runId);
 
+  let taskActionsTitle = 'Task Actions';
+  if (isMapped) {
+    taskActionsTitle += ` for ${selectedRows.length || 'all'} mapped task${selectedRows.length !== 1 ? 's' : ''}`;
+  }
+
   return (
     <Box py="4px">
       {!isGroup && (
@@ -80,27 +87,33 @@ const TaskInstance = ({ taskId, runId }) => {
       )}
       {!isGroup && (
         <Box my={3}>
+          <Text as="strong">{taskActionsTitle}</Text>
+          <Divider my={2} />
           <VStack justifyContent="center" divider={<StackDivider my={3} />}>
             <RunAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
+              mapIndexes={selectedRows}
             />
             <ClearAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
               executionDate={executionDate}
+              mapIndexes={selectedRows}
             />
             <MarkFailedAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
+              mapIndexes={selectedRows}
             />
             <MarkSuccessAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
+              mapIndexes={selectedRows}
             />
           </VStack>
           <Divider my={2} />
@@ -122,7 +135,12 @@ const TaskInstance = ({ taskId, runId }) => {
         extraLinks={extraLinks}
       />
       {isMapped && (
-        <MappedInstances dagId={dagId} runId={runId} taskId={taskId} />
+        <MappedInstances
+          dagId={dagId}
+          runId={runId}
+          taskId={taskId}
+          selectRows={setSelectedRows}
+        />
       )}
     </Box>
   );
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index 4196edc6b9..d825976ed2 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -34,6 +34,7 @@ const Run = ({
   runId,
   taskId,
   executionDate,
+  mapIndexes,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -73,6 +74,7 @@ const Run = ({
         recursive,
         failed,
         confirmed: false,
+        mapIndexes,
       });
       setAffectedTasks(data);
       onOpen();
@@ -91,6 +93,7 @@ const Run = ({
         recursive,
         failed,
         confirmed: true,
+        mapIndexes,
       });
       setAffectedTasks([]);
       onClose();
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
index fe277c9eef..12f8bcfeef 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
@@ -33,6 +33,7 @@ const MarkFailed = ({
   dagId,
   runId,
   taskId,
+  mapIndexes,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -68,6 +69,7 @@ const MarkFailed = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks(data);
       onOpen();
@@ -83,6 +85,7 @@ const MarkFailed = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks([]);
       onClose();
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
index 06bc80c756..bdf59e6a4d 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
@@ -29,8 +29,8 @@ import ConfirmDialog from '../../ConfirmDialog';
 import ActionButton from './ActionButton';
 import { useMarkSuccessTask, useConfirmMarkTask } from '../../../../api';
 
-const Run = ({
-  dagId, runId, taskId,
+const MarkSuccess = ({
+  dagId, runId, taskId, mapIndexes,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -64,6 +64,7 @@ const Run = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks(data);
       onOpen();
@@ -79,6 +80,7 @@ const Run = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks([]);
       onClose();
@@ -109,4 +111,4 @@ const Run = ({
   );
 };
 
-export default Run;
+export default MarkSuccess;
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
index 204cec44c2..85d502aeed 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
@@ -30,6 +30,7 @@ const Run = ({
   dagId,
   runId,
   taskId,
+  mapIndexes,
 }) => {
   const [ignoreAllDeps, setIgnoreAllDeps] = useState(false);
   const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps);
@@ -47,6 +48,7 @@ const Run = ({
       ignoreAllDeps,
       ignoreTaskState,
       ignoreTaskDeps,
+      mapIndexes,
     });
   };
 
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 1dafcb27f7..e7afde8af9 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,6 +310,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="execution_date">
+            <input type="hidden" name="map_index">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
@@ -351,6 +352,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="dag_run_id">
+            <input type="hidden" name="map_index">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <input type="hidden" name="state" value="failed">
             <div class="row">
@@ -384,6 +386,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="dag_run_id">
+            <input type="hidden" name="map_index">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <input type="hidden" name="state" value="success">
             <div class="row">
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 8a5cc3c717..0c452dea38 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -64,6 +64,7 @@ def get_mapped_instances(task_instance, session):
             TaskInstance.run_id == task_instance.run_id,
             TaskInstance.task_id == task_instance.task_id,
         )
+        .order_by(TaskInstance.map_index)
         .all()
     )