You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/09/21 21:04:57 UTC

[airflow] branch main updated: Require can_edit on DAG privileges to modify TaskInstances and DagRuns (#16634)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f74d0ab  Require can_edit on DAG privileges to modify TaskInstances and DagRuns (#16634)
f74d0ab is described below

commit f74d0ab38e00b83b49e20d410736e2a8c42de095
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Tue Sep 21 23:04:38 2021 +0200

    Require can_edit on DAG privileges to modify TaskInstances and DagRuns (#16634)
---
 INTHEWILD.md                                       |   1 +
 .../api_connexion/endpoints/dag_run_endpoint.py    |   2 +-
 .../endpoints/task_instance_endpoint.py            |   4 +-
 airflow/www/views.py                               | 134 +++++++++++++++------
 .../endpoints/test_dag_run_endpoint.py             |  23 +++-
 .../endpoints/test_task_instance_endpoint.py       |  58 +++++++--
 tests/www/views/test_views_acl.py                  |   2 +-
 tests/www/views/test_views_dagrun.py               |  77 +++++++++++-
 tests/www/views/test_views_decorators.py           |  56 ++++++++-
 tests/www/views/test_views_tasks.py                |  83 ++++++++++++-
 10 files changed, 386 insertions(+), 54 deletions(-)

diff --git a/INTHEWILD.md b/INTHEWILD.md
index c1a5b32..588db42 100644
--- a/INTHEWILD.md
+++ b/INTHEWILD.md
@@ -33,6 +33,7 @@ Currently, **officially** using Airflow:
 1. [Accenture](https://www.accenture.com/au-en) [[@nijanthanvijayakumar](https://github.com/nijanthanvijayakumar)]
 1. [AdBOOST](https://www.adboost.sk) [[AdBOOST](https://github.com/AdBOOST)]
 1. [Adobe](https://www.adobe.com/) [[@mishikaSingh](https://github.com/mishikaSingh), [@ramandumcs](https://github.com/ramandumcs), [@vardancse](https://github.com/vardancse)]
+1. [Adyen](https://www.adyen.com/) [[@jorricks](https://github.com/jorricks)]
 1. [Agari](https://github.com/agaridata) [[@r39132](https://github.com/r39132)]
 1. [Agoda](https://agoda.com) [[@akki](https://github.com/akki)]
 1. [Airbnb](https://airbnb.io/) [[@mistercrunch](https://github.com/mistercrunch), [@artwr](https://github.com/artwr)]
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 29e0efc..9327804 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -40,7 +40,7 @@ from airflow.utils.types import DagRunType
 
 @security.requires_access(
     [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
         (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
     ]
 )
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 361d29e..cfa27ac 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -227,7 +227,7 @@ def get_task_instances_batch(session=None):
 
 @security.requires_access(
     [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
         (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
     ]
@@ -261,7 +261,7 @@ def post_clear_task_instances(dag_id: str, session=None):
 
 @security.requires_access(
     [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
         (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
     ]
diff --git a/airflow/www/views.py b/airflow/www/views.py
index ee09908..b2c8712 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -27,9 +27,10 @@ import sys
 import traceback
 from collections import defaultdict
 from datetime import timedelta
+from functools import wraps
 from json import JSONDecodeError
 from operator import itemgetter
-from typing import Any, Iterable, List, Optional, Tuple
+from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, Union
 from urllib.parse import parse_qsl, unquote, urlencode, urlparse
 
 import lazy_object_proxy
@@ -1515,7 +1516,7 @@ class Airflow(AirflowBaseView):
     @expose('/run', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
@@ -1793,7 +1794,7 @@ class Airflow(AirflowBaseView):
     @expose('/clear', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
@@ -1837,7 +1838,7 @@ class Airflow(AirflowBaseView):
     @expose('/dagrun_clear', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
@@ -1859,7 +1860,7 @@ class Airflow(AirflowBaseView):
     @expose('/blocked', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
         ]
     )
@@ -1966,7 +1967,7 @@ class Airflow(AirflowBaseView):
     @expose('/dagrun_failed', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
         ]
     )
@@ -1982,7 +1983,7 @@ class Airflow(AirflowBaseView):
     @expose('/dagrun_success', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
         ]
     )
@@ -2026,7 +2027,7 @@ class Airflow(AirflowBaseView):
     @expose('/confirm', methods=['GET'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
@@ -2099,7 +2100,7 @@ class Airflow(AirflowBaseView):
     @expose('/failed', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
@@ -2132,7 +2133,7 @@ class Airflow(AirflowBaseView):
     @expose('/success', methods=['POST'])
     @auth.has_access(
         [
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
         ]
     )
@@ -3140,6 +3141,14 @@ class DagFilter(BaseFilter):
         return query.filter(self.model.dag_id.in_(filter_dag_ids))
 
 
+class DagEditFilter(BaseFilter):
+    """Filter using DagIDs"""
+
+    def apply(self, query, func):  # pylint: disable=redefined-outer-name,unused-argument
+        filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+        return query.filter(self.model.dag_id.in_(filter_dag_ids))
+
+
 class AirflowModelView(ModelView):
     """Airflow Mode View."""
 
@@ -3149,6 +3158,71 @@ class AirflowModelView(ModelView):
     CustomSQLAInterface = wwwutils.CustomSQLAInterface
 
 
+class AirflowPrivilegeVerifierModelView(AirflowModelView):
+    """
+    This ModelView prevents ability to pass primary keys of objects relating to DAGs you shouldn't be able to
+    edit. This only holds for the add, update and delete operations.
+    You will still need to use the `action_has_dag_edit_access()` for actions.
+    """
+
+    @staticmethod
+    def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
+        """Validates whether the user has 'can_edit' access for this specific DAG."""
+        if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
+            raise AirflowException(f"Access denied for dag_id {item.dag_id}")
+
+    def pre_add(self, item: Union[DagRun, TaskInstance]):
+        self.validate_dag_edit_access(item)
+
+    def pre_update(self, item: Union[DagRun, TaskInstance]):
+        self.validate_dag_edit_access(item)
+
+    def pre_delete(self, item: Union[DagRun, TaskInstance]):
+        self.validate_dag_edit_access(item)
+
+    def post_add_redirect(self):  # Required to prevent redirect loop
+        return redirect(self.get_default_url())
+
+    def post_edit_redirect(self):  # Required to prevent redirect loop
+        return redirect(self.get_default_url())
+
+    def post_delete_redirect(self):  # Required to prevent redirect loop
+        return redirect(self.get_default_url())
+
+
+def action_has_dag_edit_access(action_func: Callable) -> Callable:
+    """Decorator for actions which verifies you have DAG edit access on the given tis/drs."""
+
+    @wraps(action_func)
+    def check_dag_edit_acl_for_actions(
+        self,
+        items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]],
+        *args,
+        **kwargs,
+    ) -> None:
+        if items is None:
+            dag_ids: Set[str] = set()
+        elif isinstance(items, list):
+            dag_ids = {item.dag_id for item in items if item is not None}
+        elif isinstance(items, TaskInstance) or isinstance(items, DagRun):
+            dag_ids = {items.dag_id}
+        else:
+            raise ValueError(
+                "Was expecting the first argument of the action to be of type "
+                "Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]."
+                f"Was of type: {type(items)}"
+            )
+
+        for dag_id in dag_ids:
+            if not current_app.appbuilder.sm.can_edit_dag(dag_id):
+                flash(f"Access denied for dag_id {dag_id}", "danger")
+                logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id)
+                return redirect(self.get_default_url())
+        return action_func(self, items, *args, **kwargs)
+
+    return check_dag_edit_acl_for_actions
+
+
 class SlaMissModelView(AirflowModelView):
     """View to show SlaMiss table"""
 
@@ -3812,7 +3886,7 @@ class JobModelView(AirflowModelView):
     }
 
 
-class DagRunModelView(AirflowModelView):
+class DagRunModelView(AirflowPrivilegeVerifierModelView):
     """View to show records from DagRun table"""
 
     route_base = '/dagrun'
@@ -3861,7 +3935,7 @@ class DagRunModelView(AirflowModelView):
 
     base_order = ('execution_date', 'desc')
 
-    base_filters = [['dag_id', DagFilter, lambda: []]]
+    base_filters = [['dag_id', DagEditFilter, lambda: []]]
 
     edit_form = DagRunEditForm
 
@@ -3876,6 +3950,7 @@ class DagRunModelView(AirflowModelView):
     }
 
     @action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
+    @action_has_dag_edit_access
     @provide_session
     def action_muldelete(self, items, session=None):
         """Multiple delete."""
@@ -3884,6 +3959,7 @@ class DagRunModelView(AirflowModelView):
         return redirect(self.get_redirect())
 
     @action('set_running', "Set state to 'running'", '', single=False)
+    @action_has_dag_edit_access
     @provide_session
     def action_set_running(self, drs, session=None):
         """Set state to running."""
@@ -3906,6 +3982,7 @@ class DagRunModelView(AirflowModelView):
         "All running task instances would also be marked as failed, are you sure?",
         single=False,
     )
+    @action_has_dag_edit_access
     @provide_session
     def action_set_failed(self, drs, session=None):
         """Set state to failed."""
@@ -3932,6 +4009,7 @@ class DagRunModelView(AirflowModelView):
         "All task instances would also be marked as success, are you sure?",
         single=False,
     )
+    @action_has_dag_edit_access
     @provide_session
     def action_set_success(self, drs, session=None):
         """Set state to success."""
@@ -3953,6 +4031,7 @@ class DagRunModelView(AirflowModelView):
         return redirect(self.get_default_url())
 
     @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
+    @action_has_dag_edit_access
     @provide_session
     def action_clear(self, drs, session=None):
         """Clears the state."""
@@ -4114,7 +4193,7 @@ class TriggerModelView(AirflowModelView):
     }
 
 
-class TaskInstanceModelView(AirflowModelView):
+class TaskInstanceModelView(AirflowPrivilegeVerifierModelView):
     """View to show records from TaskInstance table"""
 
     route_base = '/taskinstance'
@@ -4198,7 +4277,7 @@ class TaskInstanceModelView(AirflowModelView):
 
     base_order = ('job_id', 'asc')
 
-    base_filters = [['dag_id', DagFilter, lambda: []]]
+    base_filters = [['dag_id', DagEditFilter, lambda: []]]
 
     def log_url_formatter(self):
         """Formats log URL."""
@@ -4229,7 +4308,6 @@ class TaskInstanceModelView(AirflowModelView):
         'duration': duration_f,
     }
 
-    @provide_session
     @action(
         'clear',
         lazy_gettext('Clear'),
@@ -4239,6 +4317,8 @@ class TaskInstanceModelView(AirflowModelView):
         ),
         single=False,
     )
+    @action_has_dag_edit_access
+    @provide_session
     def action_clear(self, task_instances, session=None):
         """Clears the action."""
         try:
@@ -4271,11 +4351,7 @@ class TaskInstanceModelView(AirflowModelView):
             flash('Failed to set state', 'error')
 
     @action('set_running', "Set state to 'running'", '', single=False)
-    @auth.has_access(
-        [
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        ]
-    )
+    @action_has_dag_edit_access
     def action_set_running(self, tis):
         """Set state to 'running'"""
         self.set_task_instance_state(tis, State.RUNNING)
@@ -4283,11 +4359,7 @@ class TaskInstanceModelView(AirflowModelView):
         return redirect(self.get_redirect())
 
     @action('set_failed', "Set state to 'failed'", '', single=False)
-    @auth.has_access(
-        [
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        ]
-    )
+    @action_has_dag_edit_access
     def action_set_failed(self, tis):
         """Set state to 'failed'"""
         self.set_task_instance_state(tis, State.FAILED)
@@ -4295,11 +4367,7 @@ class TaskInstanceModelView(AirflowModelView):
         return redirect(self.get_redirect())
 
     @action('set_success', "Set state to 'success'", '', single=False)
-    @auth.has_access(
-        [
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        ]
-    )
+    @action_has_dag_edit_access
     def action_set_success(self, tis):
         """Set state to 'success'"""
         self.set_task_instance_state(tis, State.SUCCESS)
@@ -4307,11 +4375,7 @@ class TaskInstanceModelView(AirflowModelView):
         return redirect(self.get_redirect())
 
     @action('set_retry', "Set state to 'up_for_retry'", '', single=False)
-    @auth.has_access(
-        [
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        ]
-    )
+    @action_has_dag_edit_access
     def action_set_retry(self, tis):
         """Set state to 'up_for_retry'"""
         self.set_task_instance_state(tis, State.UP_FOR_RETRY)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 47d89cb..69c06b7 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -28,7 +28,7 @@ from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.types import DagRunType
-from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
+from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 
@@ -52,6 +52,18 @@ def configured_app(minimal_app_for_api):
     )
     create_user(
         app,  # type: ignore
+        username="test_dag_view_only",
+        role_name="TestViewDags",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+        ],
+    )
+    create_user(
+        app,  # type: ignore
         username="test_view_dags",
         role_name="TestViewDags",
         permissions=[
@@ -74,9 +86,11 @@ def configured_app(minimal_app_for_api):
     yield app
 
     delete_user(app, username="test")  # type: ignore
+    delete_user(app, username="test_dag_view_only")  # type: ignore
     delete_user(app, username="test_view_dags")  # type: ignore
     delete_user(app, username="test_granular_permissions")  # type: ignore
     delete_user(app, username="test_no_permissions")  # type: ignore
+    delete_roles(app)
 
 
 class TestDagRunEndpoint:
@@ -1145,7 +1159,10 @@ class TestPostDagRun(TestDagRunEndpoint):
 
         assert_401(response)
 
-    def test_should_raises_403_unauthorized(self):
+    @parameterized.expand(
+        ["test_dag_view_only", "test_view_dags", "test_granular_permissions", "test_no_permissions"]
+    )
+    def test_should_raises_403_unauthorized(self, username):
         self._create_dag("TEST_DAG_ID")
         response = self.client.post(
             "api/v1/dags/TEST_DAG_ID/dagRuns",
@@ -1153,7 +1170,7 @@ class TestPostDagRun(TestDagRunEndpoint):
                 "dag_run_id": "TEST_DAG_RUN_ID_1",
                 "execution_date": self.default_time,
             },
-            environ_overrides={'REMOTE_USER': "test_view_dags"},
+            environ_overrides={'REMOTE_USER': username},
         )
         assert response.status_code == 403
 
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 7696b76..8ed4cda 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -28,7 +28,7 @@ from airflow.utils.session import provide_session
 from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
-from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
+from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
 from tests.test_utils.db import clear_db_runs, clear_db_sla_miss
 
 DEFAULT_DATETIME_1 = datetime(2020, 1, 1)
@@ -45,16 +45,43 @@ def configured_app(minimal_app_for_api):
         role_name="Test",
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
         ],
     )
+    create_user(
+        app,  # type: ignore
+        username="test_dag_read_only",
+        role_name="TestDagReadOnly",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+        ],
+    )
+    create_user(
+        app,  # type: ignore
+        username="test_task_read_only",
+        role_name="TestTaskReadOnly",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+        ],
+    )
     create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
 
     yield app
 
     delete_user(app, username="test")  # type: ignore
+    delete_user(app, username="test_dag_read_only")  # type: ignore
+    delete_user(app, username="test_task_read_only")  # type: ignore
+    delete_user(app, username="test_no_permissions")  # type: ignore
+    delete_roles(app)
 
 
 class TestTaskInstanceEndpoint:
@@ -133,7 +160,9 @@ class TestTaskInstanceEndpoint:
 
 
 class TestGetTaskInstance(TestTaskInstanceEndpoint):
-    def test_should_respond_200(self, session):
+    @parameterized.expand(["test", "test_dag_read_only", "test_task_read_only"])
+    @provide_session
+    def test_should_respond_200(self, username, session):
         self.create_task_instances(session)
         # Update ti and set operator to None to
         # test that operator field is nullable.
@@ -144,7 +173,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
         session.commit()
         response = self.client.get(
             "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context",
-            environ_overrides={"REMOTE_USER": "test"},
+            environ_overrides={"REMOTE_USER": username},
         )
         assert response.status_code == 200
         assert response.json == {
@@ -451,6 +480,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                 True,
                 {"queue": ["test_queue_1", "test_queue_2"]},
                 2,
+                "test",
             ),
             (
                 "test pool filter",
@@ -462,6 +492,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                 True,
                 {"pool": ["test_pool_1", "test_pool_2"]},
                 2,
+                "test_dag_read_only",
             ),
             (
                 "test state filter",
@@ -473,6 +504,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                 False,
                 {"state": ["running", "queued"]},
                 2,
+                "test_task_read_only",
             ),
             (
                 "test duration filter",
@@ -484,6 +516,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                 True,
                 {"duration_gte": 100, "duration_lte": 200},
                 3,
+                "test",
             ),
             (
                 "test end date filter",
@@ -498,6 +531,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                     "end_date_lte": DEFAULT_DATETIME_STR_2,
                 },
                 2,
+                "test_task_read_only",
             ),
             (
                 "test start date filter",
@@ -512,6 +546,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                     "start_date_lte": DEFAULT_DATETIME_STR_2,
                 },
                 2,
+                "test_dag_read_only",
             ),
             (
                 "with execution date filter",
@@ -529,11 +564,14 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
                     "execution_date_lte": (DEFAULT_DATETIME_1 + dt.timedelta(days=2)),
                 },
                 3,
