You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2023/02/23 22:22:07 UTC

[airflow] branch main updated: Remove Run task action from UI (#29706)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 101d59c4b8 Remove Run task action from UI (#29706)
101d59c4b8 is described below

commit 101d59c4b88ab979d305b8d96f612c27c8a44aa8
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Feb 23 17:21:59 2023 -0500

    Remove Run task action from UI (#29706)
    
    * Remove Run task action from UI
    
    * remove /run endpoint
    
    * remove tests for Airflow.run
    
    * remove extra imports
---
 airflow/www/static/js/api/index.ts                 |   2 -
 airflow/www/static/js/api/useRunTask.ts            |  75 ------------
 .../dag/details/taskInstance/taskActions/Run.tsx   |  97 ----------------
 .../dag/details/taskInstance/taskActions/index.tsx |   7 --
 airflow/www/templates/airflow/dag.html             |  34 ------
 airflow/www/views.py                               |  70 +----------
 tests/www/views/test_views_acl.py                  |  13 ---
 tests/www/views/test_views_tasks.py                | 128 ---------------------
 8 files changed, 1 insertion(+), 425 deletions(-)

diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index 5df46587e0..d055f737d9 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -24,7 +24,6 @@ import useClearRun from './useClearRun';
 import useQueueRun from './useQueueRun';
 import useMarkFailedRun from './useMarkFailedRun';
 import useMarkSuccessRun from './useMarkSuccessRun';
-import useRunTask from './useRunTask';
 import useClearTask from './useClearTask';
 import useMarkFailedTask from './useMarkFailedTask';
 import useMarkSuccessTask from './useMarkSuccessTask';
@@ -63,7 +62,6 @@ export {
   useMarkSuccessRun,
   useMarkSuccessTask,
   useQueueRun,
-  useRunTask,
   useSetDagRunNote,
   useSetTaskInstanceNote,
   useTaskInstance,
diff --git a/airflow/www/static/js/api/useRunTask.ts b/airflow/www/static/js/api/useRunTask.ts
deleted file mode 100644
index 98b3cc873c..0000000000
--- a/airflow/www/static/js/api/useRunTask.ts
+++ /dev/null
@@ -1,75 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import axios from 'axios';
-import { useMutation, useQueryClient } from 'react-query';
-import URLSearchParamsWrapper from 'src/utils/URLSearchParamWrapper';
-import { getMetaValue } from '../utils';
-import { useAutoRefresh } from '../context/autorefresh';
-import useErrorToast from '../utils/useErrorToast';
-
-const csrfToken = getMetaValue('csrf_token');
-const runUrl = getMetaValue('run_url');
-
-export default function useRunTask(dagId: string, runId: string, taskId: string) {
-  const queryClient = useQueryClient();
-  const errorToast = useErrorToast();
-  const { startRefresh } = useAutoRefresh();
-  return useMutation(
-    ['runTask', dagId, runId, taskId],
-    async ({
-      ignoreAllDeps,
-      ignoreTaskState,
-      ignoreTaskDeps,
-      mapIndexes,
-    }:{
-      ignoreAllDeps: boolean,
-      ignoreTaskState: boolean,
-      ignoreTaskDeps: boolean,
-      mapIndexes: number[],
-    }) => Promise.all(
-      (mapIndexes.length ? mapIndexes : [-1]).map((mi) => {
-        const params = new URLSearchParamsWrapper({
-          csrf_token: csrfToken,
-          dag_id: dagId,
-          dag_run_id: runId,
-          task_id: taskId,
-          ignore_all_deps: ignoreAllDeps,
-          ignore_task_deps: ignoreTaskDeps,
-          ignore_ti_state: ignoreTaskState,
-          map_index: mi,
-        }).toString();
-
-        return axios.post(runUrl, params, {
-          headers: {
-            'Content-Type': 'application/x-www-form-urlencoded',
-          },
-        });
-      }),
-    ),
-    {
-      onSuccess: () => {
-        queryClient.invalidateQueries('gridData');
-        queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]);
-        startRefresh();
-      },
-      onError: (error: Error) => errorToast({ error }),
-    },
-  );
-}
diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx
deleted file mode 100644
index 62d08a77ef..0000000000
--- a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx
+++ /dev/null
@@ -1,97 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import React, { useState } from 'react';
-import {
-  Button,
-  Flex,
-  ButtonGroup,
-} from '@chakra-ui/react';
-
-import { useRunTask } from 'src/api';
-import { getMetaValue } from 'src/utils';
-
-const canEdit = getMetaValue('can_edit') === 'True';
-
-interface Props {
-  dagId: string;
-  runId: string;
-  taskId: string;
-  mapIndexes: number[];
-}
-
-const Run = ({
-  dagId,
-  runId,
-  taskId,
-  mapIndexes,
-}: Props) => {
-  const [ignoreAllDeps, setIgnoreAllDeps] = useState(false);
-  const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps);
-
-  const [ignoreTaskState, setIgnoreTaskState] = useState(false);
-  const onToggleTaskState = () => setIgnoreTaskState(!ignoreTaskState);
-
-  const [ignoreTaskDeps, setIgnoreTaskDeps] = useState(false);
-  const onToggleTaskDeps = () => setIgnoreTaskDeps(!ignoreTaskDeps);
-
-  const { mutate: onRun, isLoading } = useRunTask(dagId, runId, taskId);
-
-  const onClick = () => {
-    onRun({
-      ignoreAllDeps,
-      ignoreTaskState,
-      ignoreTaskDeps,
-      mapIndexes,
-    });
-  };
-
-  return (
-    <Flex justifyContent="space-between" width="100%">
-      <ButtonGroup isAttached variant="outline" isDisabled={!canEdit}>
-        <Button
-          bg={ignoreAllDeps ? 'gray.100' : undefined}
-          onClick={onToggleAllDeps}
-          title="Ignores all non-critical dependencies, including task state and task_deps"
-        >
-          Ignore All Deps
-        </Button>
-        <Button
-          bg={ignoreTaskState ? 'gray.100' : undefined}
-          onClick={onToggleTaskState}
-          title="Ignore previous success/failure"
-        >
-          Ignore Task State
-        </Button>
-        <Button
-          bg={ignoreTaskDeps ? 'gray.100' : undefined}
-          onClick={onToggleTaskDeps}
-          title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past"
-        >
-          Ignore Task Deps
-        </Button>
-      </ButtonGroup>
-      <Button colorScheme="blue" onClick={onClick} isLoading={isLoading} isDisabled={!canEdit}>
-        Run
-      </Button>
-    </Flex>
-  );
-};
-
-export default Run;
diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx
index 72f3f238ab..55a1a2529d 100644
--- a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx
@@ -27,7 +27,6 @@ import {
 } from '@chakra-ui/react';
 
 import type { CommonActionProps } from './types';
