You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/05 21:00:24 UTC

[airflow] branch webserver-use-dag-run-id created (now de650c73fc)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a change to branch webserver-use-dag-run-id
in repository https://gitbox.apache.org/repos/asf/airflow.git


      at de650c73fc Use dag_run_id instead of execution_date

This branch includes the following new commits:

     new c338832ffa Remove extraneous treeData fields
     new de650c73fc Use dag_run_id instead of execution_date

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[airflow] 01/02: Remove extraneous treeData fields

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch webserver-use-dag-run-id
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c338832ffa063764f96e3a6e560f6b45de0b218e
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 5 15:09:58 2022 -0400

    Remove extraneous treeData fields
---
 .../js/tree/details/content/dagRun/index.jsx       |  2 +-
 .../tree/details/content/taskInstance/Details.jsx  | 21 +++++----
 .../js/tree/details/content/taskInstance/Nav.jsx   | 12 ++---
 .../js/tree/details/content/taskInstance/index.jsx | 52 +++++++++++++---------
 .../taskInstance/taskActions/MarkFailed.jsx        |  2 +-
 airflow/www/static/js/tree/treeDataUtils.js        |  2 +-
 airflow/www/utils.py                               |  7 ---
 7 files changed, 50 insertions(+), 48 deletions(-)

diff --git a/airflow/www/static/js/tree/details/content/dagRun/index.jsx b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
index 68e6d6526a..fd609b7ad0 100644
--- a/airflow/www/static/js/tree/details/content/dagRun/index.jsx
+++ b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
@@ -37,6 +37,7 @@ import ClearRun from './ClearRun';
 import { useTreeData } from '../../../api';
 import { appendSearchParams, getMetaValue } from '../../../../utils';
 
+const dagId = getMetaValue('dag_id');
 const graphUrl = getMetaValue('graph_url');
 const dagRunDetailsUrl = getMetaValue('dagrun_details_url');
 
