You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/05 21:00:26 UTC
[airflow] 02/02: Use dag_run_id instead of execution_date
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch webserver-use-dag-run-id
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit de650c73fc4ddb3c24e070bad9ec0aff61e03149
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Apr 5 16:58:29 2022 -0400
Use dag_run_id instead of execution_date
---
airflow/www/static/js/tree/api/useClearTask.js | 3 +-
airflow/www/static/js/tree/api/useExtraLinks.js | 6 +-
.../js/tree/details/content/dagRun/index.jsx | 3 +-
.../details/content/taskInstance/ExtraLinks.jsx | 4 +-
.../js/tree/details/content/taskInstance/Logs.jsx | 6 +-
.../js/tree/details/content/taskInstance/Nav.jsx | 9 +-
.../js/tree/details/content/taskInstance/index.jsx | 17 ++--
.../content/taskInstance/taskActions/Clear.jsx | 7 +-
airflow/www/templates/airflow/dag.html | 2 +-
airflow/www/utils.py | 1 -
airflow/www/views.py | 95 ++++++++++++++++++----
11 files changed, 100 insertions(+), 53 deletions(-)
diff --git a/airflow/www/static/js/tree/api/useClearTask.js b/airflow/www/static/js/tree/api/useClearTask.js
index bcf99bb250..8e2ff52e2a 100644
--- a/airflow/www/static/js/tree/api/useClearTask.js
+++ b/airflow/www/static/js/tree/api/useClearTask.js
@@ -27,7 +27,7 @@ const csrfToken = getMetaValue('csrf_token');
const clearUrl = getMetaValue('clear_url');
export default function useClearTask({
- dagId, runId, taskId, executionDate,
+ dagId, runId, taskId,
}) {
const queryClient = useQueryClient();
const toast = useToast();
@@ -44,7 +44,6 @@ export default function useClearTask({
dag_run_id: runId,
task_id: taskId,
confirmed,
- execution_date: executionDate,
past,
future,
upstream,
diff --git a/airflow/www/static/js/tree/api/useExtraLinks.js b/airflow/www/static/js/tree/api/useExtraLinks.js
index ecc92ddc80..d88cc9b171 100644
--- a/airflow/www/static/js/tree/api/useExtraLinks.js
+++ b/airflow/www/static/js/tree/api/useExtraLinks.js
@@ -24,16 +24,16 @@ import { getMetaValue } from '../../utils';
const extraLinksUrl = getMetaValue('extra_links_url');
export default function useExtraLinks({
- dagId, taskId, executionDate, extraLinks,
+ dagId, taskId, runId, extraLinks,
}) {
return useQuery(
- ['extraLinks', dagId, taskId, executionDate],
+ ['extraLinks', dagId, taskId, runId],
async () => {
const data = await Promise.all(extraLinks.map(async (link) => {
const url = `${extraLinksUrl
}?task_id=${encodeURIComponent(taskId)
}&dag_id=${encodeURIComponent(dagId)
- }&execution_date=${encodeURIComponent(executionDate)
+ }&dag_run_id=${encodeURIComponent(runId)
}&link_name=${encodeURIComponent(link)}`;
try {
const datum = await axios.get(url);
diff --git a/airflow/www/static/js/tree/details/content/dagRun/index.jsx b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
index fd609b7ad0..dd77e59119 100644
--- a/airflow/www/static/js/tree/details/content/dagRun/index.jsx
+++ b/airflow/www/static/js/tree/details/content/dagRun/index.jsx
@@ -46,7 +46,6 @@ const DagRun = ({ runId }) => {
const run = dagRuns.find((dr) => dr.runId === runId);
if (!run) return null;
const {
- executionDate,
state,
runType,
duration,
@@ -60,7 +59,7 @@ const DagRun = ({ runId }) => {
run_id: runId,
}).toString();
const graphParams = new URLSearchParams({
- execution_date: executionDate,
+ dag_run_id: runId,
}).toString();
const graphLink = appendSearchParams(graphUrl, graphParams);
const detailsLink = appendSearchParams(dagRunDetailsUrl, detailsParams);
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx b/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
index b9b60b4c01..e53c1583ad 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/ExtraLinks.jsx
@@ -30,11 +30,11 @@ import { useExtraLinks } from '../../../api';
const ExtraLinks = ({
dagId,
taskId,
- executionDate,
+ runId,
extraLinks = [],
}) => {
const { data: links = [] } = useExtraLinks({
- dagId, taskId, executionDate, extraLinks,
+ dagId, taskId, runId, extraLinks,
});
if (!links.length) return null;
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
index c89e1d3364..462503aebe 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Logs.jsx
@@ -39,7 +39,7 @@ const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost"
const Logs = ({
dagId,
taskId,
- executionDate,
+ runId,
tryNumber,
}) => {
const externalLogs = [];
@@ -53,7 +53,7 @@ const Logs = ({
const fullExternalUrl = `${externalLogUrl
}?dag_id=${encodeURIComponent(dagId)
}&task_id=${encodeURIComponent(taskId)
- }&execution_date=${encodeURIComponent(executionDate)
+ }&dag_run_id=${encodeURIComponent(runId)
}&try_number=${index}`;
externalLogs.push(
<LinkButton
@@ -70,7 +70,7 @@ const Logs = ({
const fullMetadataUrl = `${logsWithMetadataUrl
}?dag_id=${encodeURIComponent(dagId)
}&task_id=${encodeURIComponent(taskId)
- }&execution_date=${encodeURIComponent(executionDate)
+ }&dag_run_id=${encodeURIComponent(runId)
}&metadata=null&format=file${index > 0 && `&try_number=${index}`}`;
return (
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
index 17b423130a..29dd79c0a7 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/Nav.jsx
@@ -42,11 +42,11 @@ const gridUrlNoRoot = getMetaValue('grid_url_no_root');
const LinkButton = ({ children, ...rest }) => (<Button as={Link} variant="ghost" colorScheme="blue" {...rest}>{children}</Button>);
const Nav = ({
- taskId, executionDate, operator, isMapped,
+ taskId, operator, isMapped, runId,
}) => {
const params = new URLSearchParams({
task_id: taskId,
- execution_date: executionDate,
+ dag_run_id: runId,
}).toString();
const detailsLink = `${taskUrl}&${params}`;
const renderedLink = `${renderedTemplatesUrl}&${params}`;
@@ -57,9 +57,6 @@ const Nav = ({
_flt_3_task_id: taskId,
_oc_TaskInstanceModelView: 'dag_run.execution_date',
});
- const subDagParams = new URLSearchParams({
- execution_date: executionDate,
- }).toString();
const filterParams = new URLSearchParams({
base_date: baseDate,
@@ -70,7 +67,7 @@ const Nav = ({
const allInstancesLink = `${taskInstancesUrl}?${listParams.toString()}`;
const filterUpstreamLink = appendSearchParams(gridUrlNoRoot, filterParams);
- const subDagLink = appendSearchParams(gridUrl.replace(dagId, `${dagId}.${taskId}`), subDagParams);
+ const subDagLink = gridUrl.replace(dagId, `${dagId}.${taskId}`);
// TODO: base subdag zooming as its own attribute instead of via operator name
const isSubDag = operator === 'SubDagOperator';
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
index 93d22dbde7..a48aceac5e 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/index.jsx
@@ -54,10 +54,8 @@ const getTask = ({ taskId, runId, task }) => {
};
const TaskInstance = ({ taskId, runId }) => {
- const { data: { groups = {}, dagRuns = [] } } = useTreeData();
+ const { data: { groups = {} } } = useTreeData();
const group = getTask({ taskId, runId, task: groups });
- const run = dagRuns.find((r) => r.runId === runId);
- const { executionDate } = run;
const { data: { tasks } } = useTasks(dagId);
if (!group) return null;
const task = tasks.find((t) => t.taskId === taskId);
@@ -74,20 +72,15 @@ const TaskInstance = ({ taskId, runId }) => {
<TaskNav
taskId={taskId}
isMapped={isMapped}
- executionDate={executionDate}
operator={operator}
+ runId={runId}
/>
)}
{!isGroup && (
<>
<VStack justifyContent="center" divider={<StackDivider my={3} />} my={3}>
<RunAction runId={runId} taskId={taskId} dagId={dagId} />
- <ClearAction
- runId={runId}
- taskId={taskId}
- dagId={dagId}
- executionDate={executionDate}
- />
+ <ClearAction runId={runId} taskId={taskId} dagId={dagId} />
<MarkFailedAction runId={runId} taskId={taskId} dagId={dagId} />
<MarkSuccessAction runId={runId} taskId={taskId} dagId={dagId} />
</VStack>
@@ -98,7 +91,7 @@ const TaskInstance = ({ taskId, runId }) => {
<Logs
dagId={dagId}
taskId={taskId}
- executionDate={executionDate}
+ runId={runId}
tryNumber={instance.tryNumber}
/>
)}
@@ -106,7 +99,7 @@ const TaskInstance = ({ taskId, runId }) => {
<ExtraLinks
taskId={taskId}
dagId={dagId}
- executionDate={executionDate}
+ runId={runId}
extraLinks={extraLinks}
/>
{isMapped && (
diff --git a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
index 4196edc6b9..ec7729cf07 100644
--- a/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
+++ b/airflow/www/static/js/tree/details/content/taskInstance/taskActions/Clear.jsx
@@ -29,11 +29,10 @@ import ActionButton from './ActionButton';
import ConfirmDialog from '../../ConfirmDialog';
import { useClearTask } from '../../../../api';
-const Run = ({
+const Clear = ({
dagId,
runId,
taskId,
- executionDate,
}) => {
const [affectedTasks, setAffectedTasks] = useState([]);
@@ -60,7 +59,7 @@ const Run = ({
const { isOpen, onOpen, onClose } = useDisclosure();
const { mutateAsync: clearTask, isLoading } = useClearTask({
- dagId, runId, taskId, executionDate,
+ dagId, runId, taskId,
});
const onClick = async () => {
@@ -128,4 +127,4 @@ const Run = ({
);
};
-export default Run;
+export default Clear;
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index d77d7f2bc0..a167877779 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -310,8 +310,8 @@
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
<input type="hidden" name="task_id">
- <input type="hidden" name="execution_date">
<input type="hidden" name="origin" value="{{ request.base_url }}">
+ <input type="hidden" name="dag_run_id">
<div class="row">
<span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
<label class="btn btn-default btn-sm" title="Also include past task instances when clearing this one">
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b07cd0f44f..aa1db26db2 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -150,7 +150,6 @@ def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]
'start_date': datetime_to_string(dag_run.start_date),
'end_date': datetime_to_string(dag_run.end_date),
'state': dag_run.state,
- 'execution_date': datetime_to_string(dag_run.execution_date),
'data_interval_start': datetime_to_string(dag_run.data_interval_start),
'data_interval_end': datetime_to_string(dag_run.data_interval_end),
'run_type': dag_run.run_type,
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 03cd5e6e02..af78539819 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -183,8 +183,12 @@ def get_safe_url(url):
def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
"""Get Execution Data, Base Date & Number of runs from a Request"""
date_time = www_request.args.get('execution_date')
+ dag_run_id = www_request.args.get('dag_run_id')
if date_time:
date_time = timezone.parse(date_time)
+ elif dag_run_id:
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ date_time = dr.execution_date
else:
date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow()
@@ -1253,13 +1257,20 @@ class Airflow(AirflowBaseView):
task_id = request.args.get('task_id')
map_index = request.args.get('map_index', -1, type=int)
execution_date = request.args.get('execution_date')
- dttm = timezone.parse(execution_date)
- form = DateTimeForm(data={'execution_date': dttm})
- root = request.args.get('root', '')
+ dag_run_id = request.args.get('dag_run_id')
logging.info("Retrieving rendered templates.")
dag: DAG = current_app.dag_bag.get_dag(dag_id)
- dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+ if dag_run_id:
+ dag_run = dag.get_dagrun(run_id=dag_run_id)
+ execution_date = dag_run.execution_date
+ dttm = dag_run.execution_date.isoformat()
+ else:
+ dttm = timezone.parse(execution_date)
+ dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+
+ form = DateTimeForm(data={'execution_date': dttm})
+ root = request.args.get('root', '')
raw_task = dag.get_task(task_id).prepare_for_execution()
ti: TaskInstance
@@ -1354,15 +1365,23 @@ class Airflow(AirflowBaseView):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
- dttm = timezone.parse(execution_date)
+ dag_run_id = request.args.get('dag_run_id')
+ dag: DAG = current_app.dag_bag.get_dag(dag_id)
+
+ if dag_run_id:
+ dag_run = dag.get_dagrun(run_id=dag_run_id)
+ execution_date = dag_run.execution_date
+ dttm = dag_run.execution_date.isoformat()
+ else:
+ dttm = timezone.parse(execution_date)
+ dag_run = dag.get_dagrun(execution_date=dttm, session=session)
+
form = DateTimeForm(data={'execution_date': dttm})
root = request.args.get('root', '')
map_index = request.args.get('map_index', -1, type=int)
logging.info("Retrieving rendered templates.")
- dag: DAG = current_app.dag_bag.get_dag(dag_id)
task = dag.get_task(task_id)
- dag_run = dag.get_dagrun(execution_date=dttm, session=session)
ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)
pod_spec = None
@@ -1412,12 +1431,18 @@ class Airflow(AirflowBaseView):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
+ dag_run_id = request.args.get('dag_run_id')
map_index = request.args.get('map_index', -1, type=int)
try_number = request.args.get('try_number', type=int)
metadata = request.args.get('metadata')
metadata = json.loads(metadata)
response_format = request.args.get('format', 'json')
+ dag = current_app.dag_bag.get_dag(dag_id)
+ if dag_run_id:
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ execution_date = dr.execution_date.isoformat()
+
# metadata may be null
if not metadata:
metadata = {}
@@ -1457,7 +1482,6 @@ class Airflow(AirflowBaseView):
)
try:
- dag = current_app.dag_bag.get_dag(dag_id)
if dag:
ti.task = dag.get_task(ti.task_id)
@@ -1495,7 +1519,16 @@ class Airflow(AirflowBaseView):
task_id = request.args.get('task_id')
map_index = request.args.get('map_index', -1, type=int)
execution_date = request.args.get('execution_date')
- dttm = timezone.parse(execution_date) if execution_date else None
+ dag_run_id = request.args.get('dag_run_id')
+ dag = current_app.dag_bag.get_dag(dag_id)
+
+ if dag_run_id:
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ execution_date = dr.execution_date.isoformat()
+ dttm = dr.execution_date
+ else:
+ dttm = timezone.parse(execution_date) if execution_date else None
+
form = DateTimeForm(data={'execution_date': dttm})
dag_model = DagModel.get_dagmodel(dag_id)
@@ -1540,8 +1573,15 @@ class Airflow(AirflowBaseView):
def redirect_to_external_log(self, session=None):
"""Redirects to external log."""
dag_id = request.args.get('dag_id')
+ dag_run_id = request.args.get('dag_run_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
+
+ if dag_run_id:
+ dag = current_app.dag_bag.get_dag(dag_id)
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ execution_date = dr.execution_date.isoformat()
+
dttm = timezone.parse(execution_date)
map_index = request.args.get('map_index', -1, type=int)
try_number = request.args.get('try_number', 1)
@@ -1579,11 +1619,17 @@ class Airflow(AirflowBaseView):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
+ dag_run_id = request.args.get('dag_run_id')
+ dag = current_app.dag_bag.get_dag(dag_id)
+
+ if dag_run_id:
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ execution_date = dr.execution_date.isoformat()
+
dttm = timezone.parse(execution_date)
map_index = request.args.get('map_index', -1, type=int)
form = DateTimeForm(data={'execution_date': dttm})
root = request.args.get('root', '')
- dag = current_app.dag_bag.get_dag(dag_id)
if not dag or task_id not in dag.task_ids:
flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
@@ -2019,9 +2065,8 @@ class Airflow(AirflowBaseView):
task_id = request.form.get('task_id')
origin = get_safe_url(request.form.get('origin'))
dag = current_app.dag_bag.get_dag(dag_id)
+ dag_run_id = request.form.get('dag_run_id')
- execution_date = request.form.get('execution_date')
- execution_date = timezone.parse(execution_date)
confirmed = request.form.get('confirmed') == "true"
upstream = request.form.get('upstream') == "true"
downstream = request.form.get('downstream') == "true"
@@ -2030,13 +2075,17 @@ class Airflow(AirflowBaseView):
recursive = request.form.get('recursive') == "true"
only_failed = request.form.get('only_failed') == "true"
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ start_date = dr.logical_date
+ end_date = dr.logical_date
+ end_date = dr.logical_date if not future else None
+ start_date = dr.logical_date if not past else None
+
dag = dag.partial_subset(
task_ids_or_regex=fr"^{task_id}$",
include_downstream=downstream,
include_upstream=upstream,
)
- end_date = execution_date if not future else None
- start_date = execution_date if not past else None
return self._clear_dag_tis(
dag,
@@ -2741,8 +2790,13 @@ class Airflow(AirflowBaseView):
dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dt_nr_dr_data['arrange'] = arrange
dttm = dt_nr_dr_data['dttm']
- dag_run = dag.get_dagrun(execution_date=dttm)
- dag_run_id = dag_run.run_id if dag_run else None
+ dag_run_id = request.args.get('dag_run_id')
+ if dag_run_id:
+ dag_run = dag.get_dagrun(run_id=dag_run_id)
+ dttm = dag_run.execution_date
+ else:
+ dag_run = dag.get_dagrun(execution_date=dttm)
+ dag_run_id = dag_run.run_id if dag_run else None
class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
"""Graph Form class."""
@@ -3313,10 +3367,17 @@ class Airflow(AirflowBaseView):
task_id = request.args.get('task_id')
map_index = request.args.get('map_index', -1, type=int)
execution_date = request.args.get('execution_date')
+ dag_run_id = request.args.get('dag_run_id')
link_name = request.args.get('link_name')
- dttm = timezone.parse(execution_date)
+
dag = current_app.dag_bag.get_dag(dag_id)
+ if dag_run_id:
+ dr = dag.get_dagrun(run_id=dag_run_id)
+ dttm = dr.execution_date.isoformat()
+ else:
+ dttm = timezone.parse(execution_date)
+
if not dag or task_id not in dag.task_ids:
response = jsonify(
{