-import RunAction from './Run';
 import ClearAction from './Clear';
 import MarkFailedAction from './MarkFailed';
 import MarkSuccessAction from './MarkSuccess';
@@ -54,12 +53,6 @@ const TaskActions = ({
       />
     ) : (
       <VStack justifyContent="center" divider={<StackDivider my={3} />}>
-        <RunAction
-          runId={runId}
-          taskId={taskId}
-          dagId={dagId}
-          mapIndexes={mapIndexes}
-        />
         <ClearAction
           runId={runId}
           taskId={taskId}
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index f1f029e516..2ed843bb78 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -56,7 +56,6 @@
   <meta name="confirm_url" content="{{ url_for('Airflow.confirm') }}">
   <meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
   <meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
-  <meta name="run_url" content="{{ url_for('Airflow.run') }}">
   <meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}">
   <meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
   <meta name="grid_url_no_root" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id, num_runs=num_runs_arg, base_date=base_date_arg) }}">
@@ -332,39 +331,6 @@
             </div>
           {% endif %}
           <h4 id="task_actions">Task Actions</h4>
-          <form method="POST" data-action="{{ url_for('Airflow.run') }}" id="run_action">
-            <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
-            <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
-            <input type="hidden" name="task_id">
-            <input type="hidden" name="dag_run_id">
-            <input type="hidden" name="map_index">
-            <input type="hidden" name="origin" value="{{ request.base_url }}">
-            <div class="row">
-              <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons">
-                <label
-                  class="btn btn-default"
-                  title="Ignores all non-critical dependencies, including task state and task_deps">
-                  <input type="checkbox" value="true" name="ignore_all_deps" autocomplete="off">
-                  Ignore All Deps</label>
-                <label class="btn btn-default"
-                  title="Ignore previous success/failure">
-                  <input type="checkbox" value="true" name="ignore_ti_state" autocomplete="off">
-                  Ignore Task State
-                </label>
-                <label class="btn btn-default"
-                  title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past">
-                  <input type="checkbox" value="true" name="ignore_task_deps" autocomplete="off">
-                  Ignore Task Deps
-                </label>
-              </span>
-              <span class="col-xs-12 col-sm-3 task-instance-modal-column">
-                <button type="submit" id="btn_run" class="btn btn-primary btn-block" title="Runs a single task instance">
-                  Run
-                </button>
-              </span>
-            </div>
-            <hr style="margin-bottom: 8px;">
-          </form>
           <form method="POST" data-action="{{ url_for('Airflow.clear') }}" id="clear_action">
             <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
             <input type="hidden" name="dag_id" value="{{ dag.dag_id }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5335d7bd37..6a6a9fb02b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -89,7 +89,6 @@ from airflow.compat.functools import cached_property
 from airflow.configuration import AIRFLOW_CONFIG, conf
 from airflow.datasets import Dataset
 from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning
