You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2021/09/21 21:04:57 UTC
[airflow] branch main updated: Require can_edit on DAG privileges
to modify TaskInstances and DagRuns (#16634)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f74d0ab Require can_edit on DAG privileges to modify TaskInstances and DagRuns (#16634)
f74d0ab is described below
commit f74d0ab38e00b83b49e20d410736e2a8c42de095
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Tue Sep 21 23:04:38 2021 +0200
Require can_edit on DAG privileges to modify TaskInstances and DagRuns (#16634)
---
INTHEWILD.md | 1 +
.../api_connexion/endpoints/dag_run_endpoint.py | 2 +-
.../endpoints/task_instance_endpoint.py | 4 +-
airflow/www/views.py | 134 +++++++++++++++------
.../endpoints/test_dag_run_endpoint.py | 23 +++-
.../endpoints/test_task_instance_endpoint.py | 58 +++++++--
tests/www/views/test_views_acl.py | 2 +-
tests/www/views/test_views_dagrun.py | 77 +++++++++++-
tests/www/views/test_views_decorators.py | 56 ++++++++-
tests/www/views/test_views_tasks.py | 83 ++++++++++++-
10 files changed, 386 insertions(+), 54 deletions(-)
diff --git a/INTHEWILD.md b/INTHEWILD.md
index c1a5b32..588db42 100644
--- a/INTHEWILD.md
+++ b/INTHEWILD.md
@@ -33,6 +33,7 @@ Currently, **officially** using Airflow:
1. [Accenture](https://www.accenture.com/au-en) [[@nijanthanvijayakumar](https://github.com/nijanthanvijayakumar)]
1. [AdBOOST](https://www.adboost.sk) [[AdBOOST](https://github.com/AdBOOST)]
1. [Adobe](https://www.adobe.com/) [[@mishikaSingh](https://github.com/mishikaSingh), [@ramandumcs](https://github.com/ramandumcs), [@vardancse](https://github.com/vardancse)]
+1. [Adyen](https://www.adyen.com/) [[@jorricks](https://github.com/jorricks)]
1. [Agari](https://github.com/agaridata) [[@r39132](https://github.com/r39132)]
1. [Agoda](https://agoda.com) [[@akki](https://github.com/akki)]
1. [Airbnb](https://airbnb.io/) [[@mistercrunch](https://github.com/mistercrunch), [@artwr](https://github.com/artwr)]
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 29e0efc..9327804 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -40,7 +40,7 @@ from airflow.utils.types import DagRunType
@security.requires_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
]
)
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 361d29e..cfa27ac 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -227,7 +227,7 @@ def get_task_instances_batch(session=None):
@security.requires_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
@@ -261,7 +261,7 @@ def post_clear_task_instances(dag_id: str, session=None):
@security.requires_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
diff --git a/airflow/www/views.py b/airflow/www/views.py
index ee09908..b2c8712 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -27,9 +27,10 @@ import sys
import traceback
from collections import defaultdict
from datetime import timedelta
+from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
-from typing import Any, Iterable, List, Optional, Tuple
+from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import parse_qsl, unquote, urlencode, urlparse
import lazy_object_proxy
@@ -1515,7 +1516,7 @@ class Airflow(AirflowBaseView):
@expose('/run', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
]
)
@@ -1793,7 +1794,7 @@ class Airflow(AirflowBaseView):
@expose('/clear', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
]
)
@@ -1837,7 +1838,7 @@ class Airflow(AirflowBaseView):
@expose('/dagrun_clear', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
]
)
@@ -1859,7 +1860,7 @@ class Airflow(AirflowBaseView):
@expose('/blocked', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
]
)
@@ -1966,7 +1967,7 @@ class Airflow(AirflowBaseView):
@expose('/dagrun_failed', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
@@ -1982,7 +1983,7 @@ class Airflow(AirflowBaseView):
@expose('/dagrun_success', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
]
)
@@ -2026,7 +2027,7 @@ class Airflow(AirflowBaseView):
@expose('/confirm', methods=['GET'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
@@ -2099,7 +2100,7 @@ class Airflow(AirflowBaseView):
@expose('/failed', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
@@ -2132,7 +2133,7 @@ class Airflow(AirflowBaseView):
@expose('/success', methods=['POST'])
@auth.has_access(
[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
]
)
@@ -3140,6 +3141,14 @@ class DagFilter(BaseFilter):
return query.filter(self.model.dag_id.in_(filter_dag_ids))
+class DagEditFilter(BaseFilter):
+ """Filter rows to the DAG ids returned by ``get_editable_dag_ids`` for the current user."""
+
+ def apply(self, query, func): # pylint: disable=redefined-outer-name,unused-argument
+ # ``func`` is not used (see the pylint pragma); the filter is driven by ``g.user`` only.
+ filter_dag_ids = current_app.appbuilder.sm.get_editable_dag_ids(g.user)
+ return query.filter(self.model.dag_id.in_(filter_dag_ids))
+
+
class AirflowModelView(ModelView):
"""Airflow Mode View."""
@@ -3149,6 +3158,71 @@ class AirflowModelView(ModelView):
CustomSQLAInterface = wwwutils.CustomSQLAInterface
+class AirflowPrivilegeVerifierModelView(AirflowModelView):
+ """
+ This ModelView prevents passing primary keys of objects that relate to DAGs you shouldn't be
+ able to edit. This only holds for the add, update and delete operations.
+ You will still need to use the `action_has_dag_edit_access()` decorator for actions.
+ """
+
+ @staticmethod
+ def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
+ """
+ Validates whether the user has 'can_edit' access for this specific DAG.
+
+ Raises AirflowException when ``can_edit_dag(item.dag_id)`` is falsy.
+ """
+ if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
+ raise AirflowException(f"Access denied for dag_id {item.dag_id}")
+
+ def pre_add(self, item: Union[DagRun, TaskInstance]):
+ """Validate DAG edit access for ``item`` (add operation)."""
+ self.validate_dag_edit_access(item)
+
+ def pre_update(self, item: Union[DagRun, TaskInstance]):
+ """Validate DAG edit access for ``item`` (update operation)."""
+ self.validate_dag_edit_access(item)
+
+ def pre_delete(self, item: Union[DagRun, TaskInstance]):
+ """Validate DAG edit access for ``item`` (delete operation)."""
+ self.validate_dag_edit_access(item)
+
+ def post_add_redirect(self): # Required to prevent redirect loop
+ """Redirect to the view's default URL."""
+ return redirect(self.get_default_url())
+
+ def post_edit_redirect(self): # Required to prevent redirect loop
+ """Redirect to the view's default URL."""
+ return redirect(self.get_default_url())
+
+ def post_delete_redirect(self): # Required to prevent redirect loop
+ """Redirect to the view's default URL."""
+ return redirect(self.get_default_url())
+
+
+def action_has_dag_edit_access(action_func: Callable) -> Callable:
+    """
+    Decorator for actions which verifies you have DAG edit access on the given tis/drs.
+
+    The wrapped action only runs when ``can_edit_dag`` is truthy for every distinct ``dag_id``
+    found on ``items``. Otherwise an "Access denied" message is flashed, a warning is logged
+    and a redirect to the view's default URL is returned instead of calling the action.
+
+    :param action_func: view action called as ``action_func(self, items, *args, **kwargs)``.
+    :return: the wrapped action.
+    :raises ValueError: when the wrapped action is called with ``items`` that is not ``None``,
+        a list, a TaskInstance or a DagRun.
+    """
+
+    @wraps(action_func)
+    def check_dag_edit_acl_for_actions(
+        self,
+        items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]],
+        *args,
+        **kwargs,
+    ) -> Any:
+        # Collect the distinct dag_ids so each DAG is checked exactly once.
+        if items is None:
+            dag_ids: Set[str] = set()
+        elif isinstance(items, list):
+            dag_ids = {item.dag_id for item in items if item is not None}
+        elif isinstance(items, (TaskInstance, DagRun)):
+            dag_ids = {items.dag_id}
+        else:
+            raise ValueError(
+                "Was expecting the first argument of the action to be of type "
+                "Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun]]. "
+                f"Was of type: {type(items)}"
+            )
+
+        for dag_id in dag_ids:
+            if not current_app.appbuilder.sm.can_edit_dag(dag_id):
+                # Stop at the first DAG the user may not edit; the action is not executed at all.
+                flash(f"Access denied for dag_id {dag_id}", "danger")
+                logging.warning("User %s tried to modify %s without having access.", g.user.username, dag_id)
+                return redirect(self.get_default_url())
+        return action_func(self, items, *args, **kwargs)
+
+    return check_dag_edit_acl_for_actions
+
+
class SlaMissModelView(AirflowModelView):
"""View to show SlaMiss table"""
@@ -3812,7 +3886,7 @@ class JobModelView(AirflowModelView):
}
-class DagRunModelView(AirflowModelView):
+class DagRunModelView(AirflowPrivilegeVerifierModelView):
"""View to show records from DagRun table"""
route_base = '/dagrun'
@@ -3861,7 +3935,7 @@ class DagRunModelView(AirflowModelView):
base_order = ('execution_date', 'desc')
- base_filters = [['dag_id', DagFilter, lambda: []]]
+ base_filters = [['dag_id', DagEditFilter, lambda: []]]
edit_form = DagRunEditForm
@@ -3876,6 +3950,7 @@ class DagRunModelView(AirflowModelView):
}
@action('muldelete', "Delete", "Are you sure you want to delete selected records?", single=False)
+ @action_has_dag_edit_access
@provide_session
def action_muldelete(self, items, session=None):
"""Multiple delete."""
@@ -3884,6 +3959,7 @@ class DagRunModelView(AirflowModelView):
return redirect(self.get_redirect())
@action('set_running', "Set state to 'running'", '', single=False)
+ @action_has_dag_edit_access
@provide_session
def action_set_running(self, drs, session=None):
"""Set state to running."""
@@ -3906,6 +3982,7 @@ class DagRunModelView(AirflowModelView):
"All running task instances would also be marked as failed, are you sure?",
single=False,
)
+ @action_has_dag_edit_access
@provide_session
def action_set_failed(self, drs, session=None):
"""Set state to failed."""
@@ -3932,6 +4009,7 @@ class DagRunModelView(AirflowModelView):
"All task instances would also be marked as success, are you sure?",
single=False,
)
+ @action_has_dag_edit_access
@provide_session
def action_set_success(self, drs, session=None):
"""Set state to success."""
@@ -3953,6 +4031,7 @@ class DagRunModelView(AirflowModelView):
return redirect(self.get_default_url())
@action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
+ @action_has_dag_edit_access
@provide_session
def action_clear(self, drs, session=None):
"""Clears the state."""
@@ -4114,7 +4193,7 @@ class TriggerModelView(AirflowModelView):
}
-class TaskInstanceModelView(AirflowModelView):
+class TaskInstanceModelView(AirflowPrivilegeVerifierModelView):
"""View to show records from TaskInstance table"""
route_base = '/taskinstance'
@@ -4198,7 +4277,7 @@ class TaskInstanceModelView(AirflowModelView):
base_order = ('job_id', 'asc')
- base_filters = [['dag_id', DagFilter, lambda: []]]
+ base_filters = [['dag_id', DagEditFilter, lambda: []]]
def log_url_formatter(self):
"""Formats log URL."""
@@ -4229,7 +4308,6 @@ class TaskInstanceModelView(AirflowModelView):
'duration': duration_f,
}
- @provide_session
@action(
'clear',
lazy_gettext('Clear'),
@@ -4239,6 +4317,8 @@ class TaskInstanceModelView(AirflowModelView):
),
single=False,
)
+ @action_has_dag_edit_access
+ @provide_session
def action_clear(self, task_instances, session=None):
"""Clears the action."""
try:
@@ -4271,11 +4351,7 @@ class TaskInstanceModelView(AirflowModelView):
flash('Failed to set state', 'error')
@action('set_running', "Set state to 'running'", '', single=False)
- @auth.has_access(
- [
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
- ]
- )
+ @action_has_dag_edit_access
def action_set_running(self, tis):
"""Set state to 'running'"""
self.set_task_instance_state(tis, State.RUNNING)
@@ -4283,11 +4359,7 @@ class TaskInstanceModelView(AirflowModelView):
return redirect(self.get_redirect())
@action('set_failed', "Set state to 'failed'", '', single=False)
- @auth.has_access(
- [
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
- ]
- )
+ @action_has_dag_edit_access
def action_set_failed(self, tis):
"""Set state to 'failed'"""
self.set_task_instance_state(tis, State.FAILED)
@@ -4295,11 +4367,7 @@ class TaskInstanceModelView(AirflowModelView):
return redirect(self.get_redirect())
@action('set_success', "Set state to 'success'", '', single=False)
- @auth.has_access(
- [
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
- ]
- )
+ @action_has_dag_edit_access
def action_set_success(self, tis):
"""Set state to 'success'"""
self.set_task_instance_state(tis, State.SUCCESS)
@@ -4307,11 +4375,7 @@ class TaskInstanceModelView(AirflowModelView):
return redirect(self.get_redirect())
@action('set_retry', "Set state to 'up_for_retry'", '', single=False)
- @auth.has_access(
- [
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
- ]
- )
+ @action_has_dag_edit_access
def action_set_retry(self, tis):
"""Set state to 'up_for_retry'"""
self.set_task_instance_state(tis, State.UP_FOR_RETRY)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 47d89cb..69c06b7 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -28,7 +28,7 @@ from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session, provide_session
from airflow.utils.types import DagRunType
-from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
+from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs
@@ -52,6 +52,18 @@ def configured_app(minimal_app_for_api):
)
create_user(
app, # type: ignore
+ username="test_dag_view_only",
+ role_name="TestViewDags",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+ ],
+ )
+ create_user(
+ app, # type: ignore
username="test_view_dags",
role_name="TestViewDags",
permissions=[
@@ -74,9 +86,11 @@ def configured_app(minimal_app_for_api):
yield app
delete_user(app, username="test") # type: ignore
+ delete_user(app, username="test_dag_view_only") # type: ignore
delete_user(app, username="test_view_dags") # type: ignore
delete_user(app, username="test_granular_permissions") # type: ignore
delete_user(app, username="test_no_permissions") # type: ignore
+ delete_roles(app)
class TestDagRunEndpoint:
@@ -1145,7 +1159,10 @@ class TestPostDagRun(TestDagRunEndpoint):
assert_401(response)
- def test_should_raises_403_unauthorized(self):
+ @parameterized.expand(
+ ["test_dag_view_only", "test_view_dags", "test_granular_permissions", "test_no_permissions"]
+ )
+ def test_should_raises_403_unauthorized(self, username):
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
@@ -1153,7 +1170,7 @@ class TestPostDagRun(TestDagRunEndpoint):
"dag_run_id": "TEST_DAG_RUN_ID_1",
"execution_date": self.default_time,
},
- environ_overrides={'REMOTE_USER': "test_view_dags"},
+ environ_overrides={'REMOTE_USER': username},
)
assert response.status_code == 403
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 7696b76..8ed4cda 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -28,7 +28,7 @@ from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.utils.types import DagRunType
-from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
+from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_roles, delete_user
from tests.test_utils.db import clear_db_runs, clear_db_sla_miss
DEFAULT_DATETIME_1 = datetime(2020, 1, 1)
@@ -45,16 +45,43 @@ def configured_app(minimal_app_for_api):
role_name="Test",
permissions=[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
],
)
+ create_user(
+ app, # type: ignore
+ username="test_dag_read_only",
+ role_name="TestDagReadOnly",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+ ],
+ )
+ create_user(
+ app, # type: ignore
+ username="test_task_read_only",
+ role_name="TestTaskReadOnly",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+ ],
+ )
create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore
yield app
delete_user(app, username="test") # type: ignore
+ delete_user(app, username="test_dag_read_only") # type: ignore
+ delete_user(app, username="test_task_read_only") # type: ignore
+ delete_user(app, username="test_no_permissions") # type: ignore
+ delete_roles(app)
class TestTaskInstanceEndpoint:
@@ -133,7 +160,9 @@ class TestTaskInstanceEndpoint:
class TestGetTaskInstance(TestTaskInstanceEndpoint):
- def test_should_respond_200(self, session):
+ @parameterized.expand(["test", "test_dag_read_only", "test_task_read_only"])
+ @provide_session
+ def test_should_respond_200(self, username, session):
self.create_task_instances(session)
# Update ti and set operator to None to
# test that operator field is nullable.
@@ -144,7 +173,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
session.commit()
response = self.client.get(
"/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context",
- environ_overrides={"REMOTE_USER": "test"},
+ environ_overrides={"REMOTE_USER": username},
)
assert response.status_code == 200
assert response.json == {
@@ -451,6 +480,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
True,
{"queue": ["test_queue_1", "test_queue_2"]},
2,
+ "test",
),
(
"test pool filter",
@@ -462,6 +492,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
True,
{"pool": ["test_pool_1", "test_pool_2"]},
2,
+ "test_dag_read_only",
),
(
"test state filter",
@@ -473,6 +504,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
False,
{"state": ["running", "queued"]},
2,
+ "test_task_read_only",
),
(
"test duration filter",
@@ -484,6 +516,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
True,
{"duration_gte": 100, "duration_lte": 200},
3,
+ "test",
),
(
"test end date filter",
@@ -498,6 +531,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
"end_date_lte": DEFAULT_DATETIME_STR_2,
},
2,
+ "test_task_read_only",
),
(
"test start date filter",
@@ -512,6 +546,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
"start_date_lte": DEFAULT_DATETIME_STR_2,
},
2,
+ "test_dag_read_only",
),
(
"with execution date filter",
@@ -529,11 +564,14 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
"execution_date_lte": (DEFAULT_DATETIME_1 + dt.timedelta(days=2)),
},
3,
+ "test",
),
]
)
@provide_session
- def test_should_respond_200(self, _, task_instances, update_extras, payload, expected_ti_count, session):
+ def test_should_respond_200(
+ self, _, task_instances, update_extras, payload, expected_ti_count, username, session
+ ):
self.create_task_instances(
session,
update_extras=update_extras,
@@ -541,7 +579,7 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
)
response = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
- environ_overrides={"REMOTE_USER": "test"},
+ environ_overrides={"REMOTE_USER": username},
json=payload,
)
assert response.status_code == 200, response.json
@@ -943,10 +981,11 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
)
assert_401(response)
- def test_should_raise_403_forbidden(self):
+ @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"])
+ def test_should_raise_403_forbidden(self, username: str):
response = self.client.post(
"/api/v1/dags/example_python_operator/clearTaskInstances",
- environ_overrides={'REMOTE_USER': "test_no_permissions"},
+ environ_overrides={'REMOTE_USER': username},
json={
"dry_run": False,
"reset_dag_runs": True,
@@ -1055,10 +1094,11 @@ class TestPostSetTaskInstanceState(TestTaskInstanceEndpoint):
)
assert_401(response)
- def test_should_raise_403_forbidden(self):
+ @parameterized.expand(["test_no_permissions", "test_dag_read_only", "test_task_read_only"])
+ def test_should_raise_403_forbidden(self, username):
response = self.client.post(
"/api/v1/dags/example_python_operator/updateTaskInstancesState",
- environ_overrides={'REMOTE_USER': "test_no_permissions"},
+ environ_overrides={'REMOTE_USER': username},
json={
"dry_run": True,
"task_id": "print_the_context",
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 82b77c0..de5a281 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -697,7 +697,7 @@ def user_all_dags_edit_tis(acl_app):
username="user_all_dags_edit_tis",
role_name="role_all_dags_edit_tis",
permissions=[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
],
diff --git a/tests/www/views/test_views_dagrun.py b/tests/www/views/test_views_dagrun.py
index 919adff..ebd4f30 100644
--- a/tests/www/views/test_views_dagrun.py
+++ b/tests/www/views/test_views_dagrun.py
@@ -16,11 +16,42 @@
# specific language governing permissions and limitations
# under the License.
import pytest
+import werkzeug
from airflow.models import DagBag, DagRun, TaskInstance
+from airflow.security import permissions
from airflow.utils import timezone
from airflow.utils.session import create_session
-from tests.test_utils.www import check_content_in_response
+from airflow.www.views import DagRunModelView
+from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user
+from tests.test_utils.www import check_content_in_response, client_with_login
+from tests.www.views.test_views_tasks import _get_appbuilder_pk_string
+
+
+@pytest.fixture(scope="module")
+def client_dr_without_dag_edit(app):
+ """Yield a logged-in client whose role has every DagRun permission but only can_read on DAGs."""
+ create_user(
+ app,
+ username="all_dr_permissions_except_dag_edit",
+ role_name="all_dr_permissions_except_dag_edit",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
+ ],
+ )
+
+ yield client_with_login(
+ app,
+ username="all_dr_permissions_except_dag_edit",
+ password="all_dr_permissions_except_dag_edit",
+ )
+
+ # Teardown: remove the user and the roles created for this module.
+ delete_user(app, username="all_dr_permissions_except_dag_edit") # type: ignore
+ delete_roles(app)
@pytest.fixture(scope="module", autouse=True)
@@ -42,6 +73,19 @@ def reset_dagrun():
session.query(TaskInstance).delete()
+def test_create_dagrun_permission_denied(session, client_dr_without_dag_edit):
+ """Adding a DagRun via /dagrun/add with a client lacking DAG edit access must not succeed."""
+ data = {
+ "state": "running",
+ "dag_id": "example_bash_operator",
+ "execution_date": "2018-07-06 05:06:03",
+ "run_id": "test_list_dagrun_includes_conf",
+ "conf": '{"include": "me"}',
+ }
+
+ # With follow_redirects=True the denied request is expected to surface as a ClientRedirectError.
+ with pytest.raises(werkzeug.test.ClientRedirectError):
+ client_dr_without_dag_edit.post('/dagrun/add', data=data, follow_redirects=True)
+
+
@pytest.fixture()
def running_dag_run(session):
dag = DagBag().get_dag("example_bash_operator")
@@ -62,6 +106,22 @@ def running_dag_run(session):
return dr
+def test_delete_dagrun(session, admin_client, running_dag_run):
+ """The admin client can delete a DagRun through /dagrun/delete/<pk>."""
+ composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run)
+ assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1
+ admin_client.post(f"/dagrun/delete/{composite_key}", follow_redirects=True)
+ assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 0
+
+
+def test_delete_dagrun_permission_denied(session, client_dr_without_dag_edit, running_dag_run):
+ """A client without DAG edit access gets a 404 and the DagRun row is left in place."""
+ composite_key = _get_appbuilder_pk_string(DagRunModelView, running_dag_run)
+
+ assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1
+ resp = client_dr_without_dag_edit.post(f"/dagrun/delete/{composite_key}", follow_redirects=True)
+ assert resp.status_code == 404 # If it doesn't fully succeed it gives a 404.
+ assert session.query(DagRun).filter(DagRun.dag_id == running_dag_run.dag_id).count() == 1
+
+
@pytest.mark.parametrize(
"action, expected_ti_states, expected_message",
[
@@ -134,3 +194,18 @@ def test_muldelete_dag_runs_action(session, admin_client, running_dag_run):
assert resp.status_code == 200
assert session.query(TaskInstance).count() == 0 # Deletes associated TIs.
assert session.query(DagRun).filter(DagRun.id == dag_run_id).count() == 0
+
+
+@pytest.mark.parametrize(
+ "action",
+ ["clear", "set_success", "set_failed", "set_running"],
+ ids=["clear", "success", "failed", "running"],
+)
+def test_set_dag_runs_action_permission_denied(client_dr_without_dag_edit, running_dag_run, action):
+ """Each listed DagRun action shows "Access denied" for a client without DAG edit access."""
+ # Despite its name this holds ``running_dag_run.id``; it is posted as the rowid below.
+ running_dag_id = running_dag_run.id
+ resp = client_dr_without_dag_edit.post(
+ "/dagrun/action_post",
+ data={"action": action, "rowid": [str(running_dag_id)]},
+ follow_redirects=True,
+ )
+ check_content_in_response(f"Access denied for dag_id {running_dag_run.dag_id}", resp)
diff --git a/tests/www/views/test_views_decorators.py b/tests/www/views/test_views_decorators.py
index ce2ffe4..621d3a4 100644
--- a/tests/www/views/test_views_decorators.py
+++ b/tests/www/views/test_views_decorators.py
@@ -16,13 +16,17 @@
# specific language governing permissions and limitations
# under the License.
import urllib.parse
+from typing import List
+from unittest import mock
import pytest
-from airflow.models import DagBag, Log
+from airflow.models import DagBag, DagRun, Log, TaskInstance
from airflow.utils import dates, timezone
from airflow.utils.state import State
from airflow.utils.types import DagRunType
+from airflow.www import app
+from airflow.www.views import action_has_dag_edit_access
from tests.test_utils.db import clear_db_runs
from tests.test_utils.www import check_content_in_response
@@ -78,6 +82,11 @@ def dagruns(bash_dag, sub_dag, xcom_dag):
clear_db_runs()
+@action_has_dag_edit_access
+def some_view_action_which_requires_dag_edit_access(*args) -> bool:
+ # Stand-in action for the decorator tests: returns True whenever it is actually invoked.
+ return True
+
+
def _check_last_log(session, dag_id, event, execution_date):
logs = (
session.query(
@@ -150,3 +159,48 @@ def test_calendar(admin_client, dagruns):
datestr = bash_dagrun.execution_date.date().isoformat()
expected = rf'{{\"date\":\"{datestr}\",\"state\":\"running\",\"count\":1}}'
check_content_in_response(expected, resp)
+
+
+@pytest.mark.parametrize(
+ "class_type, no_instances, no_unique_dags",
+ [
+ (None, 0, 0),
+ (TaskInstance, 0, 0),
+ (TaskInstance, 1, 1),
+ (TaskInstance, 10, 1),
+ (TaskInstance, 10, 5),
+ (DagRun, 0, 0),
+ (DagRun, 1, 1),
+ (DagRun, 10, 1),
+ (DagRun, 10, 9),
+ ],
+)
+def test_action_has_dag_edit_access(create_task_instance, class_type, no_instances, no_unique_dags):
+ """The decorator calls ``can_edit_dag`` once per distinct dag_id and then runs the action."""
+ unique_dag_ids = [f"test_dag_id_{nr}" for nr in range(no_unique_dags)]
+ # Instances are assigned to the dag ids round-robin via ``nr % len(unique_dag_ids)``.
+ tis: List[TaskInstance] = [
+ create_task_instance(
+ task_id=f"test_task_instance_{nr}",
+ execution_date=timezone.datetime(2021, 1, 1 + nr),
+ dag_id=unique_dag_ids[nr % len(unique_dag_ids)],
+ run_id=f"test_run_id_{nr}",
+ )
+ for nr in range(no_instances)
+ ]
+ if class_type is None:
+ test_items = None
+ else:
+ test_items = tis if class_type == TaskInstance else [ti.get_dagrun() for ti in tis]
+ # A single item is passed bare rather than wrapped in a list.
+ test_items = test_items[0] if len(test_items) == 1 else test_items
+
+ with app.create_app(testing=True).app_context():
+ with mock.patch("airflow.www.views.current_app.appbuilder.sm.can_edit_dag") as mocked_can_edit:
+ mocked_can_edit.return_value = True
+ assert not isinstance(test_items, list) or len(test_items) == no_instances
+ assert some_view_action_which_requires_dag_edit_access(None, test_items) is True
+ assert mocked_can_edit.call_count == no_unique_dags
+ clear_db_runs()
+
+
+def test_action_has_dag_edit_access_exception():
+ """Passing a str as the items argument of a decorated action raises ValueError."""
+ with pytest.raises(ValueError):
+ some_view_action_which_requires_dag_edit_access(None, "some_incorrect_value")
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index 9376ae2..05eca9a 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -27,6 +27,7 @@ from airflow import settings
from airflow.executors.celery_executor import CeleryExecutor
from airflow.models import DagBag, DagModel, TaskInstance
from airflow.models.dagcode import DagCode
+from airflow.security import permissions
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES
from airflow.utils import dates, timezone
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -34,9 +35,10 @@ from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from airflow.www.views import TaskInstanceModelView
+from tests.test_utils.api_connexion_utils import create_user, delete_roles, delete_user
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
-from tests.test_utils.www import check_content_in_response, check_content_not_in_response
+from tests.test_utils.www import check_content_in_response, check_content_not_in_response, client_with_login
DEFAULT_DATE = dates.days_ago(2)
@@ -73,6 +75,32 @@ def init_dagruns(app, reset_dagruns):
clear_db_runs()
+@pytest.fixture(scope="module")
+def client_ti_without_dag_edit(app):
+ """Yield a logged-in client whose role has every TaskInstance permission but only can_read on DAGs."""
+ create_user(
+ app,
+ username="all_ti_permissions_except_dag_edit",
+ role_name="all_ti_permissions_except_dag_edit",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
+ ],
+ )
+
+ yield client_with_login(
+ app,
+ username="all_ti_permissions_except_dag_edit",
+ password="all_ti_permissions_except_dag_edit",
+ )
+
+ # Teardown: remove the user and the roles created for this module.
+ delete_user(app, username="all_ti_permissions_except_dag_edit") # type: ignore
+ delete_roles(app)
+
+
@pytest.mark.parametrize(
"url, contents",
[
@@ -594,6 +622,35 @@ def _get_appbuilder_pk_string(model_view_cls, instance) -> str:
return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value)
+def test_task_instance_delete(session, admin_client, create_task_instance):
+ """The admin client can delete a TaskInstance through /taskinstance/delete/<pk>."""
+ task_instance_to_delete = create_task_instance(
+ task_id="test_task_instance_delete",
+ execution_date=timezone.utcnow(),
+ state=State.DEFERRED,
+ )
+ composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete)
+ task_id = task_instance_to_delete.task_id
+
+ # Exactly one row with this task_id before the request, none after it.
+ assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1
+ admin_client.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True)
+ assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 0
+
+
+def test_task_instance_delete_permission_denied(session, client_ti_without_dag_edit, create_task_instance):
+ """A client without DAG edit access gets a 404 and the TaskInstance row is left in place."""
+ task_instance_to_delete = create_task_instance(
+ task_id="test_task_instance_delete_permission_denied",
+ execution_date=timezone.utcnow(),
+ state=State.DEFERRED,
+ )
+ composite_key = _get_appbuilder_pk_string(TaskInstanceModelView, task_instance_to_delete)
+ task_id = task_instance_to_delete.task_id
+
+ assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1
+ resp = client_ti_without_dag_edit.post(f"/taskinstance/delete/{composite_key}", follow_redirects=True)
+ assert resp.status_code == 404 # If it doesn't fully succeed it gives a 404.
+ assert session.query(TaskInstance).filter(TaskInstance.task_id == task_id).count() == 1
+
+
def test_task_instance_clear(session, admin_client):
task_id = "runme_0"
@@ -673,3 +730,27 @@ def test_task_instance_set_state_failure(admin_client, action):
)
assert resp.status_code == 200
check_content_in_response("Failed to set state", resp)
+
+
+@pytest.mark.parametrize(
+ "action",
+ ["clear", "set_success", "set_failed", "set_running"],
+ ids=["clear", "success", "failed", "running"],
+)
+def test_set_task_instance_action_permission_denied(session, client_ti_without_dag_edit, action):
+ """Each listed TaskInstance action shows "Access denied" for a client without DAG edit access."""
+ task_id = "runme_0"
+
+ # Put the task instance(s) with this task_id into the success state before posting the action.
+ ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id)
+ ti_q.update({"state": State.SUCCESS})
+ session.commit()
+
+ # Post the parametrized action for that task instance and expect the denial message.
+ rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one())
+ expected_message = f"Access denied for dag_id {ti_q.one().dag_id}"
+ resp = client_ti_without_dag_edit.post(
+ "/taskinstance/action_post",
+ data={"action": action, "rowid": [rowid]},
+ follow_redirects=True,
+ )
+ check_content_in_response(expected_message, resp)