+                "test",
             ),
         ]
     )
     @provide_session
-    def test_should_respond_200(self, _, task_instances, update_extras, payload, expected_ti_count, session):
+    def test_should_respond_200(
+        self, _, task_instances, update_extras, payload, expected_ti_count, username, session
+    ):
         self.create_task_instances(
             session,
             update_extras=update_extras,
@@ -541,7 +579,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
         )
         response = self.client.post(
             "/api/v1/dags/~/dagRuns/~/taskInstances/list",
-            environ_overrides={"REMOTE_USER": "test"},
+            environ_overrides={"REMOTE_USER": username},
             json=payload,
         )
         assert response.status_code == 200, response.json
@@ -943,10 +981,11 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
         )
         assert_401(response)
 
-    def test_should_raise_403_forbidden(self):
+    @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"])
+    def test_should_raise_403_forbidden(self, username: str):
         response = self.client.post(
             "/api/v1/dags/example_python_operator/clearTaskInstances",
-            environ_overrides={'REMOTE_USER': "test_no_permissions"},
+            environ_overrides={'REMOTE_USER': username},
             json={
                 "dry_run": False,
                 "reset_dag_runs": True,
@@ -1055,10 +1094,11 @@ class TestPostSetTaskInstanceState(TestTaskInstanceEndpoint):
         )
         assert_401(response)
 
-    def test_should_raise_403_forbidden(self):
+    @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"])
+    def test_should_raise_403_forbidden(self, username):
         response = self.client.post(
             "/api/v1/dags/example_python_operator/updateTaskInstancesState",
-            environ_overrides={'REMOTE_USER': "test_no_permissions"},
+            environ_overrides={'REMOTE_USER': username},
             json={
                 "dry_run": True,
                 "task_id": "print_the_context",
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 82b77c0..de5a281 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -697,7 +697,7 @@ def user_all_dags_edit_tis(acl_app):
         username="user_all_dags_edit_tis",
         role_name="role_all_dags_edit_tis",
         permissions=[
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
         ],
diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py
index 919adff..ebd4f30 100644
--- a/tests/www/views/test_views_dagrun.py
+++ b/tests/www/views/test_views_dagrun.py
@@ -16,11 +16,42 @@
 # specific language governing permissions and limitations
 # under the License.
 import pytest
+import werkzeug
 
 from airflow.models import DagBag, DagRun, TaskInstance
+from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from tests.test_utils.www import check_content_in_response
+from airflow.www.views import DagRunModelView
+from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user
+from tests.test_utils.www import check_content_in_response, client_with_login
+from tests.www.views.test_views_tasks import _get_appbuilder_pk_string
+
+
+@pytest.fixture(scope="module")
+def client_dr_without_dag_edit(app):
+    create_user(
+        app,
+        username="all_dr_permissions_except_dag_edit",
+        role_name="all_dr_permissions_except_dag_edit",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
+        ],
+    )
+
+    yield client_with_login(
+        app,
+        username="all_dr_permissions_except_dag_edit",
+        password="all_dr_permissions_except_dag_edit",
+    )
+
+    delete_user(app, username="all_dr_permissions_except_dag_edit")  # type: ignore
+    delete_roles(app)
 
 
 @pytest.fixture(scope="module", autouse=True)
@@ -42,6 +73,19 @@ def reset_dagrun():
         session.query(TaskInstance).delete()
 
 
+def test_create_dagrun_permission_denied(session, client_dr_without_dag_edit):
+    data = {
+        "state": "running",
+        "dag_id": "example_bash_operator",
+        "execution_date": "2018-07-06 05:06:03",
+        "run_id": "test_list_dagrun_includes_conf",
+        "conf": '{"include": "me"}',
+    }
+
+    with pytest.raises(werkzeug.test.ClientRedirectError):
+        client_dr_without_dag_edit.post('/dagrun/add', data=data, follow_redirects=True)
+
+
 @pytest.fixture()
 def running_dag_run(session):
     dag = DagBag().get_dag("example_bash_operator")
@@ -62,6 +106,22 @@ def running_dag_run(session):
     return dr
 
 
+def test_delete_dagrun(session, admin_client, running_dag_run):
+    composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run)
+    assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1
+    admin_client.post(f"/dagrun/delete/{composite_key}", follow_redirects=True)
+    assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 0
+
+
+def test_delete_dagrun_permission_denied(session, client_dr_without_dag_edit, running_dag_run):
+    composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run)
+
+    assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1
+    resp = client_dr_without_dag_edit.post(f"/dagrun/delete/{composite_key}", follow_redirects=True)
+    assert resp.status_code == 404  # If it doesn't fully succeed it gives a 404.
+    assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1
+
+
 @pytest.mark.parametrize(
     "action, expected_ti_states, expected_message",
     [
@@ -134,3 +194,18 @@ def test_muldelete_dag_runs_action(session, admin_client, running_dag_run):
     assert resp.status_code == 200
     assert session.query(TaskInstance).count() == 0  # Deletes associated TIs.
     assert session.query(DagRun).filter(DagRun.id == dag_run_id).count() == 0
+
+
+@pytest.mark.parametrize(
+    "action",
+    ["clear", "set_success", "set_failed", "set_running"],
+    ids=["clear", "success", "failed", "running"],
+)
+def test_set_dag_runs_action_permission_denied(client_dr_without_dag_edit, running_dag_run, action):
+    running_dag_id = running_dag_run.id
+    resp = client_dr_without_dag_edit.post(
+        "/dagrun/action_post",
+        data={"action": action, "rowid": [str(running_dag_id)]},
+        follow_redirects=True,
+    )
+    check_content_in_response(f"Access denied for dag_id {running_dag_run.dag_id}", resp)
diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py
index ce2ffe4..621d3a4 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -16,13 +16,17 @@
 # specific language governing permissions and limitations
 # under the License.
 import urllib.parse
+from typing import List
+from unittest import mock
 
 import pytest
 
-from airflow.models import DagBag, Log
+from airflow.models import DagBag, DagRun, Log, TaskInstance
 from airflow.utils import dates, timezone
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
+from airflow.www import app
+from airflow.www.views import action_has_dag_edit_access
 from tests.test_utils.db import clear_db_runs
 from tests.test_utils.www import check_content_in_response
 
@@ -78,6 +82,11 @@ def dagruns(bash_dag, sub_dag, xcom_dag):
     clear_db_runs()
 
 
+@action_has_dag_edit_access
+def some_view_action_which_requires_dag_edit_access(*args) -> bool:
+    return True
+
+
 def _check_last_log(session, dag_id, event, execution_date):
     logs = (
         session.query(
@@ -150,3 +159,48 @@ def test_calendar(admin_client, dagruns):
     datestr = bash_dagrun.execution_date.date().isoformat()
     expected = rf'{{\"date\":\"{datestr}\",\"state\":\"running\",\"count\":1}}'
     check_content_in_response(expected, resp)
+
+
+@pytest.mark.parametrize(
+    "class_type, no_instances, no_unique_dags",
+    [
+        (None, 0, 0),
+        (TaskInstance, 0, 0),
+        (TaskInstance, 1, 1),
+        (TaskInstance, 10, 1),
+        (TaskInstance, 10, 5),
+        (DagRun, 0, 0),
+        (DagRun, 1, 1),
+        (DagRun, 10, 1),
+        (DagRun, 10, 9),
+    ],
+)
+def test_action_has_dag_edit_access(create_task_instance, class_type, no_instances, no_unique_dags):
+    unique_dag_ids = [f"test_dag_id_{nr}" for nr in range(no_unique_dags)]
+    tis: List[TaskInstance] = [
+        create_task_instance(
+            task_id=f"test_task_instance_{nr}",
+            execution_date=timezone.datetime(2021, 1, 1 + nr),
+            dag_id=unique_dag_ids[nr % len(unique_dag_ids)],
+            run_id=f"test_run_id_{nr}",
+        )
+        for nr in range(no_instances)
+    ]
+    if class_type is None:
+        test_items = None
+    else:
+        test_items = tis if class_type == TaskInstance else [ti.get_dagrun() for ti in tis]
+        test_items = test_items[0] if len(test_items) == 1 else test_items
+
+    with app.create_app(testing=True).app_context():
+        with mock.patch("airflow.www.views.current_app.appbuilder.sm.can_edit_dag") as mocked_can_edit:
+            mocked_can_edit.return_value = True
+            assert not isinstance(test_items, list) or len(test_items) == no_instances
+            assert some_view_action_which_requires_dag_edit_access(None, test_items) is True
+            assert mocked_can_edit.call_count == no_unique_dags
+    clear_db_runs()
+
+
+def test_action_has_dag_edit_access_exception():
+    with pytest.raises(ValueError):
+        some_view_action_which_requires_dag_edit_access(None, "some_incorrect_value")
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 9376ae2..05eca9a 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -27,6 +27,7 @@ from airflow import settings
 from airflow.executors.celery_executor import CeleryExecutor
 from airflow.models import DagBag, DagModel, TaskInstance
 from airflow.models.dagcode import DagCode
+from airflow.security import permissions
 from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
 from airflow.utils import dates, timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -34,9 +35,10 @@ from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.types import DagRunType
 from airflow.www.views import TaskInstanceModelView
+from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_runs
-from tests.test_utils.www import check_content_in_response, check_content_not_in_response
+from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
 
 DEFAULT_DATE = dates.days_ago(2)
 
@@ -73,6 +75,32 @@ def init_dagruns(app, reset_dagruns):
     clear_db_runs()
 
 
+@pytest.fixture(scope="module")
+def client_ti_without_dag_edit(app):
+    create_user(
+        app,
+        username="all_ti_permissions_except_dag_edit",
+        role_name="all_ti_permissions_except_dag_edit",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
+        ],
+    )
+
+    yield client_with_login(
+        app,
+        username="all_ti_permissions_except_dag_edit",
+        password="all_ti_permissions_except_dag_edit",
+    )
+
+    delete_user(app, username="all_ti_permissions_except_dag_edit")  # type: ignore
+    delete_roles(app)
+
+
 @pytest.mark.parametrize(
     "url, contents",
     [
@@ -594,6 +622,35 @@ def _get_appbuilder_pk_string(model_view_cls, instance) -> str:
     return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value)
 
 
+def test_task_instance_delete(session, admin_client, create_task_instance):
+    task_instance_to_delete = create_task_instance(
+        task_id="test_task_instance_delete",
+        execution_date=timezone.utcnow(),
+        state=State.DEFERRED,
+    )
+    composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete)
+    task_id = task_instance_to_delete.task_id
+
+    assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1
+    admin_client.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True)
+    assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 0
+
+
+def test_task_instance_delete_permission_denied(session, client_ti_without_dag_edit, create_task_instance):
+    task_instance_to_delete = create_task_instance(
+        task_id="test_task_instance_delete_permission_denied",
+        execution_date=timezone.utcnow(),
+        state=State.DEFERRED,
+    )
+    composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete)
+    task_id = task_instance_to_delete.task_id
+
+    assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1
+    resp = client_ti_without_dag_edit.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True)
+    assert resp.status_code == 404  # If it doesn't fully succeed it gives a 404.
+    assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1
+
+
 def test_task_instance_clear(session, admin_client):
     task_id = "runme_0"
 
@@ -673,3 +730,27 @@ def test_task_instance_set_state_failure(admin_client, action):
     )
     assert resp.status_code == 200
     check_content_in_response("Failed to set state", resp)
+
+
+@pytest.mark.parametrize(
+    "action",
+    ["clear", "set_success", "set_failed", "set_running"],
+    ids=["clear", "success", "failed", "running"],
+)
+def test_set_task_instance_action_permission_denied(session, client_ti_without_dag_edit, action):
+    task_id = "runme_0"
+
+    # Set the state to success for clearing.
+    ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id)
+    ti_q.update({"state": State.SUCCESS})
+    session.commit()
+
+    # Send a request to clear.
+    rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one())
+    expected_message = f"Access denied for dag_id {ti_q.one().dag_id}"
+    resp = client_ti_without_dag_edit.post(
+        "/taskinstance/action_post",
+        data={"action": action, "rowid": [rowid]},
+        follow_redirects=True,
+    )
+    check_content_in_response(expected_message, resp)