Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/20 14:31:31 UTC

[airflow] branch mapped-instance-actions updated (cae2962fb6 -> 009eb4d4fe)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a change to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git


 discard cae2962fb6 Chain map_index params
 discard 51ae59ca02 Fix gantt/graph modal
 discard 5bee03ac9e Readd mapped instance table selection
 discard 1179cb3e73 Allow bulk mapped task actions
 discard 0a5260911a Introduce tuple_().in_() shim for MSSQL compat
 discard 9d073dfd96 Accept multiple map_index param from front end
 discard 45cb2d6a90 Refactor to straighten up types
 discard 1f8d0c3249 Merge branch 'main' into clearing-and-marking-mapped-task
 discard 98abe5fcb8 fixup! Apply suggestions from code review
 discard 0b4c4857da Apply suggestions from code review
 discard 3dd0ef3de6 fixup! fixup! fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
 discard c931816645 fixup! fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
 discard bb8c9523a3 fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
 discard bdfe553c4f add tests
 discard 826c650dbe fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
 discard 5fc16f2a0c fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
 discard 729f9c3f43 fixup! Allow marking/clearing mapped taskinstances from the UI
 discard 6cd1437987 Allow marking/clearing mapped taskinstances from the UI
     add cc3503e368 Fix Grid autoscroll with ResizeObserver (#23022)
     add 7d97ee5b3a Default side panel open vs closed (#23039)
     add 10c9cb5318 Revert disabling run task button (#23038)
     add b24650c0cc Show map_index in states-for-dag-run (#23030)
     add 5164cdbe98 Make presto and trino compatible with airflow 2.1 (#23061)
     add 5144bedcee Add SnowSQL installation script to Breeze (#23065)
     add c3d883a971 KubernetesPodOperator should patch "already checked" always (#22734)
     add 5fca11ef85 Improve speed of `dag.partial_subset` by not deep-copying TaskGroup (#23088)
     add 918bd33993 Protect against using try_number from context in provider (#23069)
     add 831e84d9c3 Correct default conn ID in WASB connection doc (#23057)
     add b8bbfd4b31 Add migration to update DAG default_view (#23091)
     add 197cff3194 Ensure TaskMap only checks "relevant" dependencies (#23053)
     add 647c155893 fix link to dbt docs by removing extra h (#23086)
     add f63c5afae5 Fix typo in scheduler_job.py (#23095)
     add 4874068d99 Improve Graph view task actions for Dynamic Tasks (#23064)
     add c897ccd6ed Fix artifact for MyPy checks (#23094)
     add f7cd5ca8bb Fix moto/pyparsing issue. (#23096)
     add abaef54d54 Allow offline upgrade with no options (#23093)
     add 5b76552e6d Make Grid and Graph buttons consistent. (#23097)
     add f471d4f54d Add server default for map_index in Log table (#23056)
     add 0139c14212 Fix MyPy errors in dev folder (#23100)
     add 1f03977735 Allow marking/clearing mapped taskinstances from the UI
     add 7837df0994 fixup! Allow marking/clearing mapped taskinstances from the UI
     add 6f16553973 fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
     add 59b1769171 fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
     add 6ce2589991 add tests
     add df4cfb4524 fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
     add 4186bb220d fixup! fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
     add e65e5fcf7f fixup! fixup! fixup! fixup! fixup! fixup! Allow marking/clearing mapped taskinstances from the UI
     add 1ccc3f6131 Apply suggestions from code review
     add 1c0741234e fixup! Apply suggestions from code review
     add e1cdc961b3 Refactor to straighten up types
     new 4ffae5cf1f Accept multiple map_index param from front end
     new 898480765f Introduce tuple_().in_() shim for MSSQL compat
     new 6c7f1dfb16 fixup! Accept multiple map_index param from front end
     new a890173f5a fixup! Introduce tuple_().in_() shim for MSSQL compat
     new c633ca8895 Allow bulk mapped task actions
     new 4ffa59eaf4 Readd mapped instance table selection
     new 4410c01192 Fix gantt/graph modal
     new 7fcedbd7c7 Chain map_index params
     new 009eb4d4fe Get gantt/graph modal actions working again

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (cae2962fb6)
            \
             N -- N -- N   refs/heads/mapped-instance-actions (009eb4d4fe)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
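
The relationship between the B, O and N revisions in the diagram above can be sketched in a few lines of Python. This is only an illustration that assumes both branch states are linear histories listed oldest first; the function name and the commit labels are hypothetical, and it is not how git itself computes the notification.

```python
from typing import List, Tuple


def classify_force_push(old: List[str], new: List[str]) -> Tuple[str, List[str], List[str]]:
    """Split two linear histories (oldest first) into base, O and N revisions."""
    # Walk both histories while they still agree.
    shared = 0
    while shared < min(len(old), len(new)) and old[shared] == new[shared]:
        shared += 1
    if shared == 0:
        raise ValueError("histories share no common base")
    base = old[shared - 1]      # B: last commit present in both histories
    undone = old[shared:]       # O: only on the old branch tip
    introduced = new[shared:]   # N: only on the new branch tip
    return base, undone, introduced


# Hypothetical labels standing in for real commit hashes.
base, undone, introduced = classify_force_push(
    ["a", "B", "o1", "o2"],
    ["a", "B", "n1"],
)
```

With these inputs, `base` is "B", `undone` holds the two O revisions and `introduced` holds the single N revision.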


Summary of changes:
 .pre-commit-config.yaml                            |  16 ++-
 BREEZE.rst                                         |   1 +
 TESTING.rst                                        |   1 +
 airflow/cli/commands/task_command.py               |  18 ++--
 airflow/jobs/backfill_job.py                       |   2 +-
 airflow/jobs/scheduler_job.py                      |   2 +-
 ... => 0108_b1b348e02d07_default_dag_view_grid.py} |  36 ++++---
 airflow/models/dag.py                              |  16 +--
 airflow/models/log.py                              |   4 +-
 airflow/models/mappedoperator.py                   |   9 ++
 airflow/models/taskinstance.py                     |   2 +-
 airflow/models/taskmixin.py                        |  23 +++--
 airflow/models/xcom_arg.py                         |  30 ++++--
 .../cncf/kubernetes/operators/kubernetes_pod.py    |   6 +-
 airflow/providers/presto/hooks/presto.py           |   5 +-
 airflow/providers/trino/hooks/trino.py             |   5 +-
 airflow/sensors/base.py                            |   3 +-
 airflow/utils/db.py                                |   4 +
 airflow/utils/sqlalchemy.py                        |   7 +-
 airflow/www/static/js/dag.js                       | 113 +++++++++++----------
 airflow/www/static/js/graph.js                     |   6 +-
 airflow/www/static/js/tree/Tree.jsx                |  41 ++++----
 airflow/www/static/js/tree/dagRuns/index.jsx       |   4 +-
 .../js/tree/details/content/taskInstance/Nav.jsx   |   2 +-
 .../content/taskInstance/taskActions/Run.jsx       |  21 +---
 airflow/www/static/js/tree/renderTaskRows.jsx      |   3 +-
 airflow/www/templates/airflow/dag.html             |  45 ++++----
 airflow/www/utils.py                               |   1 +
 airflow/www/views.py                               |  44 ++++----
 dev/breeze/src/airflow_breeze/breeze.py            |  28 ++---
 .../build_image/ci/build_ci_image.py               |   2 +-
 .../build_image/ci/build_ci_params.py              |  14 +--
 .../build_image/prod/build_prod_image.py           |   2 +-
 .../build_image/prod/build_prod_params.py          |  16 +--
 .../src/airflow_breeze/shell/shell_params.py       |   2 +-
 .../airflow_breeze/utils/docker_command_utils.py   |   6 +-
 dev/breeze/src/airflow_breeze/utils/path_utils.py  |   2 +-
 dev/breeze/src/airflow_breeze/utils/run_utils.py   |   4 +-
 dev/provider_packages/prepare_provider_packages.py |   3 +-
 .../connections.rst                                |   2 +-
 .../connections/wasb.rst                           |   2 +-
 docs/apache-airflow/migrations-ref.rst             |   4 +-
 kubernetes_tests/test_kubernetes_pod_operator.py   |  52 +++++++---
 .../test_kubernetes_pod_operator_backcompat.py     |  11 +-
 scripts/ci/docker-compose/forward-credentials.yml  |   1 +
 .../pre_commit_check_2_1_compatibility.py          |  10 ++
 scripts/in_container/bin/install_snowsql.sh        |  85 ++++++++++++++++
 setup.py                                           |   2 +-
 tests/models/test_taskinstance.py                  |  51 +++++++++-
 .../kubernetes/operators/test_kubernetes_pod.py    |  81 +++++++--------
 tests/utils/test_db.py                             |  11 ++
 51 files changed, 553 insertions(+), 308 deletions(-)
 copy airflow/migrations/versions/{0101_a3bcd0914482_add_data_compressed_to_serialized_dag.py => 0108_b1b348e02d07_default_dag_view_grid.py} (58%)
 create mode 100755 scripts/in_container/bin/install_snowsql.sh


[airflow] 09/09: Get gantt/graph modal actions working again

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 009eb4d4fe6f20459ea8cc355b71326e6fd6f8a4
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Wed Apr 20 10:29:06 2022 -0400

    Get gantt/graph modal actions working again
---
 airflow/www/static/js/dag.js           | 9 ++++-----
 airflow/www/templates/airflow/dag.html | 6 +++---
 airflow/www/utils.py                   | 1 +
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index ded26baeab..0445e0686c 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -347,11 +347,10 @@ $('form[data-action]').on('submit', function submit(e) {
     if (form.task_id) {
       form.task_id.value = taskId;
     }
-    if (form.map_index) {
-      form.map_index.value = mapIndex === undefined ? '' : mapIndex;
-    }
-    if (form.map_indexes) {
-      form.map_indexes.value = mapIndex === undefined ? '' : mapIndex;
+    if (form.map_index && mapIndex >= 0) {
+      form.map_index.value = mapIndex;
+    } else if (form.map_index) {
+      form.map_index.remove();
     }
     form.action = $(this).data('action');
     form.submit();
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 8259c7045c..e7afde8af9 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,7 +310,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="execution_date">
-            <input type="hidden" name="map_indexes">
+            <input type="hidden" name="map_index">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
@@ -352,7 +352,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="dag_run_id">
-            <input type="hidden" name="map_indexes">
+            <input type="hidden" name="map_index">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <input type="hidden" name="state" value="failed">
             <div class="row">
@@ -386,7 +386,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="dag_run_id">
-            <input type="hidden" name="map_indexes">
+            <input type="hidden" name="map_index">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <input type="hidden" name="state" value="success">
             <div class="row">
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 8a5cc3c717..0c452dea38 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -64,6 +64,7 @@ def get_mapped_instances(task_instance, session):
             TaskInstance.run_id == task_instance.run_id,
             TaskInstance.task_id == task_instance.task_id,
         )
+        .order_by(TaskInstance.map_index)
         .all()
     )
 


[airflow] 04/09: fixup! Introduce tuple_().in_() shim for MSSQL compat

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a890173f5aee5b247248b6198c9aa80a78c9203e
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Apr 20 10:37:53 2022 +0800

    fixup! Introduce tuple_().in_() shim for MSSQL compat
---
 airflow/utils/sqlalchemy.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index 5c36d826b2..2838a30d60 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -24,7 +24,7 @@ from typing import Any, Dict, Iterable, Tuple
 
 import pendulum
 from dateutil import relativedelta
-from sqlalchemy import event, nullsfirst, tuple_
+from sqlalchemy import event, false, nullsfirst, tuple_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql import ColumnElement
@@ -339,4 +339,7 @@ def tuple_in_condition(
     """
     if settings.engine.dialect.name != "mssql":
         return tuple_(*columns).in_(collection)
-    return or_(*(and_(*(c == v for c, v in zip(columns, values))) for values in collection))
+    clauses = [and_(*(c == v for c, v in zip(columns, values))) for values in collection]
+    if not clauses:
+        return false()
+    return or_(*clauses)
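
The shape of the fallback in the diff above can be illustrated without SQLAlchemy. The sketch below is a plain string builder, not the Airflow or SQLAlchemy API; the function name, the `?` placeholders and the `1 = 0` literal are assumptions chosen for illustration. It expands a tuple IN test into an OR of ANDs and returns an always-false condition for an empty collection, which appears to be the case the fixup guards against.

```python
from typing import Iterable, List, Sequence, Tuple


def expand_tuple_in(
    columns: Sequence[str],
    collection: Iterable[Sequence[object]],
) -> Tuple[str, List[object]]:
    """Return (sql_fragment, bind_params) standing in for (c1, c2, ...) IN (collection)."""
    clauses: List[str] = []
    params: List[object] = []
    for values in collection:
        # Pair each column with its value, as zip(columns, values) does in the diff.
        pairs = list(zip(columns, values))
        clauses.append("(" + " AND ".join(f"{c} = ?" for c, _ in pairs) + ")")
        params.extend(v for _, v in pairs)
    if not clauses:
        # An empty collection can match nothing, so emit an always-false condition.
        return "1 = 0", []
    return " OR ".join(clauses), params


fragment, binds = expand_tuple_in(["task_id", "map_index"], [("a", 0), ("a", 1)])
empty_fragment, empty_binds = expand_tuple_in(["task_id", "map_index"], [])
```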


[airflow] 05/09: Allow bulk mapped task actions

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c633ca889595dee07d8a07defcf4a30d0bac4c82
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Sat Apr 9 14:50:23 2022 -0400

    Allow bulk mapped task actions
---
 airflow/www/static/js/tree/Table.jsx               | 44 ++++++++++++++++++++--
 .../content/taskInstance/MappedInstances.jsx       |  3 +-
 .../js/tree/details/content/taskInstance/index.jsx | 25 +++++++++++-
 .../content/taskInstance/taskActions/Clear.jsx     |  2 +
 .../taskInstance/taskActions/MarkFailed.jsx        |  3 +-
 .../taskInstance/taskActions/MarkSuccess.jsx       |  4 +-
 .../content/taskInstance/taskActions/Run.jsx       | 22 ++++++++---
 7 files changed, 88 insertions(+), 15 deletions(-)

diff --git a/airflow/www/static/js/tree/Table.jsx b/airflow/www/static/js/tree/Table.jsx
index aef91ce905..152f647ea3 100644
--- a/airflow/www/static/js/tree/Table.jsx
+++ b/airflow/www/static/js/tree/Table.jsx
@@ -21,7 +21,7 @@
  * Custom wrapper of react-table using Chakra UI components
 */
 
-import React, { useEffect } from 'react';
+import React, { useEffect, useRef, forwardRef } from 'react';
 import {
   Flex,
   Table as ChakraTable,
@@ -33,9 +33,10 @@ import {
   IconButton,
   Text,
   useColorModeValue,
+  Checkbox,
 } from '@chakra-ui/react';
 import {
-  useTable, useSortBy, usePagination,
+  useTable, useSortBy, usePagination, useRowSelect,
 } from 'react-table';
 import {
   MdKeyboardArrowLeft, MdKeyboardArrowRight,
@@ -44,8 +45,23 @@ import {
   TiArrowUnsorted, TiArrowSortedDown, TiArrowSortedUp,
 } from 'react-icons/ti';
 
+const IndeterminateCheckbox = forwardRef(
+  ({ indeterminate, ...rest }, ref) => {
+    const defaultRef = useRef();
+    const resolvedRef = ref || defaultRef;
+
+    useEffect(() => {
+      resolvedRef.current.indeterminate = indeterminate;
+    }, [resolvedRef, indeterminate]);
+
+    return (
+      <Checkbox ref={resolvedRef} {...rest} />
+    );
+  },
+);
+
 const Table = ({
-  data, columns, manualPagination, pageSize = 25, setSortBy, isLoading = false,
+  data, columns, manualPagination, pageSize = 25, setSortBy, isLoading = false, selectRows,
 }) => {
   const { totalEntries, offset, setOffset } = manualPagination || {};
   const oddColor = useColorModeValue('gray.50', 'gray.900');
@@ -66,7 +82,8 @@ const Table = ({
     canNextPage,
     nextPage,
     previousPage,
-    state: { pageIndex, sortBy },
+    selectedFlatRows,
+    state: { pageIndex, sortBy, selectedRowIds },
   } = useTable(
     {
       columns,
@@ -81,6 +98,20 @@ const Table = ({
     },
     useSortBy,
     usePagination,
+    useRowSelect,
+    (hooks) => {
+      hooks.visibleColumns.push((cols) => [
+        {
+          id: 'selection',
+          Cell: ({ row }) => (
+            <div>
+              <IndeterminateCheckbox {...row.getToggleRowSelectedProps()} />
+            </div>
+          ),
+        },
+        ...cols,
+      ]);
+    },
   );
 
   const handleNext = () => {
@@ -97,6 +128,11 @@ const Table = ({
     if (setSortBy) setSortBy(sortBy);
   }, [sortBy, setSortBy]);
 
+  useEffect(() => {
+    if (selectRows) selectRows(selectedFlatRows.map((row) => row.original.mapIndex));
+  // eslint-disable-next-line react-hooks/exhaustive-deps
+  }, [selectedRowIds, selectRows]);
+
   return (
     <>
       <ChakraTable {...getTableProps()}>
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx b/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
index 42bbdca66f..77c0713ab3 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/MappedInstances.jsx
@@ -46,7 +46,7 @@ const IconLink = (props) => (
 );
 
 const MappedInstances = ({
-  dagId, runId, taskId,
+  dagId, runId, taskId, selectRows,
 }) => {
   const limit = 25;
   const [offset, setOffset] = useState(0);
@@ -147,6 +147,7 @@ const MappedInstances = ({
         pageSize={limit}
         setSortBy={setSortBy}
         isLoading={isLoading}
+        selectRows={selectRows}
       />
     </Box>
   );
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index d8b71cb128..62ffee156a 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -17,12 +17,14 @@
  * under the License.
  */
 
-import React from 'react';
+import React, { useState } from 'react';
 import {
   Box,
   VStack,
   Divider,
   StackDivider,
+  Text,
+  Flex,
 } from '@chakra-ui/react';
 
 import RunAction from './taskActions/Run';
@@ -54,6 +56,7 @@ const getTask = ({ taskId, runId, task }) => {
 };
 
 const TaskInstance = ({ taskId, runId }) => {
+  const [selectedRows, setSelectedRows] = useState([]);
   const { data: { groups = {}, dagRuns = [] } } = useTreeData();
   const group = getTask({ taskId, runId, task: groups });
   const run = dagRuns.find((r) => r.runId === runId);
@@ -68,6 +71,11 @@ const TaskInstance = ({ taskId, runId }) => {
 
   const instance = group.instances.find((ti) => ti.runId === runId);
 
+  let taskActionsTitle = 'Task Actions';
+  if (isMapped) {
+    taskActionsTitle += ` for ${selectedRows.length || 'all'} mapped task${selectedRows.length !== 1 ? 's' : ''}`;
+  }
+
   return (
     <Box py="4px">
       {!isGroup && (
@@ -80,27 +88,40 @@ const TaskInstance = ({ taskId, runId }) => {
       )}
       {!isGroup && (
         <Box my={3}>
+          <Text as="strong">{taskActionsTitle}</Text>
+          <Flex maxHeight="20px" minHeight="20px">
+            {selectedRows.length ? (
+              <Text color="red.500">
+                Clear, Mark Failed, and Mark Success do not yet work with individual mapped tasks.
+              </Text>
+            ) : <Divider my={2} />}
+          </Flex>
+          {/* visibility={selectedRows.length ? 'visible' : 'hidden'} */}
           <VStack justifyContent="center" divider={<StackDivider my={3} />}>
             <RunAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
+              selectedRows={selectedRows}
             />
             <ClearAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
               executionDate={executionDate}
+              selectedRows={selectedRows}
             />
             <MarkFailedAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
+              selectedRows={selectedRows}
             />
             <MarkSuccessAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
+              selectedRows={selectedRows}
             />
           </VStack>
           <Divider my={2} />
@@ -122,7 +143,7 @@ const TaskInstance = ({ taskId, runId }) => {
         extraLinks={extraLinks}
       />
       {isMapped && (
-        <MappedInstances dagId={dagId} runId={runId} taskId={taskId} />
+        <MappedInstances dagId={dagId} runId={runId} taskId={taskId} selectRows={setSelectedRows} />
       )}
     </Box>
   );
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index 4196edc6b9..cada7b59ed 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -34,6 +34,7 @@ const Run = ({
   runId,
   taskId,
   executionDate,
+  selectedRows,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -113,6 +114,7 @@ const Run = ({
         colorScheme="blue"
         onClick={onClick}
         isLoading={isLoading}
+        isDisabled={!!selectedRows.length}
         title="Clearing deletes the previous state of the task instance, allowing it to get re-triggered by the scheduler or a backfill command"
       >
         Clear
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
index fe277c9eef..6bc10c066e 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
@@ -33,6 +33,7 @@ const MarkFailed = ({
   dagId,
   runId,
   taskId,
+  selectedRows,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -99,7 +100,7 @@ const MarkFailed = ({
         <ActionButton bg={upstream && 'gray.100'} onClick={onToggleUpstream} name="Upstream" />
         <ActionButton bg={downstream && 'gray.100'} onClick={onToggleDownstream} name="Downstream" />
       </ButtonGroup>
-      <Button colorScheme="red" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading}>
+      <Button colorScheme="red" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading} isDisabled={!!selectedRows.length}>
         Mark Failed
       </Button>
       <ConfirmDialog
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
index 06bc80c756..b4d2b8c047 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
@@ -30,7 +30,7 @@ import ActionButton from './ActionButton';
 import { useMarkSuccessTask, useConfirmMarkTask } from '../../../../api';
 
 const Run = ({
-  dagId, runId, taskId,
+  dagId, runId, taskId, selectedRows,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -95,7 +95,7 @@ const Run = ({
         <ActionButton bg={upstream && 'gray.100'} onClick={onToggleUpstream} name="Upstream" />
         <ActionButton bg={downstream && 'gray.100'} onClick={onToggleDownstream} name="Downstream" />
       </ButtonGroup>
-      <Button colorScheme="green" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading}>
+      <Button colorScheme="green" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading} isDisabled={!!selectedRows.length}>
         Mark Success
       </Button>
       <ConfirmDialog
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
index 204cec44c2..41c8bb9c6f 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
@@ -30,6 +30,7 @@ const Run = ({
   dagId,
   runId,
   taskId,
+  selectedRows,
 }) => {
   const [ignoreAllDeps, setIgnoreAllDeps] = useState(false);
   const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps);
@@ -43,11 +44,22 @@ const Run = ({
   const { mutate: onRun, isLoading } = useRunTask(dagId, runId, taskId);
 
   const onClick = () => {
-    onRun({
-      ignoreAllDeps,
-      ignoreTaskState,
-      ignoreTaskDeps,
-    });
+    if (selectedRows.length) {
+      selectedRows.forEach((mapIndex) => {
+        onRun({
+          ignoreAllDeps,
+          ignoreTaskState,
+          ignoreTaskDeps,
+          mapIndex,
+        });
+      });
+    } else {
+      onRun({
+        ignoreAllDeps,
+        ignoreTaskState,
+        ignoreTaskDeps,
+      });
+    }
   };
 
   return (


[airflow] 03/09: fixup! Accept multiple map_index param from front end

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6c7f1dfb16a758466c3425a917952dd740b08779
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Apr 20 10:35:00 2022 +0800

    fixup! Accept multiple map_index param from front end
---
 airflow/www/views.py | 44 ++++++++++++++++++++++++--------------------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index aac30f64ff..ae0186e493 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1960,7 +1960,7 @@ class Airflow(AirflowBaseView):
 
     def _clear_dag_tis(
         self,
-        dag,
+        dag: DAG,
         start_date,
         end_date,
         origin,
@@ -1995,24 +1995,19 @@ class Airflow(AirflowBaseView):
         except AirflowException as ex:
             return redirect_or_json(origin, msg=str(ex), status="error")
 
-        if not tis:
-            msg = "No task instances to clear"
-            return redirect_or_json(origin, msg, status="error")
-        elif request.headers.get('Accept') == 'application/json':
-            details = [str(t) for t in tis]
+        assert isinstance(tis, collections.abc.Iterable)
+        details = [str(t) for t in tis]
 
+        if not details:
+            return redirect_or_json(origin, "No task instances to clear", status="error")
+        elif request.headers.get('Accept') == 'application/json':
             return htmlsafe_json_dumps(details, separators=(',', ':'))
-        else:
-            details = "\n".join(str(t) for t in tis)
-
-            response = self.render_template(
-                'airflow/confirm.html',
-                endpoint=None,
-                message="Task instances you are about to clear:",
-                details=details,
-            )
-
-        return response
+        return self.render_template(
+            'airflow/confirm.html',
+            endpoint=None,
+            message="Task instances you are about to clear:",
+            details="\n".join(details),
+        )
 
     @expose('/clear', methods=['POST'])
     @auth.has_access(
@@ -2028,7 +2023,11 @@ class Airflow(AirflowBaseView):
         task_id = request.form.get('task_id')
         origin = get_safe_url(request.form.get('origin'))
         dag = current_app.dag_bag.get_dag(dag_id)
-        map_index = request.form.get('map_index')
+
+        if 'map_index' not in request.form:
+            map_indexes: Optional[List[int]] = None
+        else:
+            map_indexes = request.form.getlist('map_index', type=int)
 
         execution_date = request.form.get('execution_date')
         execution_date = timezone.parse(execution_date)
@@ -2047,7 +2046,12 @@ class Airflow(AirflowBaseView):
         )
         end_date = execution_date if not future else None
         start_date = execution_date if not past else None
-        task_ids = [(task_id, map_index)] if map_index else [task_id]
+
+        if map_indexes is None:
+            task_ids: Union[List[str], List[Tuple[str, int]]] = [task_id]
+        else:
+            task_ids = [(task_id, map_index) for map_index in map_indexes]
+
         return self._clear_dag_tis(
             dag,
             start_date,
@@ -2298,7 +2302,7 @@ class Airflow(AirflowBaseView):
         past: bool,
         state: TaskInstanceState,
     ):
-        dag = current_app.dag_bag.get_dag(dag_id)
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
 
         if not run_id:
             flash(f"Cannot mark tasks as {state}, seem that DAG {dag_id} has never run", "error")


[airflow] 08/09: Chain map_index params

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7fcedbd7c7db48d269bfe8a1ab889e60a292f765
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 19 09:57:37 2022 -0400

    Chain map_index params
---
 airflow/www/static/js/tree/api/useClearTask.js       | 9 ++++++---
 airflow/www/static/js/tree/api/useConfirmMarkTask.js | 6 +++++-
 airflow/www/static/js/tree/api/useMarkFailedTask.js  | 9 ++++++---
 airflow/www/static/js/tree/api/useMarkSuccessTask.js | 8 ++++++--
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index eea4b2b656..2ea3eee486 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -51,10 +51,13 @@ export default function useClearTask({
         downstream,
         recursive,
         only_failed: failed,
-        map_indexes: mapIndexes,
-      }).toString();
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
 
-      return axios.post(clearUrl, params, {
+      return axios.post(clearUrl, params.toString(), {
         headers: {
           'Content-Type': 'application/x-www-form-urlencoded',
         },
diff --git a/airflow/www/static/js/tree/api/useConfirmMarkTask.js b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
index 85b5f7df42..d1f8eef9d3 100644
--- a/airflow/www/static/js/tree/api/useConfirmMarkTask.js
+++ b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
@@ -40,8 +40,12 @@ export default function useConfirmMarkTask({
         upstream,
         downstream,
         state,
-        map_indexes: mapIndexes,
       });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
+
       return axios.get(confirmUrl, { params });
     },
   );
diff --git a/airflow/www/static/js/tree/api/useMarkFailedTask.js b/airflow/www/static/js/tree/api/useMarkFailedTask.js
index 333fed21c2..a94ab22d0c 100644
--- a/airflow/www/static/js/tree/api/useMarkFailedTask.js
+++ b/airflow/www/static/js/tree/api/useMarkFailedTask.js
@@ -45,10 +45,13 @@ export default function useMarkFailedTask({
         future,
         upstream,
         downstream,
-        map_indexes: mapIndexes,
-      }).toString();
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
 
-      return axios.post(failedUrl, params, {
+      return axios.post(failedUrl, params.toString(), {
         headers: {
           'Content-Type': 'application/x-www-form-urlencoded',
         },
diff --git a/airflow/www/static/js/tree/api/useMarkSuccessTask.js b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
index cde919a274..47fda2f0f8 100644
--- a/airflow/www/static/js/tree/api/useMarkSuccessTask.js
+++ b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
@@ -46,9 +46,13 @@ export default function useMarkSuccessTask({
         upstream,
         downstream,
         map_indexes: mapIndexes,
-      }).toString();
+      });
+
+      mapIndexes.forEach((mi) => {
+        params.append('map_index', mi);
+      });
 
-      return axios.post(successUrl, params, {
+      return axios.post(successUrl, params.toString(), {
         headers: {
           'Content-Type': 'application/x-www-form-urlencoded',
         },
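
Note on the change above: a rough illustration of the wire format this commit moves to, written in Python rather than the JavaScript of the patch. Instead of a single `map_indexes` field holding the whole array, each selected index is sent as its own repeated `map_index` key. The helper name `encode_form` and the sample field values are hypothetical and not part of Airflow.

```python
from urllib.parse import parse_qs, urlencode


def encode_form(fields, map_indexes):
    """Encode base fields plus one 'map_index' pair per selected index.

    Hypothetical helper that mirrors params.append('map_index', mi)
    from the patch; it is not an Airflow function.
    """
    pairs = list(fields.items())
    # One (key, value) pair per index, so the key repeats in the body.
    pairs.extend(("map_index", mi) for mi in map_indexes)
    return urlencode(pairs)


body = encode_form({"dag_id": "example", "task_id": "t1"}, [0, 2, 5])
print(body)

# A form parser sees the repeated key as a list of values.
print(parse_qs(body)["map_index"])
```

With an empty selection no `map_index` key is emitted at all, which a receiver can treat differently from an explicit list of indexes.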


[airflow] 01/09: Accept multiple map_index param from front end

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4ffae5cf1fa8d460451c875c0940c57d91187b43
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Apr 19 09:16:04 2022 +0800

    Accept multiple map_index param from front end
    
    This allows setting multiple instances of the same task to SUCCESS or
    FAILED in one request. This is translated to multiple task specifier
    tuples (task_id, map_index) when passed to set_state().
    
    Also made some drive-by improvements, adding types and cleaning up some
    formatting.
---
 airflow/api/common/mark_tasks.py    |   4 +-
 airflow/models/dag.py               |  12 ++---
 airflow/www/views.py                | 105 ++++++++++++++++++++----------------
 tests/www/views/test_views.py       |   3 +-
 tests/www/views/test_views_tasks.py |   2 +-
 5 files changed, 71 insertions(+), 55 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 594423305c..349b935e82 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -18,7 +18,7 @@
 """Marks tasks APIs."""
 
 from datetime import datetime
-from typing import TYPE_CHECKING, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Collection, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union
 
 from sqlalchemy import or_, tuple_
 from sqlalchemy.orm import contains_eager
@@ -78,7 +78,7 @@ def _create_dagruns(
 @provide_session
 def set_state(
     *,
-    tasks: Union[Iterable[Operator], Iterable[Tuple[Operator, int]]],
+    tasks: Union[Collection[Operator], Collection[Tuple[Operator, int]]],
     run_id: Optional[str] = None,
     execution_date: Optional[datetime] = None,
     upstream: bool = False,
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 755505b5d0..9c93bcef13 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1620,7 +1620,7 @@ class DAG(LoggingMixin):
         self,
         *,
         task_id: str,
-        map_index: Optional[int] = None,
+        map_indexes: Optional[Collection[int]] = None,
         execution_date: Optional[datetime] = None,
         run_id: Optional[str] = None,
         state: TaskInstanceState,
@@ -1636,8 +1636,8 @@ class DAG(LoggingMixin):
         in failed or upstream_failed state.
 
         :param task_id: Task ID of the TaskInstance
-        :param map_index: The TaskInstance map_index, if None, would set state for all mapped
-            TaskInstances of the task
+        :param map_indexes: Only set TaskInstance if its map_index matches.
+            If None (default), all mapped TaskInstances of the task are set.
         :param execution_date: Execution date of the TaskInstance
         :param run_id: The run_id of the TaskInstance
         :param state: State to set the TaskInstance to
@@ -1665,12 +1665,12 @@ class DAG(LoggingMixin):
 
         tasks_to_set_state: Union[List[Operator], List[Tuple[Operator, int]]]
         task_ids_to_exclude_from_clear: Union[Set[str], Set[Tuple[str, int]]]
-        if map_index is None:
+        if map_indexes is None:
             tasks_to_set_state = [task]
             task_ids_to_exclude_from_clear = {task_id}
         else:
-            tasks_to_set_state = [(task, map_index)]
-            task_ids_to_exclude_from_clear = {(task_id, map_index)}
+            tasks_to_set_state = [(task, map_index) for map_index in map_indexes]
+            task_ids_to_exclude_from_clear = {(task_id, map_index) for map_index in map_indexes}
 
         altered = set_state(
             tasks=tasks_to_set_state,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 437a60cca0..aac30f64ff 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -95,6 +95,7 @@ from airflow.api.common.mark_tasks import (
     set_dag_run_state_to_failed,
     set_dag_run_state_to_queued,
     set_dag_run_state_to_success,
+    set_state,
 )
 from airflow.compat.functools import cached_property
 from airflow.configuration import AIRFLOW_CONFIG, conf
@@ -107,6 +108,7 @@ from airflow.models import DAG, Connection, DagModel, DagTag, Log, SlaMiss, Task
 from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.operator import Operator
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers_manager import ProvidersManager
@@ -2284,28 +2286,28 @@ class Airflow(AirflowBaseView):
 
     def _mark_task_instance_state(
         self,
-        dag_id,
-        task_id,
-        origin,
-        dag_run_id,
-        upstream,
-        downstream,
-        future,
-        past,
-        state,
-        map_index=None,
+        *,
+        dag_id: str,
+        run_id: str,
+        task_id: str,
+        map_indexes: Optional[List[int]],
+        origin: str,
+        upstream: bool,
+        downstream: bool,
+        future: bool,
+        past: bool,
+        state: TaskInstanceState,
     ):
         dag = current_app.dag_bag.get_dag(dag_id)
-        latest_execution_date = dag.get_latest_execution_date()
 
-        if not latest_execution_date:
-            flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error")
+        if not run_id:
+            flash(f"Cannot mark tasks as {state}, seem that DAG {dag_id} has never run", "error")
             return redirect(origin)
 
         altered = dag.set_task_instance_state(
             task_id=task_id,
-            map_index=map_index,
-            run_id=dag_run_id,
+            map_indexes=map_indexes,
+            run_id=run_id,
             state=state,
             upstream=upstream,
             downstream=downstream,
@@ -2332,7 +2334,11 @@ class Airflow(AirflowBaseView):
         dag_run_id = args.get('dag_run_id')
         state = args.get('state')
         origin = args.get('origin')
-        map_index = args.get('map_index')
+
+        if 'map_index' not in args:
+            map_indexes: Optional[List[int]] = None
+        else:
+            map_indexes = args.getlist('map_index', type=int)
 
         upstream = to_boolean(args.get('upstream'))
         downstream = to_boolean(args.get('downstream'))
@@ -2365,9 +2371,10 @@ class Airflow(AirflowBaseView):
             msg = f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run"
             return redirect_or_json(origin, msg, status='error')
 
-        from airflow.api.common.mark_tasks import set_state
-
-        tasks = [(task, map_index)] if map_index else [task]
+        if map_indexes is None:
+            tasks: Union[List[Operator], List[Tuple[Operator, int]]] = [task]
+        else:
+            tasks = [(task, map_index) for map_index in map_indexes]
 
         to_be_altered = set_state(
             tasks=tasks,
@@ -2408,26 +2415,30 @@ class Airflow(AirflowBaseView):
         args = request.form
         dag_id = args.get('dag_id')
         task_id = args.get('task_id')
-        origin = get_safe_url(args.get('origin'))
-        dag_run_id = args.get('dag_run_id')
-        map_index = args.get('map_index')
+        run_id = args.get('dag_run_id')
 
+        if 'map_index' not in args:
+            map_indexes: Optional[List[int]] = None
+        else:
+            map_indexes = args.getlist('map_index', type=int)
+
+        origin = get_safe_url(args.get('origin'))
         upstream = to_boolean(args.get('upstream'))
         downstream = to_boolean(args.get('downstream'))
         future = to_boolean(args.get('future'))
         past = to_boolean(args.get('past'))
 
         return self._mark_task_instance_state(
-            dag_id,
-            task_id,
-            origin,
-            dag_run_id,
-            upstream,
-            downstream,
-            future,
-            past,
-            State.FAILED,
-            map_index=map_index,
+            dag_id=dag_id,
+            run_id=run_id,
+            task_id=task_id,
+            map_indexes=map_indexes,
+            origin=origin,
+            upstream=upstream,
+            downstream=downstream,
+            future=future,
+            past=past,
+            state=TaskInstanceState.FAILED,
         )
 
     @expose('/success', methods=['POST'])
@@ -2443,26 +2454,30 @@ class Airflow(AirflowBaseView):
         args = request.form
         dag_id = args.get('dag_id')
         task_id = args.get('task_id')
-        origin = get_safe_url(args.get('origin'))
-        dag_run_id = args.get('dag_run_id')
-        map_index = args.get('map_index')
+        run_id = args.get('dag_run_id')
+
+        if 'map_index' not in args:
+            map_indexes: Optional[List[int]] = None
+        else:
+            map_indexes = args.getlist('map_index', type=int)
 
+        origin = get_safe_url(args.get('origin'))
         upstream = to_boolean(args.get('upstream'))
         downstream = to_boolean(args.get('downstream'))
         future = to_boolean(args.get('future'))
         past = to_boolean(args.get('past'))
 
         return self._mark_task_instance_state(
-            dag_id,
-            task_id,
-            origin,
-            dag_run_id,
-            upstream,
-            downstream,
-            future,
-            past,
-            State.SUCCESS,
-            map_index=map_index,
+            dag_id=dag_id,
+            run_id=run_id,
+            task_id=task_id,
+            map_indexes=map_indexes,
+            origin=origin,
+            upstream=upstream,
+            downstream=downstream,
+            future=future,
+            past=past,
+            state=TaskInstanceState.SUCCESS,
         )
 
     @expose('/dags/<string:dag_id>')
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index f4be0540c3..c7900d64fd 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -271,9 +271,10 @@ def test_mark_task_instance_state(test_app):
 
         view._mark_task_instance_state(
             dag_id=dag.dag_id,
+            run_id=dagrun.run_id,
             task_id=task_1.task_id,
+            map_indexes=None,
             origin="",
-            dag_run_id=dagrun.run_id,
             upstream=False,
             downstream=False,
             future=False,
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index fce94fd5e4..ebed9ab05f 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -517,7 +517,7 @@ def test_dag_never_run(admin_client, url):
     )
     clear_db_runs()
     resp = admin_client.post(url, data=form, follow_redirects=True)
-    check_content_in_response(f"Cannot mark tasks as {url}, seem that dag {dag_id} has never run", resp)
+    check_content_in_response(f"Cannot mark tasks as {url}, seem that DAG {dag_id} has never run", resp)
 
 
 class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
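
Note on the change above: a minimal sketch of the request handling this commit describes, assuming a plain query string in place of Flask's `request.form`. The view distinguishes "no `map_index` key sent" (act on every mapped instance) from "one or more keys sent" (act only on those), then expands the latter into `(task, map_index)` tuples for `set_state()`. The function names `parse_map_indexes` and `build_task_specs` are hypothetical, and a string stands in for the operator object. Skipping values that fail integer conversion is an assumption made to approximate `getlist('map_index', type=int)`.

```python
from typing import List, Optional, Tuple, Union
from urllib.parse import parse_qs


def parse_map_indexes(query: str) -> Optional[List[int]]:
    """Return None if no map_index key was sent, else the integer values."""
    args = parse_qs(query, keep_blank_values=True)
    if "map_index" not in args:
        return None  # key absent: caller should act on all mapped instances
    indexes = []
    for raw in args["map_index"]:
        try:
            indexes.append(int(raw))
        except ValueError:
            continue  # assumption: values that are not integers are skipped
    return indexes


def build_task_specs(
    task: str, map_indexes: Optional[List[int]]
) -> Union[List[str], List[Tuple[str, int]]]:
    """Expand a task into the specifiers handed to set_state() in the patch."""
    if map_indexes is None:
        return [task]
    return [(task, mi) for mi in map_indexes]


print(build_task_specs("t1", parse_map_indexes("task_id=t1")))
print(build_task_specs("t1", parse_map_indexes("task_id=t1&map_index=0&map_index=3")))
```

Checking `is None` rather than truthiness keeps an explicit selection distinct from the "no key sent" case.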


[airflow] 07/09: Fix gantt/graph modal

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4410c0119202dc97de631c6661117aaafb84f944
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 12 16:01:31 2022 -0400

    Fix gantt/graph modal
---
 airflow/www/static/js/dag.js           | 15 +++++++--------
 airflow/www/templates/airflow/dag.html |  3 +++
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/airflow/www/static/js/dag.js b/airflow/www/static/js/dag.js
index ca8b7cc676..ded26baeab 100644
--- a/airflow/www/static/js/dag.js
+++ b/airflow/www/static/js/dag.js
@@ -171,14 +171,11 @@ export function callModal({
   $('#extra_links').prev('hr').hide();
   $('#extra_links').empty().hide();
   if (mi >= 0) {
-    // Marking state and clear are not yet supported for mapped instances
-    $('#success_action').hide();
-    $('#failed_action').hide();
-    $('#clear_action').hide();
+    $('#modal_map_index').show();
+    $('#modal_map_index .value').text(mi);
   } else {
-    $('#success_action').show();
-    $('#failed_action').show();
-    $('#clear_action').show();
+    $('#modal_map_index').hide();
+    $('#modal_map_index .value').text('');
   }
   if (isSubDag) {
     $('#div_btn_subdag').show();
@@ -339,7 +336,6 @@ $(document).on('click', '.map_index_item', function mapItem() {
 $('form[data-action]').on('submit', function submit(e) {
   e.preventDefault();
   const form = $(this).get(0);
-  // Somehow submit is fired twice. Only once is the executionDate/dagRunId valid
   if (dagRunId || executionDate) {
     if (form.dag_run_id) {
       form.dag_run_id.value = dagRunId;
@@ -354,6 +350,9 @@ $('form[data-action]').on('submit', function submit(e) {
     if (form.map_index) {
       form.map_index.value = mapIndex === undefined ? '' : mapIndex;
     }
+    if (form.map_indexes) {
+      form.map_indexes.value = mapIndex === undefined ? '' : mapIndex;
+    }
     form.action = $(this).data('action');
     form.submit();
   }
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 1dafcb27f7..8259c7045c 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,6 +310,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="execution_date">
+            <input type="hidden" name="map_indexes">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
@@ -351,6 +352,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="dag_run_id">
+            <input type="hidden" name="map_indexes">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <input type="hidden" name="state" value="failed">
             <div class="row">
@@ -384,6 +386,7 @@
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
             <input type="hidden" name="dag_run_id">
+            <input type="hidden" name="map_indexes">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
             <input type="hidden" name="state" value="success">
             <div class="row">


[airflow] 06/09: Readd mapped instance table selection

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4ffa59eaf438c88b0f504ff237b05b91b4cc3c00
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 12 14:51:45 2022 -0400

    Readd mapped instance table selection
---
 airflow/www/static/js/tree/Table.jsx               |  4 +-
 airflow/www/static/js/tree/api/useClearTask.js     |  4 +-
 .../www/static/js/tree/api/useConfirmMarkTask.js   | 12 +++---
 .../www/static/js/tree/api/useMarkFailedTask.js    |  4 +-
 .../www/static/js/tree/api/useMarkSuccessTask.js   |  4 +-
 airflow/www/static/js/tree/api/useRunTask.js       | 43 ++++++++++++----------
 .../js/tree/details/content/taskInstance/index.jsx | 25 ++++++-------
 .../content/taskInstance/taskActions/Clear.jsx     |  5 ++-
 .../taskInstance/taskActions/MarkFailed.jsx        |  6 ++-
 .../taskInstance/taskActions/MarkSuccess.jsx       | 10 +++--
 .../content/taskInstance/taskActions/Run.jsx       | 24 ++++--------
 11 files changed, 72 insertions(+), 69 deletions(-)

diff --git a/airflow/www/static/js/tree/Table.jsx b/airflow/www/static/js/tree/Table.jsx
index 152f647ea3..e500b08966 100644
--- a/airflow/www/static/js/tree/Table.jsx
+++ b/airflow/www/static/js/tree/Table.jsx
@@ -46,7 +46,7 @@ import {
 } from 'react-icons/ti';
 
 const IndeterminateCheckbox = forwardRef(
-  ({ indeterminate, ...rest }, ref) => {
+  ({ indeterminate, checked, ...rest }, ref) => {
     const defaultRef = useRef();
     const resolvedRef = ref || defaultRef;
 
@@ -55,7 +55,7 @@ const IndeterminateCheckbox = forwardRef(
     }, [resolvedRef, indeterminate]);
 
     return (
-      <Checkbox ref={resolvedRef} {...rest} />
+      <Checkbox ref={resolvedRef} isChecked={checked} {...rest} />
     );
   },
 );
diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index bcf99bb250..eea4b2b656 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -36,7 +36,7 @@ export default function useClearTask({
   return useMutation(
     ['clearTask', dagId, runId, taskId],
     ({
-      past, future, upstream, downstream, recursive, failed, confirmed,
+      past, future, upstream, downstream, recursive, failed, confirmed, mapIndexes = [],
     }) => {
       const params = new URLSearchParams({
         csrf_token: csrfToken,
@@ -51,6 +51,7 @@ export default function useClearTask({
         downstream,
         recursive,
         only_failed: failed,
+        map_indexes: mapIndexes,
       }).toString();
 
       return axios.post(clearUrl, params, {
@@ -71,6 +72,7 @@ export default function useClearTask({
         }
         if (!status || status !== 'error') {
           queryClient.invalidateQueries('treeData');
+          queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
           startRefresh();
         }
       },
diff --git a/airflow/www/static/js/tree/api/useConfirmMarkTask.js b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
index 1450a15d3d..85b5f7df42 100644
--- a/airflow/www/static/js/tree/api/useConfirmMarkTask.js
+++ b/airflow/www/static/js/tree/api/useConfirmMarkTask.js
@@ -29,9 +29,9 @@ export default function useConfirmMarkTask({
   return useMutation(
     ['confirmStateChange', dagId, runId, taskId, state],
     ({
-      past, future, upstream, downstream,
-    }) => axios.get(confirmUrl, {
-      params: {
+      past, future, upstream, downstream, mapIndexes = [],
+    }) => {
+      const params = new URLSearchParams({
         dag_id: dagId,
         dag_run_id: runId,
         task_id: taskId,
@@ -40,7 +40,9 @@ export default function useConfirmMarkTask({
         upstream,
         downstream,
         state,
-      },
-    }),
+        map_indexes: mapIndexes,
+      });
+      return axios.get(confirmUrl, { params });
+    },
   );
 }
diff --git a/airflow/www/static/js/tree/api/useMarkFailedTask.js b/airflow/www/static/js/tree/api/useMarkFailedTask.js
index f2fd28bdb2..333fed21c2 100644
--- a/airflow/www/static/js/tree/api/useMarkFailedTask.js
+++ b/airflow/www/static/js/tree/api/useMarkFailedTask.js
@@ -33,7 +33,7 @@ export default function useMarkFailedTask({
   return useMutation(
     ['markFailed', dagId, runId, taskId],
     ({
-      past, future, upstream, downstream,
+      past, future, upstream, downstream, mapIndexes = [],
     }) => {
       const params = new URLSearchParams({
         csrf_token: csrfToken,
@@ -45,6 +45,7 @@ export default function useMarkFailedTask({
         future,
         upstream,
         downstream,
+        map_indexes: mapIndexes,
       }).toString();
 
       return axios.post(failedUrl, params, {
@@ -56,6 +57,7 @@ export default function useMarkFailedTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('treeData');
+        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
         startRefresh();
       },
     },
diff --git a/airflow/www/static/js/tree/api/useMarkSuccessTask.js b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
index 92ba539de6..cde919a274 100644
--- a/airflow/www/static/js/tree/api/useMarkSuccessTask.js
+++ b/airflow/www/static/js/tree/api/useMarkSuccessTask.js
@@ -33,7 +33,7 @@ export default function useMarkSuccessTask({
   return useMutation(
     ['markSuccess', dagId, runId, taskId],
     ({
-      past, future, upstream, downstream,
+      past, future, upstream, downstream, mapIndexes = [],
     }) => {
       const params = new URLSearchParams({
         csrf_token: csrfToken,
@@ -45,6 +45,7 @@ export default function useMarkSuccessTask({
         future,
         upstream,
         downstream,
+        map_indexes: mapIndexes,
       }).toString();
 
       return axios.post(successUrl, params, {
@@ -56,6 +57,7 @@ export default function useMarkSuccessTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('treeData');
+        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
         startRefresh();
       },
     },
diff --git a/airflow/www/static/js/tree/api/useRunTask.js b/airflow/www/static/js/tree/api/useRunTask.js
index 44a9e14bf4..9e45c42f59 100644
--- a/airflow/www/static/js/tree/api/useRunTask.js
+++ b/airflow/www/static/js/tree/api/useRunTask.js
@@ -32,32 +32,34 @@ export default function useRunTask(dagId, runId, taskId) {
   const { startRefresh } = useAutoRefresh();
   return useMutation(
     ['runTask', dagId, runId, taskId],
-    ({
+    async ({
       ignoreAllDeps,
       ignoreTaskState,
       ignoreTaskDeps,
-      mapIndex = -1,
-    }) => {
-      const params = new URLSearchParams({
-        csrf_token: csrfToken,
-        dag_id: dagId,
-        dag_run_id: runId,
-        task_id: taskId,
-        ignore_all_deps: ignoreAllDeps,
-        ignore_task_deps: ignoreTaskDeps,
-        ignore_ti_state: ignoreTaskState,
-        map_index: mapIndex,
-      }).toString();
+      mapIndexes,
+    }) => Promise.all(
+      (mapIndexes.length ? mapIndexes : [-1]).map((mi) => {
+        const params = new URLSearchParams({
+          csrf_token: csrfToken,
+          dag_id: dagId,
+          dag_run_id: runId,
+          task_id: taskId,
+          ignore_all_deps: ignoreAllDeps,
+          ignore_task_deps: ignoreTaskDeps,
+          ignore_ti_state: ignoreTaskState,
+          map_index: mi,
+        }).toString();
 
-      return axios.post(runUrl, params, {
-        headers: {
-          'Content-Type': 'application/x-www-form-urlencoded',
-        },
-      });
-    },
+        return axios.post(runUrl, params, {
+          headers: {
+            'Content-Type': 'application/x-www-form-urlencoded',
+          },
+        });
+      }),
+    ),
     {
       onSuccess: (data) => {
-        const { message, status } = data;
+        const { message, status } = data.length ? data[0] : data;
         if (message && status === 'error') {
           toast({
             description: message,
@@ -67,6 +69,7 @@ export default function useRunTask(dagId, runId, taskId) {
         }
         if (!status || status !== 'error') {
           queryClient.invalidateQueries('treeData');
+          queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
           startRefresh();
         }
       },
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index 62ffee156a..0e4f441e24 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -24,7 +24,6 @@ import {
   Divider,
   StackDivider,
   Text,
-  Flex,
 } from '@chakra-ui/react';
 
 import RunAction from './taskActions/Run';
@@ -89,39 +88,32 @@ const TaskInstance = ({ taskId, runId }) => {
       {!isGroup && (
         <Box my={3}>
           <Text as="strong">{taskActionsTitle}</Text>
-          <Flex maxHeight="20px" minHeight="20px">
-            {selectedRows.length ? (
-              <Text color="red.500">
-                Clear, Mark Failed, and Mark Success do not yet work with individual mapped tasks.
-              </Text>
-            ) : <Divider my={2} />}
-          </Flex>
-          {/* visibility={selectedRows.length ? 'visible' : 'hidden'} */}
+          <Divider my={2} />
           <VStack justifyContent="center" divider={<StackDivider my={3} />}>
             <RunAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
-              selectedRows={selectedRows}
+              mapIndexes={selectedRows}
             />
             <ClearAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
               executionDate={executionDate}
-              selectedRows={selectedRows}
+              mapIndexes={selectedRows}
             />
             <MarkFailedAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
-              selectedRows={selectedRows}
+              mapIndexes={selectedRows}
             />
             <MarkSuccessAction
               runId={runId}
               taskId={taskId}
               dagId={dagId}
-              selectedRows={selectedRows}
+              mapIndexes={selectedRows}
             />
           </VStack>
           <Divider my={2} />
@@ -143,7 +135,12 @@ const TaskInstance = ({ taskId, runId }) => {
         extraLinks={extraLinks}
       />
       {isMapped && (
-        <MappedInstances dagId={dagId} runId={runId} taskId={taskId} selectRows={setSelectedRows} />
+        <MappedInstances
+          dagId={dagId}
+          runId={runId}
+          taskId={taskId}
+          selectRows={setSelectedRows}
+        />
       )}
     </Box>
   );
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index cada7b59ed..d825976ed2 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -34,7 +34,7 @@ const Run = ({
   runId,
   taskId,
   executionDate,
-  selectedRows,
+  mapIndexes,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -74,6 +74,7 @@ const Run = ({
         recursive,
         failed,
         confirmed: false,
+        mapIndexes,
       });
       setAffectedTasks(data);
       onOpen();
@@ -92,6 +93,7 @@ const Run = ({
         recursive,
         failed,
         confirmed: true,
+        mapIndexes,
       });
       setAffectedTasks([]);
       onClose();
@@ -114,7 +116,6 @@ const Run = ({
         colorScheme="blue"
         onClick={onClick}
         isLoading={isLoading}
-        isDisabled={!!selectedRows.length}
         title="Clearing deletes the previous state of the task instance, allowing it to get re-triggered by the scheduler or a backfill command"
       >
         Clear
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
index 6bc10c066e..12f8bcfeef 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
@@ -33,7 +33,7 @@ const MarkFailed = ({
   dagId,
   runId,
   taskId,
-  selectedRows,
+  mapIndexes,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -69,6 +69,7 @@ const MarkFailed = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks(data);
       onOpen();
@@ -84,6 +85,7 @@ const MarkFailed = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks([]);
       onClose();
@@ -100,7 +102,7 @@ const MarkFailed = ({
         <ActionButton bg={upstream && 'gray.100'} onClick={onToggleUpstream} name="Upstream" />
         <ActionButton bg={downstream && 'gray.100'} onClick={onToggleDownstream} name="Downstream" />
       </ButtonGroup>
-      <Button colorScheme="red" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading} isDisabled={!!selectedRows.length}>
+      <Button colorScheme="red" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading}>
         Mark Failed
       </Button>
       <ConfirmDialog
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
index b4d2b8c047..bdf59e6a4d 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkSuccess.jsx
@@ -29,8 +29,8 @@ import ConfirmDialog from '../../ConfirmDialog';
 import ActionButton from './ActionButton';
 import { useMarkSuccessTask, useConfirmMarkTask } from '../../../../api';
 
-const Run = ({
-  dagId, runId, taskId, selectedRows,
+const MarkSuccess = ({
+  dagId, runId, taskId, mapIndexes,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -64,6 +64,7 @@ const Run = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks(data);
       onOpen();
@@ -79,6 +80,7 @@ const Run = ({
         future,
         upstream,
         downstream,
+        mapIndexes,
       });
       setAffectedTasks([]);
       onClose();
@@ -95,7 +97,7 @@ const Run = ({
         <ActionButton bg={upstream && 'gray.100'} onClick={onToggleUpstream} name="Upstream" />
         <ActionButton bg={downstream && 'gray.100'} onClick={onToggleDownstream} name="Downstream" />
       </ButtonGroup>
-      <Button colorScheme="green" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading} isDisabled={!!selectedRows.length}>
+      <Button colorScheme="green" onClick={onClick} isLoading={isMarkLoading || isConfirmLoading}>
         Mark Success
       </Button>
       <ConfirmDialog
@@ -109,4 +111,4 @@ const Run = ({
   );
 };
 
-export default Run;
+export default MarkSuccess;
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
index 41c8bb9c6f..85d502aeed 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Run.jsx
@@ -30,7 +30,7 @@ const Run = ({
   dagId,
   runId,
   taskId,
-  selectedRows,
+  mapIndexes,
 }) => {
   const [ignoreAllDeps, setIgnoreAllDeps] = useState(false);
   const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps);
@@ -44,22 +44,12 @@ const Run = ({
   const { mutate: onRun, isLoading } = useRunTask(dagId, runId, taskId);
 
   const onClick = () => {
-    if (selectedRows.length) {
-      selectedRows.forEach((mapIndex) => {
-        onRun({
-          ignoreAllDeps,
-          ignoreTaskState,
-          ignoreTaskDeps,
-          mapIndex,
-        });
-      });
-    } else {
-      onRun({
-        ignoreAllDeps,
-        ignoreTaskState,
-        ignoreTaskDeps,
-      });
-    }
+    onRun({
+      ignoreAllDeps,
+      ignoreTaskState,
+      ignoreTaskDeps,
+      mapIndexes,
+    });
   };
 
   return (
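
The Run.jsx change above replaces one request per selected row with a single call that carries every map index. As a rough Python sketch of how such a batch could travel in one request, assume the indexes are sent as a repeated `map_index` query parameter. The parameter name is borrowed from the commit titles; the helper names and the exact encoding are assumptions for illustration, not Airflow's actual API.

```python
from typing import Iterable, List
from urllib.parse import parse_qs, urlencode


def encode_map_indexes(map_indexes: Iterable[int]) -> str:
    """Hypothetical helper: serialise indexes as repeated ``map_index`` params."""
    # doseq=True emits one key=value pair per element instead of str(list).
    return urlencode({"map_index": list(map_indexes)}, doseq=True)


def decode_map_indexes(query: str) -> List[int]:
    """Hypothetical helper: recover the indexes on the receiving side."""
    # parse_qs omits keys that never appear, so default to an empty list.
    return [int(value) for value in parse_qs(query).get("map_index", [])]


query = encode_map_indexes([0, 2, 5])
print(query)
print(decode_map_indexes(query))
```

An empty selection encodes to an empty query string and decodes back to an empty list, so the "no rows selected" case needs no separate branch.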


[airflow] 02/09: Introduce tuple_().in_() shim for MSSQL compat

Posted by bb...@apache.org.

bbovenzi pushed a commit to branch mapped-instance-actions
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 898480765fe8117938037b194a10663565d44e3a
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Apr 19 18:01:55 2022 +0800

    Introduce tuple_().in_() shim for MSSQL compat
---
 airflow/api/common/mark_tasks.py |  5 +++--
 airflow/jobs/scheduler_job.py    | 47 +++++++++++++++-------------------------
 airflow/models/dag.py            |  8 +++----
 airflow/models/taskinstance.py   | 21 +++++-------------
 airflow/utils/sqlalchemy.py      | 25 +++++++++++++++++++--
 5 files changed, 52 insertions(+), 54 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 349b935e82..1d4709fb82 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -20,7 +20,7 @@
 from datetime import datetime
 from typing import TYPE_CHECKING, Collection, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union
 
-from sqlalchemy import or_, tuple_
+from sqlalchemy import or_
 from sqlalchemy.orm import contains_eager
 from sqlalchemy.orm.session import Session as SASession
 
@@ -32,6 +32,7 @@ from airflow.operators.subdag import SubDagOperator
 from airflow.utils import timezone
 from airflow.utils.helpers import exactly_one
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.sqlalchemy import tuple_in_condition
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunType
 
@@ -203,7 +204,7 @@ def get_all_dag_task_query(
     if is_string_list:
         qry_dag = qry_dag.filter(TaskInstance.task_id.in_(task_ids))
     else:
-        qry_dag = qry_dag.filter(tuple_(TaskInstance.task_id, TaskInstance.map_index).in_(task_ids))
+        qry_dag = qry_dag.filter(tuple_in_condition((TaskInstance.task_id, TaskInstance.map_index), task_ids))
     qry_dag = qry_dag.filter(or_(TaskInstance.state.is_(None), TaskInstance.state != state)).options(
         contains_eager(TaskInstance.dag_run)
     )
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0b8c437ac..ac1d25833b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -28,7 +28,7 @@ from collections import defaultdict
 from datetime import timedelta
 from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Set, Tuple
 
-from sqlalchemy import and_, func, not_, or_, text, tuple_
+from sqlalchemy import func, not_, or_, text
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import load_only, selectinload
 from sqlalchemy.orm.session import Session, make_transient
@@ -55,7 +55,13 @@ from airflow.utils.docs import get_docs_url
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.sqlalchemy import is_lock_not_available_error, prohibit_commit, skip_locked, with_row_locks
+from airflow.utils.sqlalchemy import (
+    is_lock_not_available_error,
+    prohibit_commit,
+    skip_locked,
+    tuple_in_condition,
+    with_row_locks,
+)
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunType
 
@@ -321,17 +327,7 @@ class SchedulerJob(BaseJob):
                 query = query.filter(not_(TI.dag_id.in_(starved_dags)))
 
             if starved_tasks:
-                if settings.engine.dialect.name == 'mssql':
-                    task_filter = or_(
-                        and_(
-                            TaskInstance.dag_id == dag_id,
-                            TaskInstance.task_id == task_id,
-                        )
-                        for (dag_id, task_id) in starved_tasks
-                    )
-                else:
-                    task_filter = tuple_(TaskInstance.dag_id, TaskInstance.task_id).in_(starved_tasks)
-
+                task_filter = tuple_in_condition((TaskInstance.dag_id, TaskInstance.task_id), starved_tasks)
                 query = query.filter(not_(task_filter))
 
             query = query.limit(max_tis)
@@ -980,24 +976,15 @@ class SchedulerJob(BaseJob):
         # as DagModel.dag_id and DagModel.next_dagrun
         # This list is used to verify if the DagRun already exist so that we don't attempt to create
         # duplicate dag runs
-
-        if session.bind.dialect.name == 'mssql':
-            existing_dagruns_filter = or_(
-                *(
-                    and_(
-                        DagRun.dag_id == dm.dag_id,
-                        DagRun.execution_date == dm.next_dagrun,
-                    )
-                    for dm in dag_models
-                )
-            )
-        else:
-            existing_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_(
-                [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
-            )
-
         existing_dagruns = (
-            session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
+            session.query(DagRun.dag_id, DagRun.execution_date)
+            .filter(
+                tuple_in_condition(
+                    (DagRun.dag_id, DagRun.execution_date),
+                    ((dm.dag_id, dm.next_dagrun) for dm in dag_models),
+                ),
+            )
+            .all()
         )
 
         active_runs_of_dags = defaultdict(
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9c93bcef13..83860ba591 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -52,7 +52,7 @@ import jinja2
 import pendulum
 from dateutil.relativedelta import relativedelta
 from pendulum.tz.timezone import Timezone
-from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_, tuple_
+from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, not_, or_
 from sqlalchemy.orm import backref, joinedload, relationship
 from sqlalchemy.orm.query import Query
 from sqlalchemy.orm.session import Session
@@ -85,7 +85,7 @@ from airflow.utils.file import correct_maybe_zipped
 from airflow.utils.helpers import exactly_one, validate_key
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.sqlalchemy import Interval, UtcDateTime, skip_locked, with_row_locks
+from airflow.utils.sqlalchemy import Interval, UtcDateTime, skip_locked, tuple_in_condition, with_row_locks
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType
 
@@ -1451,7 +1451,7 @@ class DAG(LoggingMixin):
         elif isinstance(next(iter(task_ids), None), str):
             tis = tis.filter(TI.task_id.in_(task_ids))
         else:
-            tis = tis.filter(tuple_(TI.task_id, TI.map_index).in_(task_ids))
+            tis = tis.filter(tuple_in_condition((TI.task_id, TI.map_index), task_ids))
 
         # This allows allow_trigger_in_future config to take effect, rather than mandating exec_date <= UTC
         if end_date or not self.allow_future_exec_dates:
@@ -1611,7 +1611,7 @@ class DAG(LoggingMixin):
         elif isinstance(next(iter(exclude_task_ids), None), str):
             tis = tis.filter(TI.task_id.notin_(exclude_task_ids))
         else:
-            tis = tis.filter(tuple_(TI.task_id, TI.map_index).notin_(exclude_task_ids))
+            tis = tis.filter(not_(tuple_in_condition((TI.task_id, TI.map_index), exclude_task_ids)))
 
         return tis
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 48d3a047fb..9d135a47b8 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -67,7 +67,6 @@ from sqlalchemy import (
     inspect,
     or_,
     text,
-    tuple_,
 )
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.ext.mutable import MutableDict
@@ -122,7 +121,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
 from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, tuple_in_condition, with_row_locks
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.timeout import timeout
 
@@ -2540,20 +2539,10 @@ class TaskInstance(Base, LoggingMixin):
                 TaskInstance.task_id == first_task_id,
             )
 
-        if settings.engine.dialect.name == 'mssql':
-            return or_(
-                and_(
-                    TaskInstance.dag_id == ti.dag_id,
-                    TaskInstance.task_id == ti.task_id,
-                    TaskInstance.run_id == ti.run_id,
-                    TaskInstance.map_index == ti.map_index,
-                )
-                for ti in tis
-            )
-        else:
-            return tuple_(
-                TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.run_id, TaskInstance.map_index
-            ).in_([ti.key.primary for ti in tis])
+        return tuple_in_condition(
+            (TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.run_id, TaskInstance.map_index),
+            (ti.key.primary for ti in tis),
+        )
 
 
 # State of the task instance.
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index c240a94456..5c36d826b2 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -19,15 +19,19 @@
 import datetime
 import json
 import logging
-from typing import Any, Dict
+from sqlalchemy import and_, or_
+from typing import Any, Dict, Iterable, Tuple
 
 import pendulum
 from dateutil import relativedelta
-from sqlalchemy import event, nullsfirst
+from sqlalchemy import event, nullsfirst, tuple_
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm.session import Session
+from sqlalchemy.sql import ColumnElement
+from sqlalchemy.sql.expression import ColumnOperators
 from sqlalchemy.types import JSON, DateTime, Text, TypeDecorator, TypeEngine, UnicodeText
 
+from airflow import settings
 from airflow.configuration import conf
 
 log = logging.getLogger(__name__)
@@ -319,3 +323,20 @@ def is_lock_not_available_error(error: OperationalError):
     if db_err_code in ('55P03', 1205, 3572):
         return True
     return False
+
+
+def tuple_in_condition(
+    columns: Tuple[ColumnElement, ...],
+    collection: Iterable[Any],
+) -> ColumnOperators:
+    """Generates a tuple-in-collection operator to use in ``.filter()``.
+
+    For most SQL backends, this generates a simple ``(col, ...) IN (values, ...)``
+    clause. This however does not work with MSSQL, where we need to expand to
+    ``(c1 = v1a AND c2 = v2a ...) OR (c1 = v1b AND c2 = v2b ...) ...`` manually.
+
+    :meta private:
+    """
+    if settings.engine.dialect.name != "mssql":
+        return tuple_(*columns).in_(collection)
+    return or_(*(and_(*(c == v for c, v in zip(columns, values))) for values in collection))
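
The expansion described in the docstring above can be illustrated without SQLAlchemy. The following is only a sketch using the standard library's sqlite3 module: `expand_tuple_in` is a hypothetical helper, not part of Airflow, and the `ti` table is a toy stand-in for the task instance table. It builds the OR-of-ANDs condition by hand and checks that it selects the same rows as a row-value IN.

```python
import sqlite3
from typing import Any, Iterable, List, Sequence, Tuple


def expand_tuple_in(
    columns: Sequence[str],
    collection: Iterable[Sequence[Any]],
) -> Tuple[str, List[Any]]:
    """Hypothetical helper: build ``(c1 = ? AND c2 = ?) OR ...`` and its parameters.

    Column names are interpolated into the SQL text, so they must come from
    trusted code; the values are always passed as bound parameters.
    """
    group = "(" + " AND ".join(f"{col} = ?" for col in columns) + ")"
    clauses: List[str] = []
    params: List[Any] = []
    for values in collection:
        if len(values) != len(columns):
            raise ValueError("each value tuple must have one entry per column")
        clauses.append(group)
        params.extend(values)
    # With no value tuples, return a condition that matches no rows.
    return (" OR ".join(clauses) if clauses else "1 = 0"), params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT, map_index INTEGER)")
conn.executemany(
    "INSERT INTO ti VALUES (?, ?)",
    [("a", 0), ("a", 1), ("b", 0), ("b", 1)],
)

wanted = [("a", 1), ("b", 0)]
where, params = expand_tuple_in(("task_id", "map_index"), wanted)

# Expanded OR-of-ANDs form, as the docstring describes for MSSQL.
expanded_rows = conn.execute(
    f"SELECT task_id, map_index FROM ti WHERE {where} ORDER BY task_id, map_index",
    params,
).fetchall()

# Row-value form, spelled the way SQLite accepts it (a VALUES list on the right).
row_value_rows = conn.execute(
    "SELECT task_id, map_index FROM ti "
    "WHERE (task_id, map_index) IN (VALUES (?, ?), (?, ?)) "
    "ORDER BY task_id, map_index",
    params,
).fetchall()

print(where)
print(expanded_rows == row_value_rows)
```

Both queries should select the same rows, which is the equivalence the shim relies on when it swaps one form for the other depending on the dialect.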