You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@airflow.apache.org by bb...@apache.org on 2023/02/23 22:22:07 UTC
[airflow] branch main updated: Remove Run task action from UI (#29706)
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 101d59c4b8 Remove Run task action from UI (#29706)
101d59c4b8 is described below
commit 101d59c4b88ab979d305b8d96f612c27c8a44aa8
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Feb 23 17:21:59 2023 -0500
Remove Run task action from UI (#29706)
* Remove Run task action from UI
* remove /run endpoint
* remove tests for Airflow.run
* remove extra imports
---
airflow/www/static/js/api/index.ts | 2 -
airflow/www/static/js/api/useRunTask.ts | 75 ------------
.../dag/details/taskInstance/taskActions/Run.tsx | 97 ----------------
.../dag/details/taskInstance/taskActions/index.tsx | 7 --
airflow/www/templates/airflow/dag.html | 34 ------
airflow/www/views.py | 70 +----------
tests/www/views/test_views_acl.py | 13 ---
tests/www/views/test_views_tasks.py | 128 ---------------------
8 files changed, 1 insertion(+), 425 deletions(-)
diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index 5df46587e0..d055f737d9 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -24,7 +24,6 @@ import useClearRun from './useClearRun';
import useQueueRun from './useQueueRun';
import useMarkFailedRun from './useMarkFailedRun';
import useMarkSuccessRun from './useMarkSuccessRun';
-import useRunTask from './useRunTask';
import useClearTask from './useClearTask';
import useMarkFailedTask from './useMarkFailedTask';
import useMarkSuccessTask from './useMarkSuccessTask';
@@ -63,7 +62,6 @@ export {
useMarkSuccessRun,
useMarkSuccessTask,
useQueueRun,
- useRunTask,
useSetDagRunNote,
useSetTaskInstanceNote,
useTaskInstance,
diff --git a/airflow/www/static/js/api/useRunTask.ts b/airflow/www/static/js/api/useRunTask.ts
deleted file mode 100644
index 98b3cc873c..0000000000
--- a/airflow/www/static/js/api/useRunTask.ts
+++ /dev/null
@@ -1,75 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import axios from 'axios';
-import { useMutation, useQueryClient } from 'react-query';
-import URLSearchParamsWrapper from 'src/utils/URLSearchParamWrapper';
-import { getMetaValue } from '../utils';
-import { useAutoRefresh } from '../context/autorefresh';
-import useErrorToast from '../utils/useErrorToast';
-
-const csrfToken = getMetaValue('csrf_token');
-const runUrl = getMetaValue('run_url');
-
-export default function useRunTask(dagId: string, runId: string, taskId: string) {
- const queryClient = useQueryClient();
- const errorToast = useErrorToast();
- const { startRefresh } = useAutoRefresh();
- return useMutation(
- ['runTask', dagId, runId, taskId],
- async ({
- ignoreAllDeps,
- ignoreTaskState,
- ignoreTaskDeps,
- mapIndexes,
- }:{
- ignoreAllDeps: boolean,
- ignoreTaskState: boolean,
- ignoreTaskDeps: boolean,
- mapIndexes: number[],
- }) => Promise.all(
- (mapIndexes.length ? mapIndexes : [-1]).map((mi) => {
- const params = new URLSearchParamsWrapper({
- csrf_token: csrfToken,
- dag_id: dagId,
- dag_run_id: runId,
- task_id: taskId,
- ignore_all_deps: ignoreAllDeps,
- ignore_task_deps: ignoreTaskDeps,
- ignore_ti_state: ignoreTaskState,
- map_index: mi,
- }).toString();
-
- return axios.post(runUrl, params, {
- headers: {
- 'Content-Type': 'application/x-www-form-urlencoded',
- },
- });
- }),
- ),
- {
- onSuccess: () => {
- queryClient.invalidateQueries('gridData');
- queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]);
- startRefresh();
- },
- onError: (error: Error) => errorToast({ error }),
- },
- );
-}
diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx
deleted file mode 100644
index 62d08a77ef..0000000000
--- a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx
+++ /dev/null
@@ -1,97 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import React, { useState } from 'react';
-import {
- Button,
- Flex,
- ButtonGroup,
-} from '@chakra-ui/react';
-
-import { useRunTask } from 'src/api';
-import { getMetaValue } from 'src/utils';
-
-const canEdit = getMetaValue('can_edit') === 'True';
-
-interface Props {
- dagId: string;
- runId: string;
- taskId: string;
- mapIndexes: number[];
-}
-
-const Run = ({
- dagId,
- runId,
- taskId,
- mapIndexes,
-}: Props) => {
- const [ignoreAllDeps, setIgnoreAllDeps] = useState(false);
- const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps);
-
- const [ignoreTaskState, setIgnoreTaskState] = useState(false);
- const onToggleTaskState = () => setIgnoreTaskState(!ignoreTaskState);
-
- const [ignoreTaskDeps, setIgnoreTaskDeps] = useState(false);
- const onToggleTaskDeps = () => setIgnoreTaskDeps(!ignoreTaskDeps);
-
- const { mutate: onRun, isLoading } = useRunTask(dagId, runId, taskId);
-
- const onClick = () => {
- onRun({
- ignoreAllDeps,
- ignoreTaskState,
- ignoreTaskDeps,
- mapIndexes,
- });
- };
-
- return (
- <Flex justifyContent="space-between" width="100%">
- <ButtonGroup isAttached variant="outline" isDisabled={!canEdit}>
- <Button
- bg={ignoreAllDeps ? 'gray.100' : undefined}
- onClick={onToggleAllDeps}
- title="Ignores all non-critical dependencies, including task state and task_deps"
- >
- Ignore All Deps
- </Button>
- <Button
- bg={ignoreTaskState ? 'gray.100' : undefined}
- onClick={onToggleTaskState}
- title="Ignore previous success/failure"
- >
- Ignore Task State
- </Button>
- <Button
- bg={ignoreTaskDeps ? 'gray.100' : undefined}
- onClick={onToggleTaskDeps}
- title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past"
- >
- Ignore Task Deps
- </Button>
- </ButtonGroup>
- <Button colorScheme="blue" onClick={onClick} isLoading={isLoading} isDisabled={!canEdit}>
- Run
- </Button>
- </Flex>
- );
-};
-
-export default Run;
diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx
index 72f3f238ab..55a1a2529d 100644
--- a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx
@@ -27,7 +27,6 @@ import {
} from '@chakra-ui/react';
import type { CommonActionProps } from './types';
-import RunAction from './Run';
import ClearAction from './Clear';
import MarkFailedAction from './MarkFailed';
import MarkSuccessAction from './MarkSuccess';
@@ -54,12 +53,6 @@ const TaskActions = ({
/>
) : (
<VStack justifyContent="center" divider={<StackDivider my={3} />}>
- <RunAction
- runId={runId}
- taskId={taskId}
- dagId={dagId}
- mapIndexes={mapIndexes}
- />
<ClearAction
runId={runId}
taskId={taskId}
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index f1f029e516..2ed843bb78 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -56,7 +56,6 @@
<meta name="confirm_url" content="{{ url_for('Airflow.confirm') }}">
<meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
<meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
- <meta name="run_url" content="{{ url_for('Airflow.run') }}">
<meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}">
<meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
<meta name="grid_url_no_root" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id, num_runs=num_runs_arg, base_date=base_date_arg) }}">
@@ -332,39 +331,6 @@
</div>
{% endif %}
<h4 id="task_actions">Task Actions</h4>
- <form method="POST" data-action="{{ url_for('Airflow.run') }}" id="run_action">
- <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
- <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
- <input type="hidden" name="task_id">
- <input type="hidden" name="dag_run_id">
- <input type="hidden" name="map_index">
- <input type="hidden" name="origin" value="{{ request.base_url }}">
- <div class="row">
- <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
- <label
- class="btn btn-default"
- title="Ignores all non-critical dependencies, including task state and task_deps">
- <input type="checkbox" value="true" name="ignore_all_deps" autocomplete="off">
- Ignore All Deps</label>
- <label class="btn btn-default"
- title="Ignore previous success/failure">
- <input type="checkbox" value="true" name="ignore_ti_state" autocomplete="off">
- Ignore Task State
- </label>
- <label class="btn btn-default"
- title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past">
- <input type="checkbox" value="true" name="ignore_task_deps" autocomplete="off">
- Ignore Task Deps
- </label>
- </span>
- <span class="col-xs-12 col-sm-3 task-instance-modal-column">
- <button type="submit" id="btn_run" class="btn btn-primary btn-block" title="Runs a single task instance">
- Run
- </button>
- </span>
- </div>
- <hr style="margin-bottom: 8px;">
- </form>
<form method="POST" data-action="{{ url_for('Airflow.clear') }}" id="clear_action">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5335d7bd37..6a6a9fb02b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -89,7 +89,6 @@ from airflow.compat.functools import cached_property
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.datasets import Dataset
from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning
-from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.jobs.triggerer_job import TriggererJob
@@ -106,7 +105,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
-from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
+from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
from airflow.utils import json as utils_json, timezone, yaml
@@ -1836,73 +1835,6 @@ class Airflow(AirflowBaseView):
title=title,
)
- @expose("/run", methods=["POST"])
- @auth.has_access(
- [
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
- (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
- ]
- )
- @action_logging
- @provide_session
- def run(self, session=None):
- """Runs Task Instance."""
- dag_id = request.form.get("dag_id")
- task_id = request.form.get("task_id")
- dag_run_id = request.form.get("dag_run_id")
- map_index = request.args.get("map_index", -1, type=int)
- origin = get_safe_url(request.form.get("origin"))
- dag = get_airflow_app().dag_bag.get_dag(dag_id)
- if not dag:
- return redirect_or_json(origin, "DAG not found", "error", 404)
- task = dag.get_task(task_id)
-
- ignore_all_deps = request.form.get("ignore_all_deps") == "true"
- ignore_task_deps = request.form.get("ignore_task_deps") == "true"
- ignore_ti_state = request.form.get("ignore_ti_state") == "true"
-
- executor = ExecutorLoader.get_default_executor()
-
- if not executor.supports_ad_hoc_ti_run:
- msg = f"{executor.__class__.__name__} does not support ad hoc task runs"
- return redirect_or_json(origin, msg, "error", 400)
- dag_run = dag.get_dagrun(run_id=dag_run_id, session=session)
- if not dag_run:
- return redirect_or_json(origin, "DAG run not found", "error", 404)
- ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)
- if not ti:
- msg = "Could not queue task instance for execution, task instance is missing"
- return redirect_or_json(origin, msg, "error", 400)
-
- ti.refresh_from_task(task)
-
- # Make sure the task instance can be run
- dep_context = DepContext(
- deps=RUNNING_DEPS,
- ignore_all_deps=ignore_all_deps,
- ignore_task_deps=ignore_task_deps,
- ignore_ti_state=ignore_ti_state,
- )
- failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
- if failed_deps:
- failed_deps_str = ", ".join(f"{dep.dep_name}: {dep.reason}" for dep in failed_deps)
- msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}"
- return redirect_or_json(origin, msg, "error", 400)
-
- executor.job_id = None
- executor.start()
- executor.queue_task_instance(
- ti,
- ignore_all_deps=ignore_all_deps,
- ignore_task_deps=ignore_task_deps,
- ignore_ti_state=ignore_ti_state,
- )
- executor.heartbeat()
- ti.queued_dttm = timezone.utcnow()
- session.merge(ti)
- msg = f"Sent {ti} to the message queue, it should start any moment now."
- return redirect_or_json(origin, msg)
-
@expose("/delete", methods=["POST"])
@auth.has_access(
[
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 6b197f3bf8..d087c311e3 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -636,19 +636,6 @@ def test_failure(dag_faker_client, url, unexpected_content):
check_content_not_in_response(unexpected_content, resp)
-@pytest.mark.parametrize("client", ["dag_test_client", "all_dag_user_client"])
-def test_run_success(request, client):
- form = dict(
- task_id="runme_0",
- dag_id="example_bash_operator",
- ignore_all_deps="false",
- ignore_ti_state="true",
- execution_date=DEFAULT_DATE,
- )
- resp = request.getfixturevalue(client).post("run", data=form)
- assert resp.status_code == 302
-
-
def test_blocked_success(client_all_dags_dagruns):
resp = client_all_dags_dagruns.post("blocked", follow_redirects=True)
check_content_in_response("example_bash_operator", resp)
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 5add70b261..6f7d70f19d 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -30,12 +30,10 @@ import time_machine
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.celery_executor import CeleryExecutor
-from airflow.executors.local_executor import LocalExecutor
from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule
from airflow.models.dagcode import DagCode
from airflow.operators.bash import BashOperator
from airflow.security import permissions
-from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
from airflow.utils import timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
@@ -485,24 +483,12 @@ def test_code_from_db_all_example_dags(admin_client):
),
"example_bash_operator",
),
- (
- "run",
- dict(
- task_id="runme_0",
- dag_id="example_bash_operator",
- ignore_all_deps="false",
- ignore_ti_state="true",
- dag_run_id=DEFAULT_DAGRUN,
- ),
- "",
- ),
],
ids=[
"paused",
"failed-flash-hint",
"success-flash-hint",
"clear",
- "run",
],
)
def test_views_post(admin_client, url, data, content):
@@ -533,120 +519,6 @@ class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
return True
-@pytest.mark.parametrize("state", RUNNABLE_STATES)
-@unittest.mock.patch(
- "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
- return_value=_ForceHeartbeatCeleryExecutor(),
-)
-def test_run_with_runnable_states(_, admin_client, session, state):
- task_id = "runme_0"
- session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
- {"state": state, "end_date": timezone.utcnow()}
- )
- session.commit()
-
- form = dict(
- task_id=task_id,
- dag_id="example_bash_operator",
- ignore_all_deps="false",
- ignore_ti_state="false",
- dag_run_id=DEFAULT_DAGRUN,
- origin="/home",
- )
- resp = admin_client.post("run", data=form, follow_redirects=True)
- check_content_in_response("", resp)
-
- msg = f"Task is in the '{state}' state."
- assert not re.search(msg, resp.get_data(as_text=True))
-
-
-@unittest.mock.patch(
- "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
- return_value=_ForceHeartbeatCeleryExecutor(),
-)
-def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session, time_machine):
- task_id = "runme_0"
- session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
- {"state": State.SCHEDULED, "queued_dttm": None}
- )
- session.commit()
-
- assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).all() == [(None,)]
-
- form = dict(
- task_id=task_id,
- dag_id="example_bash_operator",
- ignore_all_deps="true",
- dag_run_id=DEFAULT_DAGRUN,
- origin="/home",
- )
- now = timezone.utcnow()
-
- time_machine.move_to(now, tick=False)
- resp = admin_client.post("run", data=form, follow_redirects=True)
-
- assert resp.status_code == 200
- assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).scalar() == now
-
-
-@pytest.mark.parametrize("state", QUEUEABLE_STATES)
-@unittest.mock.patch(
- "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
- return_value=CeleryExecutor(),
-)
-def test_run_with_not_runnable_states(_, admin_client, session, state):
- assert state not in RUNNABLE_STATES
-
- task_id = "runme_0"
- session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
- {"state": state, "end_date": timezone.utcnow()}
- )
- session.commit()
-
- form = dict(
- task_id=task_id,
- dag_id="example_bash_operator",
- ignore_all_deps="false",
- ignore_ti_state="false",
- dag_run_id=DEFAULT_DAGRUN,
- origin="/home",
- )
- resp = admin_client.post("run", data=form, follow_redirects=True)
- check_content_in_response("", resp)
-
- msg = f"Task is in the '{state}' state."
- assert re.search(msg, resp.get_data(as_text=True))
-
-
-@pytest.mark.parametrize("state", QUEUEABLE_STATES)
-@unittest.mock.patch(
- "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
- return_value=LocalExecutor(),
-)
-def test_run_with_the_unsupported_executor(_, admin_client, session, state):
- assert state not in RUNNABLE_STATES
-
- task_id = "runme_0"
- session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
- {"state": state, "end_date": timezone.utcnow()}
- )
- session.commit()
-
- form = dict(
- task_id=task_id,
- dag_id="example_bash_operator",
- ignore_all_deps="false",
- ignore_ti_state="false",
- dag_run_id=DEFAULT_DAGRUN,
- origin="/home",
- )
- resp = admin_client.post("run", data=form, follow_redirects=True)
- check_content_in_response("", resp)
-
- msg = "LocalExecutor does not support ad hoc task runs"
- assert re.search(msg, resp.get_data(as_text=True))
-
-
@pytest.fixture()
def new_id_example_bash_operator():
dag_id = "example_bash_operator"