-from airflow.executors.executor_loader import ExecutorLoader
 from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.jobs.triggerer_job import TriggererJob
@@ -106,7 +105,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
 from airflow.providers_manager import ProvidersManager
 from airflow.security import permissions
 from airflow.ti_deps.dep_context import DepContext
-from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS
+from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
 from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DataInterval, TimeRestriction
 from airflow.utils import json as utils_json, timezone, yaml
@@ -1836,73 +1835,6 @@ class Airflow(AirflowBaseView):
             title=title,
         )
 
-    @expose("/run", methods=["POST"])
-    @auth.has_access(
-        [
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-            (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
-        ]
-    )
-    @action_logging
-    @provide_session
-    def run(self, session=None):
-        """Runs Task Instance."""
-        dag_id = request.form.get("dag_id")
-        task_id = request.form.get("task_id")
-        dag_run_id = request.form.get("dag_run_id")
-        map_index = request.args.get("map_index", -1, type=int)
-        origin = get_safe_url(request.form.get("origin"))
-        dag = get_airflow_app().dag_bag.get_dag(dag_id)
-        if not dag:
-            return redirect_or_json(origin, "DAG not found", "error", 404)
-        task = dag.get_task(task_id)
-
-        ignore_all_deps = request.form.get("ignore_all_deps") == "true"
-        ignore_task_deps = request.form.get("ignore_task_deps") == "true"
-        ignore_ti_state = request.form.get("ignore_ti_state") == "true"
-
-        executor = ExecutorLoader.get_default_executor()
-
-        if not executor.supports_ad_hoc_ti_run:
-            msg = f"{executor.__class__.__name__} does not support ad hoc task runs"
-            return redirect_or_json(origin, msg, "error", 400)
-        dag_run = dag.get_dagrun(run_id=dag_run_id, session=session)
-        if not dag_run:
-            return redirect_or_json(origin, "DAG run not found", "error", 404)
-        ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)
-        if not ti:
-            msg = "Could not queue task instance for execution, task instance is missing"
-            return redirect_or_json(origin, msg, "error", 400)
-
-        ti.refresh_from_task(task)
-
-        # Make sure the task instance can be run
-        dep_context = DepContext(
-            deps=RUNNING_DEPS,
-            ignore_all_deps=ignore_all_deps,
-            ignore_task_deps=ignore_task_deps,
-            ignore_ti_state=ignore_ti_state,
-        )
-        failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
-        if failed_deps:
-            failed_deps_str = ", ".join(f"{dep.dep_name}: {dep.reason}" for dep in failed_deps)
-            msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}"
-            return redirect_or_json(origin, msg, "error", 400)
-
-        executor.job_id = None
-        executor.start()
-        executor.queue_task_instance(
-            ti,
-            ignore_all_deps=ignore_all_deps,
-            ignore_task_deps=ignore_task_deps,
-            ignore_ti_state=ignore_ti_state,
-        )
-        executor.heartbeat()
-        ti.queued_dttm = timezone.utcnow()
-        session.merge(ti)
-        msg = f"Sent {ti} to the message queue, it should start any moment now."
-        return redirect_or_json(origin, msg)
-
     @expose("/delete", methods=["POST"])
     @auth.has_access(
         [
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 6b197f3bf8..d087c311e3 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -636,19 +636,6 @@ def test_failure(dag_faker_client, url, unexpected_content):
     check_content_not_in_response(unexpected_content, resp)
 
 
-@pytest.mark.parametrize("client", ["dag_test_client", "all_dag_user_client"])
-def test_run_success(request, client):
-    form = dict(
-        task_id="runme_0",
-        dag_id="example_bash_operator",
-        ignore_all_deps="false",
-        ignore_ti_state="true",
-        execution_date=DEFAULT_DATE,
-    )
-    resp = request.getfixturevalue(client).post("run", data=form)
-    assert resp.status_code == 302
-
-
 def test_blocked_success(client_all_dags_dagruns):
     resp = client_all_dags_dagruns.post("blocked", follow_redirects=True)
     check_content_in_response("example_bash_operator", resp)
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 5add70b261..6f7d70f19d 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -30,12 +30,10 @@ import time_machine
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.executors.celery_executor import CeleryExecutor
-from airflow.executors.local_executor import LocalExecutor
 from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule
 from airflow.models.dagcode import DagCode
 from airflow.operators.bash import BashOperator
 from airflow.security import permissions
-from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import create_session
@@ -485,24 +483,12 @@ def test_code_from_db_all_example_dags(admin_client):
             ),
             "example_bash_operator",
         ),