@@ -46,7 +47,6 @@ const DagRun = ({ runId }) => {
   if (!run) return null;
   const {
     executionDate,
-    dagId,
     state,
     runType,
     duration,
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Details.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Details.jsx
index 44294f51f7..1ae907b00c 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Details.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Details.jsx
@@ -29,8 +29,8 @@ import { getDuration, formatDuration } from '../../../../datetime_utils';
 import { SimpleStatus } from '../../../StatusBox';
 import Time from '../../../Time';
 
-const Details = ({ instance, task }) => {
-  const isGroup = !!task.children;
+const Details = ({ instance, group, operator }) => {
+  const isGroup = !!group.children;
   const groupSummary = [];
   const mapSummary = [];
 
@@ -38,16 +38,21 @@ const Details = ({ instance, task }) => {
     taskId,
     runId,
     duration,
-    operator,
     startDate,
     endDate,
     state,
     mappedStates,
   } = instance;
 
+  const {
+    isMapped,
+    children,
+    tooltip,
+  } = group;
+
   if (isGroup) {
     const numMap = finalStatesMap();
-    task.children.forEach((child) => {
+    children.forEach((child) => {
       const taskInstance = child.instances.find((ti) => ti.runId === runId);
       if (taskInstance) {
         const stateKey = taskInstance.state == null ? 'no_status' : taskInstance.state;
@@ -68,7 +73,7 @@ const Details = ({ instance, task }) => {
     });
   }
 
-  if (task.isMapped && mappedStates) {
+  if (isMapped && mappedStates) {
     const numMap = finalStatesMap();
     mappedStates.forEach((s) => {
       const stateKey = s || 'no_status';
@@ -94,8 +99,8 @@ const Details = ({ instance, task }) => {
   return (
     <Flex flexWrap="wrap" justifyContent="space-between">
       <Box>
-        {task.tooltip && (
-          <Text>{task.tooltip}</Text>
+        {tooltip && (
+          <Text>{tooltip}</Text>
         )}
         <Flex alignItems="center">
           <Text as="strong">Status:</Text>
@@ -109,7 +114,7 @@ const Details = ({ instance, task }) => {
             {groupSummary}
           </>
         )}
-        {task.isMapped && (
+        {isMapped && (
           <>
             <br />
             <Text as="strong">
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
index d0841d0119..17b423130a 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
@@ -27,6 +27,7 @@ import {
 
 import { getMetaValue, appendSearchParams } from '../../../../utils';
 
+const dagId = getMetaValue('dag_id');
 const isK8sExecutor = getMetaValue('k8s_or_k8scelery_executor') === 'True';
 const numRuns = getMetaValue('num_runs');
 const baseDate = getMetaValue('base_date');
@@ -40,14 +41,9 @@ const gridUrlNoRoot = getMetaValue('grid_url_no_root');
 
 const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost" colorScheme="blue" {...rest}>{children}</Button>);
 
-const Nav = ({ instance, isMapped }) => {
-  const {
-    taskId,
-    dagId,
-    operator,
-    executionDate,
-  } = instance;
-
+const Nav = ({
+  taskId, executionDate, operator, isMapped,
+}) => {
   const params = new URLSearchParams({
     task_id: taskId,
     execution_date: executionDate,
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index 1e787c7aba..93d22dbde7 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -34,8 +34,11 @@ import Logs from './Logs';
 import TaskNav from './Nav';
 import Details from './Details';
 
-import { useTreeData } from '../../../api';
+import { useTreeData, useTasks } from '../../../api';
 import MappedInstances from './MappedInstances';
+import { getMetaValue } from '../../../../utils';
+
+const dagId = getMetaValue('dag_id');
 
 const getTask = ({ taskId, runId, task }) => {
   if (task.id === taskId) return task;
@@ -51,57 +54,62 @@ const getTask = ({ taskId, runId, task }) => {
 };
 
 const TaskInstance = ({ taskId, runId }) => {
-  const { data: { groups = {} } } = useTreeData();
-  const task = getTask({ taskId, runId, task: groups });
-  if (!task) return null;
-
-  const isGroup = !!task.children;
+  const { data: { groups = {}, dagRuns = [] } } = useTreeData();
+  const group = getTask({ taskId, runId, task: groups });
+  const run = dagRuns.find((r) => r.runId === runId);
+  const { executionDate } = run;
+  const { data: { tasks } } = useTasks(dagId);
+  if (!group) return null;
+  const task = tasks.find((t) => t.taskId === taskId);
+  const operator = task && task.classRef && task.classRef.className ? task.classRef.className : '';
 
-  const instance = task.instances.find((ti) => ti.runId === runId);
+  const isGroup = !!group.children;
+  const { isMapped, extraLinks } = group;
 
-  const {
-    dagId,
-    executionDate,
-    tryNumber,
-  } = instance;
+  const instance = group.instances.find((ti) => ti.runId === runId);
 
   return (
     <Box fontSize="12px" py="4px">
       {!isGroup && (
-        <TaskNav instance={instance} isMapped={task.isMapped} />
+        <TaskNav
+          taskId={taskId}
+          isMapped={isMapped}
+          executionDate={executionDate}
+          operator={operator}
+        />
       )}
       {!isGroup && (
         <>
           <VStack justifyContent="center" divider={<StackDivider my={3} />} my={3}>
-            <RunAction runId={runId} taskId={task.id} dagId={dagId} />
+            <RunAction runId={runId} taskId={taskId} dagId={dagId} />
             <ClearAction
               runId={runId}
-              taskId={task.id}
+              taskId={taskId}
               dagId={dagId}
               executionDate={executionDate}
             />
-            <MarkFailedAction runId={runId} taskId={task.id} dagId={dagId} />
-            <MarkSuccessAction runId={runId} taskId={task.id} dagId={dagId} />
+            <MarkFailedAction runId={runId} taskId={taskId} dagId={dagId} />
+            <MarkSuccessAction runId={runId} taskId={taskId} dagId={dagId} />
           </VStack>
           <Divider my={2} />
         </>
       )}
-      {!task.isMapped && (
+      {!isMapped && (
         <Logs
           dagId={dagId}
           taskId={taskId}
           executionDate={executionDate}
-          tryNumber={tryNumber}
+          tryNumber={instance.tryNumber}
         />
       )}
-      <Details instance={instance} task={task} />
+      <Details instance={instance} group={group} operator={operator} />
       <ExtraLinks
         taskId={taskId}
         dagId={dagId}
         executionDate={executionDate}
-        extraLinks={task.extraLinks}
+        extraLinks={extraLinks}
       />
-      {task.isMapped && (
+      {isMapped && (
         <MappedInstances dagId={dagId} runId={runId} taskId={taskId} />
       )}
     </Box>
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
index 7b61de388e..fe277c9eef 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/MarkFailed.jsx
@@ -72,7 +72,7 @@ const MarkFailed = ({
       setAffectedTasks(data);
       onOpen();
     } catch (e) {
-      console.log(e);
+      console.error(e);
     }
   };
 
diff --git a/airflow/www/static/js/tree/treeDataUtils.js b/airflow/www/static/js/tree/treeDataUtils.js
index 680f2e63ac..95171652b7 100644
--- a/airflow/www/static/js/tree/treeDataUtils.js
+++ b/airflow/www/static/js/tree/treeDataUtils.js
@@ -19,7 +19,7 @@
 
 import camelcaseKeys from 'camelcase-keys';
 
-export const areActiveRuns = (runs) => runs.filter((run) => ['queued', 'running', 'scheduled'].includes(run.state)).length > 0;
+export const areActiveRuns = (runs = []) => runs.filter((run) => ['queued', 'running', 'scheduled'].includes(run.state)).length > 0;
 
 export const formatData = (data, emptyData) => {
   if (!data || !Object.keys(data).length) {
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index fc4c391660..b07cd0f44f 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -110,15 +110,12 @@ def get_mapped_summary(parent_instance, task_instances):
     )
 
     return {
-        'dag_id': parent_instance.dag_id,
         'task_id': parent_instance.task_id,
         'run_id': parent_instance.run_id,
         'state': group_state,
         'start_date': group_start_date,
         'end_date': group_end_date,
         'mapped_states': mapped_states,
-        'operator': parent_instance.operator,
-        'execution_date': datetime_to_string(parent_instance.execution_date),
         'try_number': parent_instance.try_number,
     }
 
@@ -134,15 +131,12 @@ def encode_ti(
 
     return {
         'task_id': task_instance.task_id,
-        'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
         'map_index': task_instance.map_index,
         'state': task_instance.state,
         'duration': task_instance.duration,
         'start_date': datetime_to_string(task_instance.start_date),
         'end_date': datetime_to_string(task_instance.end_date),
-        'operator': task_instance.operator,
-        'execution_date': datetime_to_string(task_instance.execution_date),
         'try_number': task_instance.try_number,
     }
 
@@ -152,7 +146,6 @@ def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]
         return None
 
     return {
-        'dag_id': dag_run.dag_id,
         'run_id': dag_run.run_id,
         'start_date': datetime_to_string(dag_run.start_date),
         'end_date': datetime_to_string(dag_run.end_date),


[airflow] 02/02: Use dag_run_id instead of execution_date

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch webserver-use-dag-run-id
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit de650c73fc4ddb3c24e070bad9ec0aff61e03149
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 5 16:58:29 2022 -0400

    Use dag_run_id instead of execution_date
---
 airflow/www/static/js/tree/api/useClearTask.js     |  3 +-
 airflow/www/static/js/tree/api/useExtraLinks.js    |  6 +-
 .../js/tree/details/content/dagRun/index.jsx       |  3 +-
 .../details/content/taskInstance/ExtraLinks.jsx    |  4 +-
 .../js/tree/details/content/taskInstance/Logs.jsx  |  6 +-
 .../js/tree/details/content/taskInstance/Nav.jsx   |  9 +-
 .../js/tree/details/content/taskInstance/index.jsx | 17 ++--
 .../content/taskInstance/taskActions/Clear.jsx     |  7 +-
 airflow/www/templates/airflow/dag.html             |  2 +-
 airflow/www/utils.py                               |  1 -
 airflow/www/views.py                               | 95 ++++++++++++++++++----
 11 files changed, 100 insertions(+), 53 deletions(-)

diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index bcf99bb250..8e2ff52e2a 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -27,7 +27,7 @@ const csrfToken = getMetaValue('csrf_token');
 const clearUrl = getMetaValue('clear_url');
 
 export default function useClearTask({
-  dagId, runId, taskId, executionDate,
+  dagId, runId, taskId,
 }) {
   const queryClient = useQueryClient();
   const toast = useToast();
@@ -44,7 +44,6 @@ export default function useClearTask({
         dag_run_id: runId,
         task_id: taskId,
         confirmed,
-        execution_date: executionDate,
         past,
         future,
         upstream,
diff --git a/airflow/www/static/js/tree/api/useExtraLinks.js b/airflow/www/static/js/tree/api/useExtraLinks.js
index ecc92ddc80..d88cc9b171 100644
--- a/airflow/www/static/js/tree/api/useExtraLinks.js
+++ b/airflow/www/static/js/tree/api/useExtraLinks.js
@@ -24,16 +24,16 @@ import { getMetaValue } from '../../utils';
 const extraLinksUrl = getMetaValue('extra_links_url');
 
 export default function useExtraLinks({
-  dagId, taskId, executionDate, extraLinks,
+  dagId, taskId, runId, extraLinks,
 }) {
   return useQuery(
-    ['extraLinks', dagId, taskId, executionDate],
+    ['extraLinks', dagId, taskId, runId],
     async () => {
       const data = await Promise.all(extraLinks.map(async (link) => {
         const url = `${extraLinksUrl
         }?task_id=${encodeURIComponent(taskId)
         }&dag_id=${encodeURIComponent(dagId)
-        }&execution_date=${encodeURIComponent(executionDate)
+        }&dag_run_id=${encodeURIComponent(runId)
         }&link_name=${encodeURIComponent(link)}`;
         try {
           const datum = await axios.get(url);
diff --git a/airflow/www/static/js/tree/details/content/dagRun/index.jsx b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
index fd609b7ad0..dd77e59119 100644
--- a/airflow/www/static/js/tree/details/content/dagRun/index.jsx
+++ b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
@@ -46,7 +46,6 @@ const DagRun = ({ runId }) => {
   const run = dagRuns.find((dr) => dr.runId === runId);
   if (!run) return null;
   const {
-    executionDate,
     state,
     runType,
     duration,
@@ -60,7 +59,7 @@ const DagRun = ({ runId }) => {
     run_id: runId,
   }).toString();
   const graphParams = new URLSearchParams({
-    execution_date: executionDate,
+    dag_run_id: runId,
   }).toString();
   const graphLink = appendSearchParams(graphUrl, graphParams);
   const detailsLink = appendSearchParams(dagRunDetailsUrl, detailsParams);
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx b/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
index b9b60b4c01..e53c1583ad 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
@@ -30,11 +30,11 @@ import { useExtraLinks } from '../../../api';
 const ExtraLinks = ({
   dagId,
   taskId,
-  executionDate,
+  runId,
   extraLinks = [],
 }) => {
   const { data: links = [] } = useExtraLinks({
-    dagId, taskId, executionDate, extraLinks,
+    dagId, taskId, runId, extraLinks,
   });
 
   if (!links.length) return null;
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
index c89e1d3364..462503aebe 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
@@ -39,7 +39,7 @@ const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost"
 const Logs = ({
   dagId,
   taskId,
-  executionDate,
+  runId,
   tryNumber,
 }) => {
   const externalLogs = [];
@@ -53,7 +53,7 @@ const Logs = ({
       const fullExternalUrl = `${externalLogUrl
       }?dag_id=${encodeURIComponent(dagId)
       }&task_id=${encodeURIComponent(taskId)
-      }&execution_date=${encodeURIComponent(executionDate)
+      }&dag_run_id=${encodeURIComponent(runId)
       }&try_number=${index}`;
       externalLogs.push(
         <LinkButton
@@ -70,7 +70,7 @@ const Logs = ({
     const fullMetadataUrl = `${logsWithMetadataUrl
     }?dag_id=${encodeURIComponent(dagId)
     }&task_id=${encodeURIComponent(taskId)
-    }&execution_date=${encodeURIComponent(executionDate)
+    }&dag_run_id=${encodeURIComponent(runId)
     }&metadata=null&format=file${index > 0 && `&try_number=${index}`}`;
 
     return (
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
index 17b423130a..29dd79c0a7 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
@@ -42,11 +42,11 @@ const gridUrlNoRoot = getMetaValue('grid_url_no_root');
 const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost" colorScheme="blue" {...rest}>{children}</Button>);
 
 const Nav = ({
-  taskId, executionDate, operator, isMapped,
+  taskId, operator, isMapped, runId,
 }) => {
   const params = new URLSearchParams({
     task_id: taskId,
-    execution_date: executionDate,
+    dag_run_id: runId,
   }).toString();
   const detailsLink = `${taskUrl}&${params}`;
   const renderedLink = `${renderedTemplatesUrl}&${params}`;
@@ -57,9 +57,6 @@ const Nav = ({
     _flt_3_task_id: taskId,
     _oc_TaskInstanceModelView: 'dag_run.execution_date',
   });
-  const subDagParams = new URLSearchParams({
-    execution_date: executionDate,
-  }).toString();
 
   const filterParams = new URLSearchParams({
     base_date: baseDate,
@@ -70,7 +67,7 @@ const Nav = ({
   const allInstancesLink = `${taskInstancesUrl}?${listParams.toString()}`;
 
   const filterUpstreamLink = appendSearchParams(gridUrlNoRoot, filterParams);
-  const subDagLink = appendSearchParams(gridUrl.replace(dagId, `${dagId}.${taskId}`), subDagParams);
+  const subDagLink = gridUrl.replace(dagId, `${dagId}.${taskId}`);
 
   // TODO: base subdag zooming as its own attribute instead of via operator name
   const isSubDag = operator === 'SubDagOperator';
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index 93d22dbde7..a48aceac5e 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -54,10 +54,8 @@ const getTask = ({ taskId, runId, task }) => {
 };
 
 const TaskInstance = ({ taskId, runId }) => {
-  const { data: { groups = {}, dagRuns = [] } } = useTreeData();
+  const { data: { groups = {} } } = useTreeData();
   const group = getTask({ taskId, runId, task: groups });
-  const run = dagRuns.find((r) => r.runId === runId);
-  const { executionDate } = run;
   const { data: { tasks } } = useTasks(dagId);
   if (!group) return null;
   const task = tasks.find((t) => t.taskId === taskId);
@@ -74,20 +72,15 @@ const TaskInstance = ({ taskId, runId }) => {
         <TaskNav
           taskId={taskId}
           isMapped={isMapped}
-          executionDate={executionDate}
           operator={operator}
+          runId={runId}
         />
       )}
       {!isGroup && (
         <>
           <VStack justifyContent="center" divider={<StackDivider my={3} />} my={3}>
             <RunAction runId={runId} taskId={taskId} dagId={dagId} />
-            <ClearAction
-              runId={runId}
-              taskId={taskId}
-              dagId={dagId}
-              executionDate={executionDate}
-            />
+            <ClearAction runId={runId} taskId={taskId} dagId={dagId} />
             <MarkFailedAction runId={runId} taskId={taskId} dagId={dagId} />
             <MarkSuccessAction runId={runId} taskId={taskId} dagId={dagId} />
           </VStack>
@@ -98,7 +91,7 @@ const TaskInstance = ({ taskId, runId }) => {
         <Logs
           dagId={dagId}
           taskId={taskId}
-          executionDate={executionDate}
+          runId={runId}
           tryNumber={instance.tryNumber}
         />
       )}
@@ -106,7 +99,7 @@ const TaskInstance = ({ taskId, runId }) => {
       <ExtraLinks
         taskId={taskId}
         dagId={dagId}
-        executionDate={executionDate}
+        runId={runId}
         extraLinks={extraLinks}
       />
       {isMapped && (
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index 4196edc6b9..ec7729cf07 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -29,11 +29,10 @@ import ActionButton from './ActionButton';
 import ConfirmDialog from '../../ConfirmDialog';
 import { useClearTask } from '../../../../api';
 
-const Run = ({
+const Clear = ({
   dagId,
   runId,
   taskId,
-  executionDate,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -60,7 +59,7 @@ const Run = ({
   const { isOpen, onOpen, onClose } = useDisclosure();
 
   const { mutateAsync: clearTask, isLoading } = useClearTask({
-    dagId, runId, taskId, executionDate,
+    dagId, runId, taskId,
   });
 
   const onClick = async () => {
@@ -128,4 +127,4 @@ const Run = ({
   );
 };
 
-export default Run;
+export default Clear;
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index d77d7f2bc0..a167877779 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,8 +310,8 @@
             <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
-            <input type="hidden" name="execution_date">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
+            <input type="hidden" name="dag_run_id">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
                 <label class="btn btn-default btn-sm" title="Also include past task instances when clearing this one">
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b07cd0f44f..aa1db26db2 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -150,7 +150,6 @@ def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]
         'start_date': datetime_to_string(dag_run.start_date),
         'end_date': datetime_to_string(dag_run.end_date),
         'state': dag_run.state,
-        'execution_date': datetime_to_string(dag_run.execution_date),
         'data_interval_start': datetime_to_string(dag_run.data_interval_start),
         'data_interval_end': datetime_to_string(dag_run.data_interval_end),
         'run_type': dag_run.run_type,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 03cd5e6e02..af78539819 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -183,8 +183,12 @@ def get_safe_url(url):
 def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     """Get Execution Data, Base Date & Number of runs from a Request"""
     date_time = www_request.args.get('execution_date')
+    dag_run_id = www_request.args.get('dag_run_id')
     if date_time:
         date_time = timezone.parse(date_time)
+    elif dag_run_id:
+        dr = dag.get_dagrun(run_id=dag_run_id)
+        date_time = dr.execution_date
     else:
         date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow()
 
@@ -1253,13 +1257,20 @@ class Airflow(AirflowBaseView):
         task_id = request.args.get('task_id')
         map_index = request.args.get('map_index', -1, type=int)
         execution_date = request.args.get('execution_date')
-        dttm = timezone.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-        root = request.args.get('root', '')
+        dag_run_id = request.args.get('dag_run_id')
 
         logging.info("Retrieving rendered templates.")
         dag: DAG = current_app.dag_bag.get_dag(dag_id)
-        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+        if dag_run_id:
+            dag_run = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dag_run.execution_date
+            dttm = dag_run.execution_date.isoformat()
+        else:
+            dttm = timezone.parse(execution_date)
+            dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
         raw_task = dag.get_task(task_id).prepare_for_execution()
 
         ti: TaskInstance
@@ -1354,15 +1365,23 @@ class Airflow(AirflowBaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = timezone.parse(execution_date)
+        dag_run_id = request.args.get('dag_run_id')
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
+
+        if dag_run_id:
+            dag_run = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dag_run.execution_date
+            dttm = dag_run.execution_date.isoformat()
+        else:
+            dttm = timezone.parse(execution_date)
+            dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+
         form = DateTimeForm(data={'execution_date': dttm})
         root = request.args.get('root', '')
         map_index = request.args.get('map_index', -1, type=int)
         logging.info("Retrieving rendered templates.")
 
-        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
         ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)
 
         pod_spec = None
@@ -1412,12 +1431,18 @@ class Airflow(AirflowBaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+        dag_run_id = request.args.get('dag_run_id')
         map_index = request.args.get('map_index', -1, type=int)
         try_number = request.args.get('try_number', type=int)
         metadata = request.args.get('metadata')
         metadata = json.loads(metadata)
         response_format = request.args.get('format', 'json')
 
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+
         # metadata may be null
         if not metadata:
             metadata = {}
@@ -1457,7 +1482,6 @@ class Airflow(AirflowBaseView):
             )
 
         try:
-            dag = current_app.dag_bag.get_dag(dag_id)
             if dag:
                 ti.task = dag.get_task(ti.task_id)
 
@@ -1495,7 +1519,16 @@ class Airflow(AirflowBaseView):
         task_id = request.args.get('task_id')
         map_index = request.args.get('map_index', -1, type=int)
         execution_date = request.args.get('execution_date')
-        dttm = timezone.parse(execution_date) if execution_date else None
+        dag_run_id = request.args.get('dag_run_id')
+        dag = current_app.dag_bag.get_dag(dag_id)
+
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+            dttm = dr.execution_date
+        else:
+            dttm = timezone.parse(execution_date) if execution_date else None
+
         form = DateTimeForm(data={'execution_date': dttm})
         dag_model = DagModel.get_dagmodel(dag_id)
 
@@ -1540,8 +1573,15 @@ class Airflow(AirflowBaseView):
     def redirect_to_external_log(self, session=None):
         """Redirects to external log."""
         dag_id = request.args.get('dag_id')
+        dag_run_id = request.args.get('dag_run_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+
+        if dag_run_id:
+            dag = current_app.dag_bag.get_dag(dag_id)
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+
         dttm = timezone.parse(execution_date)
         map_index = request.args.get('map_index', -1, type=int)
         try_number = request.args.get('try_number', 1)
@@ -1579,11 +1619,17 @@ class Airflow(AirflowBaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+        dag_run_id = request.args.get('dag_run_id')
+        dag = current_app.dag_bag.get_dag(dag_id)
+
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+
         dttm = timezone.parse(execution_date)
         map_index = request.args.get('map_index', -1, type=int)
         form = DateTimeForm(data={'execution_date': dttm})
         root = request.args.get('root', '')
-        dag = current_app.dag_bag.get_dag(dag_id)
 
         if not dag or task_id not in dag.task_ids:
             flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
@@ -2019,9 +2065,8 @@ class Airflow(AirflowBaseView):
         task_id = request.form.get('task_id')
         origin = get_safe_url(request.form.get('origin'))
         dag = current_app.dag_bag.get_dag(dag_id)
+        dag_run_id = request.form.get('dag_run_id')
 
-        execution_date = request.form.get('execution_date')
-        execution_date = timezone.parse(execution_date)
         confirmed = request.form.get('confirmed') == "true"
         upstream = request.form.get('upstream') == "true"
         downstream = request.form.get('downstream') == "true"
@@ -2030,13 +2075,17 @@ class Airflow(AirflowBaseView):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
+        dr = dag.get_dagrun(run_id=dag_run_id)
+        start_date = dr.logical_date
+        end_date = dr.logical_date
+        end_date = dr.logical_date if not future else None
+        start_date = dr.logical_date if not past else None
+
         dag = dag.partial_subset(
             task_ids_or_regex=fr"^{task_id}$",
             include_downstream=downstream,
             include_upstream=upstream,
         )
-        end_date = execution_date if not future else None
-        start_date = execution_date if not past else None
 
         return self._clear_dag_tis(
             dag,
@@ -2741,8 +2790,13 @@ class Airflow(AirflowBaseView):
         dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
         dt_nr_dr_data['arrange'] = arrange
         dttm = dt_nr_dr_data['dttm']
-        dag_run = dag.get_dagrun(execution_date=dttm)
-        dag_run_id = dag_run.run_id if dag_run else None
+        dag_run_id = request.args.get('dag_run_id')
+        if dag_run_id:
+            dag_run = dag.get_dagrun(run_id=dag_run_id)
+            dttm = dag_run.execution_date
+        else:
+            dag_run = dag.get_dagrun(execution_date=dttm)
+            dag_run_id = dag_run.run_id if dag_run else None
 
         class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
             """Graph Form class."""
@@ -3313,10 +3367,17 @@ class Airflow(AirflowBaseView):
         task_id = request.args.get('task_id')
         map_index = request.args.get('map_index', -1, type=int)
         execution_date = request.args.get('execution_date')
+        dag_run_id = request.args.get('dag_run_id')
         link_name = request.args.get('link_name')
-        dttm = timezone.parse(execution_date)
+
         dag = current_app.dag_bag.get_dag(dag_id)
 
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            dttm = dr.execution_date.isoformat()
+        else:
+            dttm = timezone.parse(execution_date)
+
         if not dag or task_id not in dag.task_ids:
             response = jsonify(
                 {