You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/05 21:00:26 UTC

[airflow] 02/02: Use dag_run_id instead of execution_date

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch webserver-use-dag-run-id
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit de650c73fc4ddb3c24e070bad9ec0aff61e03149
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 5 16:58:29 2022 -0400

    Use dag_run_id instead of execution_date
---
 airflow/www/static/js/tree/api/useClearTask.js     |  3 +-
 airflow/www/static/js/tree/api/useExtraLinks.js    |  6 +-
 .../js/tree/details/content/dagRun/index.jsx       |  3 +-
 .../details/content/taskInstance/ExtraLinks.jsx    |  4 +-
 .../js/tree/details/content/taskInstance/Logs.jsx  |  6 +-
 .../js/tree/details/content/taskInstance/Nav.jsx   |  9 +-
 .../js/tree/details/content/taskInstance/index.jsx | 17 ++--
 .../content/taskInstance/taskActions/Clear.jsx     |  7 +-
 airflow/www/templates/airflow/dag.html             |  2 +-
 airflow/www/utils.py                               |  1 -
 airflow/www/views.py                               | 95 ++++++++++++++++++----
 11 files changed, 100 insertions(+), 53 deletions(-)

diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index bcf99bb250..8e2ff52e2a 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -27,7 +27,7 @@ const csrfToken = getMetaValue('csrf_token');
 const clearUrl = getMetaValue('clear_url');
 
 export default function useClearTask({
-  dagId, runId, taskId, executionDate,
+  dagId, runId, taskId,
 }) {
   const queryClient = useQueryClient();
   const toast = useToast();
@@ -44,7 +44,6 @@ export default function useClearTask({
         dag_run_id: runId,
         task_id: taskId,
         confirmed,
-        execution_date: executionDate,
         past,
         future,
         upstream,
diff --git a/airflow/www/static/js/tree/api/useExtraLinks.js b/airflow/www/static/js/tree/api/useExtraLinks.js
index ecc92ddc80..d88cc9b171 100644
--- a/airflow/www/static/js/tree/api/useExtraLinks.js
+++ b/airflow/www/static/js/tree/api/useExtraLinks.js
@@ -24,16 +24,16 @@ import { getMetaValue } from '../../utils';
 const extraLinksUrl = getMetaValue('extra_links_url');
 
 export default function useExtraLinks({
-  dagId, taskId, executionDate, extraLinks,
+  dagId, taskId, runId, extraLinks,
 }) {
   return useQuery(
-    ['extraLinks', dagId, taskId, executionDate],
+    ['extraLinks', dagId, taskId, runId],
     async () => {
       const data = await Promise.all(extraLinks.map(async (link) => {
         const url = `${extraLinksUrl
         }?task_id=${encodeURIComponent(taskId)
         }&dag_id=${encodeURIComponent(dagId)
-        }&execution_date=${encodeURIComponent(executionDate)
+        }&dag_run_id=${encodeURIComponent(runId)
         }&link_name=${encodeURIComponent(link)}`;
         try {
           const datum = await axios.get(url);
diff --git a/airflow/www/static/js/tree/details/content/dagRun/index.jsx b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
index fd609b7ad0..dd77e59119 100644
--- a/airflow/www/static/js/tree/details/content/dagRun/index.jsx
+++ b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
@@ -46,7 +46,6 @@ const DagRun = ({ runId }) => {
   const run = dagRuns.find((dr) => dr.runId === runId);
   if (!run) return null;
   const {
-    executionDate,
     state,
     runType,
     duration,
@@ -60,7 +59,7 @@ const DagRun = ({ runId }) => {
     run_id: runId,
   }).toString();
   const graphParams = new URLSearchParams({
-    execution_date: executionDate,
+    dag_run_id: runId,
   }).toString();
   const graphLink = appendSearchParams(graphUrl, graphParams);
   const detailsLink = appendSearchParams(dagRunDetailsUrl, detailsParams);
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx b/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
index b9b60b4c01..e53c1583ad 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
@@ -30,11 +30,11 @@ import { useExtraLinks } from '../../../api';
 const ExtraLinks = ({
   dagId,
   taskId,
-  executionDate,
+  runId,
   extraLinks = [],
 }) => {
   const { data: links = [] } = useExtraLinks({
-    dagId, taskId, executionDate, extraLinks,
+    dagId, taskId, runId, extraLinks,
   });
 
   if (!links.length) return null;
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
index c89e1d3364..462503aebe 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
@@ -39,7 +39,7 @@ const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost"
 const Logs = ({
   dagId,
   taskId,
-  executionDate,
+  runId,
   tryNumber,
 }) => {
   const externalLogs = [];
@@ -53,7 +53,7 @@ const Logs = ({
       const fullExternalUrl = `${externalLogUrl
       }?dag_id=${encodeURIComponent(dagId)
       }&task_id=${encodeURIComponent(taskId)
-      }&execution_date=${encodeURIComponent(executionDate)
+      }&dag_run_id=${encodeURIComponent(runId)
       }&try_number=${index}`;
       externalLogs.push(
         <LinkButton
@@ -70,7 +70,7 @@ const Logs = ({
     const fullMetadataUrl = `${logsWithMetadataUrl
     }?dag_id=${encodeURIComponent(dagId)
     }&task_id=${encodeURIComponent(taskId)
-    }&execution_date=${encodeURIComponent(executionDate)
+    }&dag_run_id=${encodeURIComponent(runId)
     }&metadata=null&format=file${index > 0 && `&try_number=${index}`}`;
 
     return (
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
index 17b423130a..29dd79c0a7 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
@@ -42,11 +42,11 @@ const gridUrlNoRoot = getMetaValue('grid_url_no_root');
 const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost" colorScheme="blue" {...rest}>{children}</Button>);
 
 const Nav = ({
-  taskId, executionDate, operator, isMapped,
+  taskId, operator, isMapped, runId,
 }) => {
   const params = new URLSearchParams({
     task_id: taskId,
-    execution_date: executionDate,
+    dag_run_id: runId,
   }).toString();
   const detailsLink = `${taskUrl}&${params}`;
   const renderedLink = `${renderedTemplatesUrl}&${params}`;
@@ -57,9 +57,6 @@ const Nav = ({
     _flt_3_task_id: taskId,
     _oc_TaskInstanceModelView: 'dag_run.execution_date',
   });
-  const subDagParams = new URLSearchParams({
-    execution_date: executionDate,
-  }).toString();
 
   const filterParams = new URLSearchParams({
     base_date: baseDate,
@@ -70,7 +67,7 @@ const Nav = ({
   const allInstancesLink = `${taskInstancesUrl}?${listParams.toString()}`;
 
   const filterUpstreamLink = appendSearchParams(gridUrlNoRoot, filterParams);
-  const subDagLink = appendSearchParams(gridUrl.replace(dagId, `${dagId}.${taskId}`), subDagParams);
+  const subDagLink = gridUrl.replace(dagId, `${dagId}.${taskId}`);
 
   // TODO: base subdag zooming as its own attribute instead of via operator name
   const isSubDag = operator === 'SubDagOperator';
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index 93d22dbde7..a48aceac5e 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -54,10 +54,8 @@ const getTask = ({ taskId, runId, task }) => {
 };
 
 const TaskInstance = ({ taskId, runId }) => {
-  const { data: { groups = {}, dagRuns = [] } } = useTreeData();
+  const { data: { groups = {} } } = useTreeData();
   const group = getTask({ taskId, runId, task: groups });
-  const run = dagRuns.find((r) => r.runId === runId);
-  const { executionDate } = run;
   const { data: { tasks } } = useTasks(dagId);
   if (!group) return null;
   const task = tasks.find((t) => t.taskId === taskId);
@@ -74,20 +72,15 @@ const TaskInstance = ({ taskId, runId }) => {
         <TaskNav
           taskId={taskId}
           isMapped={isMapped}
-          executionDate={executionDate}
           operator={operator}
+          runId={runId}
         />
       )}
       {!isGroup && (
         <>
           <VStack justifyContent="center" divider={<StackDivider my={3} />} my={3}>
             <RunAction runId={runId} taskId={taskId} dagId={dagId} />
-            <ClearAction
-              runId={runId}
-              taskId={taskId}
-              dagId={dagId}
-              executionDate={executionDate}
-            />
+            <ClearAction runId={runId} taskId={taskId} dagId={dagId} />
             <MarkFailedAction runId={runId} taskId={taskId} dagId={dagId} />
             <MarkSuccessAction runId={runId} taskId={taskId} dagId={dagId} />
           </VStack>
@@ -98,7 +91,7 @@ const TaskInstance = ({ taskId, runId }) => {
         <Logs
           dagId={dagId}
           taskId={taskId}
-          executionDate={executionDate}
+          runId={runId}
           tryNumber={instance.tryNumber}
         />
       )}
@@ -106,7 +99,7 @@ const TaskInstance = ({ taskId, runId }) => {
       <ExtraLinks
         taskId={taskId}
         dagId={dagId}
-        executionDate={executionDate}
+        runId={runId}
         extraLinks={extraLinks}
       />
       {isMapped && (
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index 4196edc6b9..ec7729cf07 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -29,11 +29,10 @@ import ActionButton from './ActionButton';
 import ConfirmDialog from '../../ConfirmDialog';
 import { useClearTask } from '../../../../api';
 
-const Run = ({
+const Clear = ({
   dagId,
   runId,
   taskId,
-  executionDate,
 }) => {
   const [affectedTasks, setAffectedTasks] = useState([]);
 
@@ -60,7 +59,7 @@ const Run = ({
   const { isOpen, onOpen, onClose } = useDisclosure();
 
   const { mutateAsync: clearTask, isLoading } = useClearTask({
-    dagId, runId, taskId, executionDate,
+    dagId, runId, taskId,
   });
 
   const onClick = async () => {
@@ -128,4 +127,4 @@ const Run = ({
   );
 };
 
-export default Run;
+export default Clear;
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index d77d7f2bc0..a167877779 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,8 +310,8 @@
             <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
             <input type="hidden" name="task_id">
-            <input type="hidden" name="execution_date">
             <input type="hidden" name="origin" value="{{ request.base_url }}">
+            <input type="hidden" name="dag_run_id">
             <div class="row">
               <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
                 <label class="btn btn-default btn-sm" title="Also include past task instances when clearing this one">
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b07cd0f44f..aa1db26db2 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -150,7 +150,6 @@ def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]
         'start_date': datetime_to_string(dag_run.start_date),
         'end_date': datetime_to_string(dag_run.end_date),
         'state': dag_run.state,
-        'execution_date': datetime_to_string(dag_run.execution_date),
         'data_interval_start': datetime_to_string(dag_run.data_interval_start),
         'data_interval_end': datetime_to_string(dag_run.data_interval_end),
         'run_type': dag_run.run_type,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 03cd5e6e02..af78539819 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -183,8 +183,12 @@ def get_safe_url(url):
 def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     """Get Execution Data, Base Date & Number of runs from a Request"""
     date_time = www_request.args.get('execution_date')
+    dag_run_id = www_request.args.get('dag_run_id')
     if date_time:
         date_time = timezone.parse(date_time)
+    elif dag_run_id:
+        dr = dag.get_dagrun(run_id=dag_run_id)
+        date_time = dr.execution_date
     else:
         date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow()
 
@@ -1253,13 +1257,20 @@ class Airflow(AirflowBaseView):
         task_id = request.args.get('task_id')
         map_index = request.args.get('map_index', -1, type=int)
         execution_date = request.args.get('execution_date')
-        dttm = timezone.parse(execution_date)
-        form = DateTimeForm(data={'execution_date': dttm})
-        root = request.args.get('root', '')
+        dag_run_id = request.args.get('dag_run_id')
 
         logging.info("Retrieving rendered templates.")
         dag: DAG = current_app.dag_bag.get_dag(dag_id)
-        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+        if dag_run_id:
+            dag_run = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dag_run.execution_date
+            dttm = dag_run.execution_date.isoformat()
+        else:
+            dttm = timezone.parse(execution_date)
+            dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
         raw_task = dag.get_task(task_id).prepare_for_execution()
 
         ti: TaskInstance
@@ -1354,15 +1365,23 @@ class Airflow(AirflowBaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dttm = timezone.parse(execution_date)
+        dag_run_id = request.args.get('dag_run_id')
+        dag: DAG = current_app.dag_bag.get_dag(dag_id)
+
+        if dag_run_id:
+            dag_run = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dag_run.execution_date
+            dttm = dag_run.execution_date.isoformat()
+        else:
+            dttm = timezone.parse(execution_date)
+            dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+
         form = DateTimeForm(data={'execution_date': dttm})
         root = request.args.get('root', '')
         map_index = request.args.get('map_index', -1, type=int)
         logging.info("Retrieving rendered templates.")
 
-        dag: DAG = current_app.dag_bag.get_dag(dag_id)
         task = dag.get_task(task_id)
-        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
         ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)
 
         pod_spec = None
@@ -1412,12 +1431,18 @@ class Airflow(AirflowBaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+        dag_run_id = request.args.get('dag_run_id')
         map_index = request.args.get('map_index', -1, type=int)
         try_number = request.args.get('try_number', type=int)
         metadata = request.args.get('metadata')
         metadata = json.loads(metadata)
         response_format = request.args.get('format', 'json')
 
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+
         # metadata may be null
         if not metadata:
             metadata = {}
@@ -1457,7 +1482,6 @@ class Airflow(AirflowBaseView):
             )
 
         try:
-            dag = current_app.dag_bag.get_dag(dag_id)
             if dag:
                 ti.task = dag.get_task(ti.task_id)
 
@@ -1495,7 +1519,16 @@ class Airflow(AirflowBaseView):
         task_id = request.args.get('task_id')
         map_index = request.args.get('map_index', -1, type=int)
         execution_date = request.args.get('execution_date')
-        dttm = timezone.parse(execution_date) if execution_date else None
+        dag_run_id = request.args.get('dag_run_id')
+        dag = current_app.dag_bag.get_dag(dag_id)
+
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+            dttm = dr.execution_date
+        else:
+            dttm = timezone.parse(execution_date) if execution_date else None
+
         form = DateTimeForm(data={'execution_date': dttm})
         dag_model = DagModel.get_dagmodel(dag_id)
 
@@ -1540,8 +1573,15 @@ class Airflow(AirflowBaseView):
     def redirect_to_external_log(self, session=None):
         """Redirects to external log."""
         dag_id = request.args.get('dag_id')
+        dag_run_id = request.args.get('dag_run_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+
+        if dag_run_id:
+            dag = current_app.dag_bag.get_dag(dag_id)
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+
         dttm = timezone.parse(execution_date)
         map_index = request.args.get('map_index', -1, type=int)
         try_number = request.args.get('try_number', 1)
@@ -1579,11 +1619,17 @@ class Airflow(AirflowBaseView):
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
+        dag_run_id = request.args.get('dag_run_id')
+        dag = current_app.dag_bag.get_dag(dag_id)
+
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            execution_date = dr.execution_date.isoformat()
+
         dttm = timezone.parse(execution_date)
         map_index = request.args.get('map_index', -1, type=int)
         form = DateTimeForm(data={'execution_date': dttm})
         root = request.args.get('root', '')
-        dag = current_app.dag_bag.get_dag(dag_id)
 
         if not dag or task_id not in dag.task_ids:
             flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
@@ -2019,9 +2065,8 @@ class Airflow(AirflowBaseView):
         task_id = request.form.get('task_id')
         origin = get_safe_url(request.form.get('origin'))
         dag = current_app.dag_bag.get_dag(dag_id)
+        dag_run_id = request.form.get('dag_run_id')
 
-        execution_date = request.form.get('execution_date')
-        execution_date = timezone.parse(execution_date)
         confirmed = request.form.get('confirmed') == "true"
         upstream = request.form.get('upstream') == "true"
         downstream = request.form.get('downstream') == "true"
@@ -2030,13 +2075,17 @@ class Airflow(AirflowBaseView):
         recursive = request.form.get('recursive') == "true"
         only_failed = request.form.get('only_failed') == "true"
 
+        dr = dag.get_dagrun(run_id=dag_run_id)
+        start_date = dr.logical_date
+        end_date = dr.logical_date
+        end_date = dr.logical_date if not future else None
+        start_date = dr.logical_date if not past else None
+
         dag = dag.partial_subset(
             task_ids_or_regex=fr"^{task_id}$",
             include_downstream=downstream,
             include_upstream=upstream,
         )
-        end_date = execution_date if not future else None
-        start_date = execution_date if not past else None
 
         return self._clear_dag_tis(
             dag,
@@ -2741,8 +2790,13 @@ class Airflow(AirflowBaseView):
         dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
         dt_nr_dr_data['arrange'] = arrange
         dttm = dt_nr_dr_data['dttm']
-        dag_run = dag.get_dagrun(execution_date=dttm)
-        dag_run_id = dag_run.run_id if dag_run else None
+        dag_run_id = request.args.get('dag_run_id')
+        if dag_run_id:
+            dag_run = dag.get_dagrun(run_id=dag_run_id)
+            dttm = dag_run.execution_date
+        else:
+            dag_run = dag.get_dagrun(execution_date=dttm)
+            dag_run_id = dag_run.run_id if dag_run else None
 
         class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
             """Graph Form class."""
@@ -3313,10 +3367,17 @@ class Airflow(AirflowBaseView):
         task_id = request.args.get('task_id')
         map_index = request.args.get('map_index', -1, type=int)
         execution_date = request.args.get('execution_date')
+        dag_run_id = request.args.get('dag_run_id')
         link_name = request.args.get('link_name')
-        dttm = timezone.parse(execution_date)
+
         dag = current_app.dag_bag.get_dag(dag_id)
 
+        if dag_run_id:
+            dr = dag.get_dagrun(run_id=dag_run_id)
+            dttm = dr.execution_date.isoformat()
+        else:
+            dttm = timezone.parse(execution_date)
+
         if not dag or task_id not in dag.task_ids:
             response = jsonify(
                 {