-        (
-            "run",
-            dict(
-                task_id="runme_0",
-                dag_id="example_bash_operator",
-                ignore_all_deps="false",
-                ignore_ti_state="true",
-                dag_run_id=DEFAULT_DAGRUN,
-            ),
-            "",
-        ),
     ],
     ids=[
         "paused",
         "failed-flash-hint",
         "success-flash-hint",
         "clear",
-        "run",
     ],
 )
 def test_views_post(admin_client, url, data, content):
@@ -533,120 +519,6 @@ class _ForceHeartbeatCeleryExecutor(CeleryExecutor):
         return True
 
 
-@pytest.mark.parametrize("state", RUNNABLE_STATES)
-@unittest.mock.patch(
-    "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
-    return_value=_ForceHeartbeatCeleryExecutor(),
-)
-def test_run_with_runnable_states(_, admin_client, session, state):
-    task_id = "runme_0"
-    session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
-        {"state": state, "end_date": timezone.utcnow()}
-    )
-    session.commit()
-
-    form = dict(
-        task_id=task_id,
-        dag_id="example_bash_operator",
-        ignore_all_deps="false",
-        ignore_ti_state="false",
-        dag_run_id=DEFAULT_DAGRUN,
-        origin="/home",
-    )
-    resp = admin_client.post("run", data=form, follow_redirects=True)
-    check_content_in_response("", resp)
-
-    msg = f"Task is in the &#39;{state}&#39 state."
-    assert not re.search(msg, resp.get_data(as_text=True))
-
-
-@unittest.mock.patch(
-    "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
-    return_value=_ForceHeartbeatCeleryExecutor(),
-)
-def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session, time_machine):
-    task_id = "runme_0"
-    session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
-        {"state": State.SCHEDULED, "queued_dttm": None}
-    )
-    session.commit()
-
-    assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).all() == [(None,)]
-
-    form = dict(
-        task_id=task_id,
-        dag_id="example_bash_operator",
-        ignore_all_deps="true",
-        dag_run_id=DEFAULT_DAGRUN,
-        origin="/home",
-    )
-    now = timezone.utcnow()
-
-    time_machine.move_to(now, tick=False)
-    resp = admin_client.post("run", data=form, follow_redirects=True)
-
-    assert resp.status_code == 200
-    assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).scalar() == now
-
-
-@pytest.mark.parametrize("state", QUEUEABLE_STATES)
-@unittest.mock.patch(
-    "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
-    return_value=CeleryExecutor(),
-)
-def test_run_with_not_runnable_states(_, admin_client, session, state):
-    assert state not in RUNNABLE_STATES
-
-    task_id = "runme_0"
-    session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
-        {"state": state, "end_date": timezone.utcnow()}
-    )
-    session.commit()
-
-    form = dict(
-        task_id=task_id,
-        dag_id="example_bash_operator",
-        ignore_all_deps="false",
-        ignore_ti_state="false",
-        dag_run_id=DEFAULT_DAGRUN,
-        origin="/home",
-    )
-    resp = admin_client.post("run", data=form, follow_redirects=True)
-    check_content_in_response("", resp)
-
-    msg = f"Task is in the &#39;{state}&#39; state."
-    assert re.search(msg, resp.get_data(as_text=True))
-
-
-@pytest.mark.parametrize("state", QUEUEABLE_STATES)
-@unittest.mock.patch(
-    "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
-    return_value=LocalExecutor(),
-)
-def test_run_with_the_unsupported_executor(_, admin_client, session, state):
-    assert state not in RUNNABLE_STATES
-
-    task_id = "runme_0"
-    session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
-        {"state": state, "end_date": timezone.utcnow()}
-    )
-    session.commit()
-
-    form = dict(
-        task_id=task_id,
-        dag_id="example_bash_operator",
-        ignore_all_deps="false",
-        ignore_ti_state="false",
-        dag_run_id=DEFAULT_DAGRUN,
-        origin="/home",
-    )
-    resp = admin_client.post("run", data=form, follow_redirects=True)
-    check_content_in_response("", resp)
-
-    msg = "LocalExecutor does not support ad hoc task runs"
-    assert re.search(msg, resp.get_data(as_text=True))
-
-
 @pytest.fixture()
 def new_id_example_bash_operator():
     dag_id = "example_bash_operator"