You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/04/21 14:48:32 UTC
[airflow] branch main updated: Task actions UI for individual mapped instances (#23127)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1e8ac47589 Task actions UI for individual mapped instances (#23127)
1e8ac47589 is described below
commit 1e8ac47589967f2a7284faeab0f65b01bfd8202d
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Thu Apr 21 10:48:24 2022 -0400
Task actions UI for individual mapped instances (#23127)
---
airflow/www/static/js/dag.js | 18 ++++-----
airflow/www/static/js/tree/Table.jsx | 44 ++++++++++++++++++++--
airflow/www/static/js/tree/api/useClearTask.js | 11 ++++--
.../www/static/js/tree/api/useConfirmMarkTask.js | 16 +++++---
.../www/static/js/tree/api/useMarkFailedTask.js | 11 ++++--
.../www/static/js/tree/api/useMarkSuccessTask.js | 12 ++++--
airflow/www/static/js/tree/api/useRunTask.js | 43 +++++++++++----------
.../content/taskInstance/MappedInstances.jsx | 3 +-
.../js/tree/details/content/taskInstance/index.jsx | 22 ++++++++++-
.../content/taskInstance/taskActions/Clear.jsx | 3 ++
.../taskInstance/taskActions/MarkFailed.jsx | 3 ++
.../taskInstance/taskActions/MarkSuccess.jsx | 8 ++--
.../content/taskInstance/taskActions/Run.jsx | 2 +
airflow/www/templates/airflow/dag.html | 3 ++
airflow/www/utils.py | 1 +
15 files changed, 146 insertions(+), 54 deletions(-)
diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index ca8b7cc676..0445e0686c 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -171,14 +171,11 @@ export function callModal({
$('#extra_links').prev('hr').hide();
$('#extra_links').empty().hide();
if (mi >= 0) {
- // Marking state and clear are not yet supported for mapped instances
- $('#success_action').hide();
- $('#failed_action').hide();
- $('#clear_action').hide();
+ $('#modal_map_index').show();
+ $('#modal_map_index .value').text(mi);
} else {
- $('#success_action').show();
- $('#failed_action').show();
- $('#clear_action').show();
+ $('#modal_map_index').hide();
+ $('#modal_map_index .value').text('');
}
if (isSubDag) {
$('#div_btn_subdag').show();
@@ -339,7 +336,6 @@ $(document).on('click', '.map_index_item', function mapItem() {
$('form[data-action]').on('submit', function submit(e) {
e.preventDefault();
const form = $(this).get(0);
- // Somehow submit is fired twice. Only once is the executionDate/dagRunId valid
if (dagRunId || executionDate) {
if (form.dag_run_id) {
form.dag_run_id.value = dagRunId;
@@ -351,8 +347,10 @@ $('form[data-action]').on('submit', function submit(e) {
if (form.task_id) {
form.task_id.value = taskId;
}
- if (form.map_index) {
- form.map_index.value = mapIndex === undefined ? '' : mapIndex;
+ if (form.map_index && mapIndex >= 0) {
+ form.map_index.value = mapIndex;
+ } else if (form.map_index) {
+ form.map_index.remove();
}
form.action = $(this).data('action');
form.submit();
diff --git a/airflow/www/static/js/tree/Table.jsx b/airflow/www/static/js/tree/Table.jsx
index aef91ce905..e500b08966 100644
--- a/airflow/www/static/js/tree/Table.jsx
+++ b/airflow/www/static/js/tree/Table.jsx
@@ -21,7 +21,7 @@
* Custom wrapper of react-table using Chakra UI components
*/
-import React, { useEffect } from 'react';
+import React, { useEffect, useRef, forwardRef } from 'react';
import {
Flex,
Table as ChakraTable,
@@ -33,9 +33,10 @@ import {
IconButton,
Text,
useColorModeValue,
+ Checkbox,
} from '@chakra-ui/react';
import {
- useTable, useSortBy, usePagination,
+ useTable, useSortBy, usePagination, useRowSelect,
} from 'react-table';
import {
MdKeyboardArrowLeft, MdKeyboardArrowRight,
@@ -44,8 +45,23 @@ import {
TiArrowUnsorted, TiArrowSortedDown, TiArrowSortedUp,
} from 'react-icons/ti';
+const IndeterminateCheckbox = forwardRef(
+ ({ indeterminate, checked, ...rest }, ref) => {
+ const defaultRef = useRef();
+ const resolvedRef = ref || defaultRef;
+
+ useEffect(() => {
+ resolvedRef.current.indeterminate = indeterminate;
+ }, [resolvedRef, indeterminate]);
+
+ return (
+ <Checkbox ref={resolvedRef} isChecked={checked} {...rest} />
+ );
+ },
+);
+
const Table = ({
- data, columns, manualPagination, pageSize = 25, setSortBy, isLoading = false,
+ data, columns, manualPagination, pageSize = 25, setSortBy, isLoading = false, selectRows,
}) => {
const { totalEntries, offset, setOffset } = manualPagination || {};
const oddColor = useColorModeValue('gray.50', 'gray.900');
@@ -66,7 +82,8 @@ const Table = ({
canNextPage,
nextPage,
previousPage,
- state: { pageIndex, sortBy },
+ selectedFlatRows,
+ state: { pageIndex, sortBy, selectedRowIds },
} = useTable(
{
columns,
@@ -81,6 +98,20 @@ const Table = ({
},
useSortBy,
usePagination,
+ useRowSelect,
+ (hooks) => {
+ hooks.visibleColumns.push((cols) => [
+ {
+ id: 'selection',
+ Cell: ({ row }) => (
+ <div>
+ <IndeterminateCheckbox {...row.getToggleRowSelectedProps()} />
+ </div>
+ ),
+ },
+ ...cols,
+ ]);
+ },
);
const handleNext = () => {
@@ -97,6 +128,11 @@ const Table = ({
if (setSortBy) setSortBy(sortBy);
}, [sortBy, setSortBy]);
+ useEffect(() => {
+ if (selectRows) selectRows(selectedFlatRows.map((row) => row.original.mapIndex));
+ // eslint-disable-next-line react-hooks/exhaustive-deps
+ }, [selectedRowIds, selectRows]);
+
return (
<>
<ChakraTable {...getTableProps()}>
diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index bcf99bb250..2ea3eee486 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -36,7 +36,7 @@ export default function useClearTask({
return useMutation(
['clearTask', dagId, runId, taskId],
({
- past, future, upstream, downstream, recursive, failed, confirmed,
+ past, future, upstream, downstream, recursive, failed, confirmed, mapIndexes = [],
}) => {
const params = new URLSearchParams({
csrf_token: csrfToken,
@@ -51,9 +51,13 @@ export default function useClearTask({
downstream,
recursive,
only_failed: failed,
- }).toString();
+ });
+
+ mapIndexes.forEach((mi) => {
+ params.append('map_index', mi);
+ });
- return axios.post(clearUrl, params, {
+ return axios.post(clearUrl, params.toString(), {
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
@@ -71,6 +75,7 @@ export default function useClearTask({
}
if (!status || status !== 'error') {
queryClient.invalidateQueries('treeData');
+ queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
startRefresh();
}
},
diff --git a/airflow/www/static/js/tree/api/useConfirmMarkTask.js b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
index 1450a15d3d..d1f8eef9d3 100644
--- a/airflow/www/static/js/tree/api/useConfirmMarkTask.js
+++ b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
@@ -29,9 +29,9 @@ export default function useConfirmMarkTask({
return useMutation(
['confirmStateChange', dagId, runId, taskId, state],
({
- past, future, upstream, downstream,
- }) => axios.get(confirmUrl, {
- params: {
+ past, future, upstream, downstream, mapIndexes = [],
+ }) => {
+ const params = new URLSearchParams({
dag_id: dagId,
dag_run_id: runId,
task_id: taskId,
@@ -40,7 +40,13 @@ export default function useConfirmMarkTask({
upstream,
downstream,
state,
- },
- }),
+ });
+
+ mapIndexes.forEach((mi) => {
+ params.append('map_index', mi);
+ });
+
+ return axios.get(confirmUrl, { params });
+ },
);
}
diff --git a/airflow/www/static/js/tree/api/useMarkFailedTask.js b/airflow/www/static/js/tree/api/useMarkFailedTask.js
index f2fd28bdb2..a94ab22d0c 100644
--- a/airflow/www/static/js/tree/api/useMarkFailedTask.js
+++ b/airflow/www/static/js/tree/api/useMarkFailedTask.js
@@ -33,7 +33,7 @@ export default function useMarkFailedTask({
return useMutation(
['markFailed', dagId, runId, taskId],
({
- past, future, upstream, downstream,
+ past, future, upstream, downstream, mapIndexes = [],
}) => {
const params = new URLSearchParams({
csrf_token: csrfToken,
@@ -45,9 +45,13 @@ export default function useMarkFailedTask({
future,
upstream,
downstream,
- }).toString();
+ });
+
+ mapIndexes.forEach((mi) => {
+ params.append('map_index', mi);
+ });
- return axios.post(failedUrl, params, {
+ return axios.post(failedUrl, params.toString(), {
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
@@ -56,6 +60,7 @@ export default function useMarkFailedTask({
{
onSuccess: () => {
queryClient.invalidateQueries('treeData');
+ queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
startRefresh();
},
},
diff --git a/airflow/www/static/js/tree/api/useMarkSuccessTask.js b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
index 92ba539de6..47fda2f0f8 100644
--- a/airflow/www/static/js/tree/api/useMarkSuccessTask.js
+++ b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
@@ -33,7 +33,7 @@ export default function useMarkSuccessTask({
return useMutation(
['markSuccess', dagId, runId, taskId],
({
- past, future, upstream, downstream,
+ past, future, upstream, downstream, mapIndexes = [],
}) => {
const params = new URLSearchParams({
csrf_token: csrfToken,
@@ -45,9 +45,14 @@ export default function useMarkSuccessTask({
future,
upstream,
downstream,
- }).toString();
+ map_indexes: mapIndexes,
+ });
+
+ mapIndexes.forEach((mi) => {
+ params.append('map_index', mi);
+ });
- return axios.post(successUrl, params, {
+ return axios.post(successUrl, params.toString(), {
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
},
@@ -56,6 +61,7 @@ export default function useMarkSuccessTask({
{
onSuccess: () => {
queryClient.invalidateQueries('treeData');
+ queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
startRefresh();
},
},
diff --git a/airflow/www/static/js/tree/api/useRunTask.js b/airflow/www/static/js/tree/api/useRunTask.js
index 44a9e14bf4..9e45c42f59 100644
--- a/airflow/www/static/js/tree/api/useRunTask.js
+++ b/airflow/www/static/js/tree/api/useRunTask.js
@@ -32,32 +32,34 @@ export default function useRunTask(dagId, runId, taskId) {
const { startRefresh } = useAutoRefresh();
return useMutation(
['runTask', dagId, runId, taskId],
- ({
+ async ({
ignoreAllDeps,
ignoreTaskState,
ignoreTaskDeps,
- mapIndex = -1,
- }) => {
- const params = new URLSearchParams({
- csrf_token: csrfToken,
- dag_id: dagId,
- dag_run_id: runId,
- task_id: taskId,
- ignore_all_deps: ignoreAllDeps,
- ignore_task_deps: ignoreTaskDeps,
- ignore_ti_state: ignoreTaskState,
- map_index: mapIndex,
- }).toString();
+ mapIndexes,
+ }) => Promise.all(
+ (mapIndexes.length ? mapIndexes : [-1]).map((mi) => {
+ const params = new URLSearchParams({
+ csrf_token: csrfToken,
+ dag_id: dagId,
+ dag_run_id: runId,
+ task_id: taskId,
+ ignore_all_deps: ignoreAllDeps,
+ ignore_task_deps: ignoreTaskDeps,
+ ignore_ti_state: ignoreTaskState,
+ map_index: mi,
+ }).toString();
- return axios.post(runUrl, params, {
- headers: {
- 'Content-Type': 'application/x-www-form-urlencoded',
- },
- });
- },
+ return axios.post(runUrl, params, {
+ headers: {
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ },
+ });
+ }),
+ ),
{
onSuccess: (data) => {
- const { message, status } = data;
+ const { message, status } = data.length ? data[0] : data;
if (message && status === 'error') {
toast({
description: message,
@@ -67,6 +69,7 @@ export default function useRunTask(dagId, runId, taskId) {
}
if (!status || status !== 'error') {
queryClient.invalidateQueries('treeData');
+ queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
startRefresh();
}
},
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx b/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
index 42bbdca66f..77c0713ab3 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
@@ -46,7 +46,7 @@ const IconLink = (props) => (
);
const MappedInstances = ({
- dagId, runId, taskId,
+ dagId, runId, taskId, selectRows,
}) => {
const limit = 25;
const [offset, setOffset] = useState(0);
@@ -147,6 +147,7 @@ const MappedInstances = ({
pageSize={limit}
setSortBy={setSortBy}
isLoading={isLoading}
+ selectRows={selectRows}
/>
</Box>
);
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index d8b71cb128..0e4f441e24 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -17,12 +17,13 @@
* under the License.
*/
-import React from 'react';
+import React, { useState } from 'react';
import {
Box,
VStack,
Divider,
StackDivider,
+ Text,
} from '@chakra-ui/react';
import RunAction from './taskActions/Run';
@@ -54,6 +55,7 @@ const getTask = ({ taskId, runId, task }) => {
};
const TaskInstance = ({ taskId, runId }) => {
+ const [selectedRows, setSelectedRows] = useState([]);
const { data: { groups = {}, dagRuns = [] } } = useTreeData();
const group = getTask({ taskId, runId, task: groups });
const run = dagRuns.find((r) => r.runId === runId);
@@ -68,6 +70,11 @@ const TaskInstance = ({ taskId, runId }) => {
const instance = group.instances.find((ti) => ti.runId === runId);
+ let taskActionsTitle = 'Task Actions';
+ if (isMapped) {
+ taskActionsTitle += ` for ${selectedRows.length || 'all'} mapped task${selectedRows.length !== 1 ? 's' : ''}`;
+ }
+
return (
<Box py="4px">
{!isGroup && (
@@ -80,27 +87,33 @@ const TaskInstance = ({ taskId, runId }) => {
)}
{!isGroup && (
<Box my={3}>
+ <Text as="strong">{taskActionsTitle}</Text>
+ <Divider my={2} />
<VStack justifyContent="center" divider={<StackDivider my={3} />}>
<RunAction
runId={runId}
taskId={taskId}
dagId={dagId}
+ mapIndexes={selectedRows}
/>
<ClearAction
runId={runId}
taskId={taskId}
dagId={dagId}
executionDate={executionDate}
+ mapIndexes={selectedRows}
/>
<MarkFailedAction
runId={runId}
taskId={taskId}
dagId={dagId}
+ mapIndexes={selectedRows}
/>
<MarkSuccessAction
runId={runId}
taskId={taskId}
dagId={dagId}
+ mapIndexes={selectedRows}
/>
</VStack>
<Divider my={2} />
@@ -122,7 +135,12 @@ const TaskInstance = ({ taskId, runId }) => {
extraLinks={extraLinks}
/>
{isMapped && (
- <MappedInstances dagId={dagId} runId={runId} taskId={taskId} />
+ <MappedInstances
+ dagId={dagId}
+ runId={runId}
+ taskId={taskId}
+ selectRows={setSelectedRows}
+ />
)}
</Box>
);
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index 4196edc6b9..d825976ed2 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -34,6 +34,7 @@ const Run = ({
runId,
taskId,
executionDate,
+ mapIndexes,
}) => {
const [affectedTasks, setAffectedTasks] = useState([]);
@@ -73,6 +74,7 @@ const Run = ({
recursive,
failed,
confirmed: false,
+ mapIndexes,
});
setAffectedTasks(data);
onOpen();
@@ -91,6 +93,7 @@ const Run = ({
recursive,
failed,
confirmed: true,
+ mapIndexes,
});
setAffectedTasks([]);
onClose();
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
index fe277c9eef..12f8bcfeef 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
@@ -33,6 +33,7 @@ const MarkFailed = ({
dagId,
runId,
taskId,
+ mapIndexes,
}) => {
const [affectedTasks, setAffectedTasks] = useState([]);
@@ -68,6 +69,7 @@ const MarkFailed = ({
future,
upstream,
downstream,
+ mapIndexes,
});
setAffectedTasks(data);
onOpen();
@@ -83,6 +85,7 @@ const MarkFailed = ({
future,
upstream,
downstream,
+ mapIndexes,
});
setAffectedTasks([]);
onClose();
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
index 06bc80c756..bdf59e6a4d 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
@@ -29,8 +29,8 @@ import ConfirmDialog from '../../ConfirmDialog';
import ActionButton from './ActionButton';
import { useMarkSuccessTask, useConfirmMarkTask } from '../../../../api';
-const Run = ({
- dagId, runId, taskId,
+const MarkSuccess = ({
+ dagId, runId, taskId, mapIndexes,
}) => {
const [affectedTasks, setAffectedTasks] = useState([]);
@@ -64,6 +64,7 @@ const Run = ({
future,
upstream,
downstream,
+ mapIndexes,
});
setAffectedTasks(data);
onOpen();
@@ -79,6 +80,7 @@ const Run = ({
future,
upstream,
downstream,
+ mapIndexes,
});
setAffectedTasks([]);
onClose();
@@ -109,4 +111,4 @@ const Run = ({
);
};
-export default Run;
+export default MarkSuccess;
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
index 204cec44c2..85d502aeed 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
@@ -30,6 +30,7 @@ const Run = ({
dagId,
runId,
taskId,
+ mapIndexes,
}) => {
const [ignoreAllDeps, setIgnoreAllDeps] = useState(false);
const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps);
@@ -47,6 +48,7 @@ const Run = ({
ignoreAllDeps,
ignoreTaskState,
ignoreTaskDeps,
+ mapIndexes,
});
};
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 1dafcb27f7..e7afde8af9 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,6 +310,7 @@
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
<input type="hidden" name="execution_date">
+ <input type="hidden" name="map_index">
<input type="hidden" name="origin" value="{{ request.base_url }}">
<div class="row">
<span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
@@ -351,6 +352,7 @@
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
<input type="hidden" name="dag_run_id">
+ <input type="hidden" name="map_index">
<input type="hidden" name="origin" value="{{ request.base_url }}">
<input type="hidden" name="state" value="failed">
<div class="row">
@@ -384,6 +386,7 @@
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
<input type="hidden" name="dag_run_id">
+ <input type="hidden" name="map_index">
<input type="hidden" name="origin" value="{{ request.base_url }}">
<input type="hidden" name="state" value="success">
<div class="row">
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 8a5cc3c717..0c452dea38 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -64,6 +64,7 @@ def get_mapped_instances(task_instance, session):
TaskInstance.run_id == task_instance.run_id,
TaskInstance.task_id == task_instance.task_id,
)
+ .order_by(TaskInstance.map_index)
.all()
)