You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:19:50 UTC
[airflow] 05/45: Fix permission issue for dag that has dot in name (#23510)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d7b58db1588474b287669dc58ae2ca72bd35e139
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jun 8 08:47:26 2022 +0100
Fix permission issue for dag that has dot in name (#23510)
The way airflow.security.permissions.resource_name_for_dag determines whether a DAG is a subdag is incorrect.
If a dag_id contains a dot, its permissions are not recorded correctly.
The current solution makes a query every time we check permissions for a DAG that has a dot in its name. It is not ideal, but I think it is better than the other options I considered, such as changing how subdags are named, which would hurt the UX. Another option I considered was making a query during parsing; that is undesirable, and it is avoided here by passing the root DAG id to resource_name_for_dag.
Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit cc35fcaf89eeff3d89e18088c2e68f01f8baad56)
---
airflow/models/dagbag.py | 8 +++--
airflow/security/permissions.py | 19 ++++++------
airflow/www/security.py | 29 ++++++++++++++----
tests/www/test_security.py | 66 +++++++++++++++++++++++++++++++++++------
4 files changed, 95 insertions(+), 27 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index c0ef0941b6..929842fd0d 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -641,6 +641,8 @@ class DagBag(LoggingMixin):
from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag
from airflow.www.fab_security.sqla.models import Action, Permission, Resource
+ root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
+
def needs_perms(dag_id: str) -> bool:
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_ACTIONS:
@@ -655,9 +657,9 @@ class DagBag(LoggingMixin):
return True
return False
- if dag.access_control or needs_perms(dag.dag_id):
- self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id)
+ if dag.access_control or needs_perms(root_dag_id):
+ self.log.debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager
security_manager = ApplessAirflowSecurityManager(session=session)
- security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
+ security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index 2d5c0b9399..2d02c773b4 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -66,14 +66,15 @@ DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit"
DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}
-def resource_name_for_dag(dag_id):
- """Returns the resource name for a DAG id."""
- if dag_id == RESOURCE_DAG:
- return dag_id
+def resource_name_for_dag(root_dag_id: str) -> str:
+ """Returns the resource name for a DAG id.
- if dag_id.startswith(RESOURCE_DAG_PREFIX):
- return dag_id
-
- # To account for SubDags
- root_dag_id = dag_id.split(".")[0]
+ Note that since a sub-DAG should follow the permission of its
+ parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
+ for a subdag. A normal dag should pass the ``DagModel.dag_id``.
+ """
+ if root_dag_id == RESOURCE_DAG:
+ return root_dag_id
+ if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
+ return root_dag_id
return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 42188f0618..de6b0d646e 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -200,6 +200,16 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
view.datamodel = CustomSQLAInterface(view.datamodel.obj)
self.perms = None
+ def _get_root_dag_id(self, dag_id):
+ if '.' in dag_id:
+ dm = (
+ self.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
+ .filter(DagModel.dag_id == dag_id)
+ .first()
+ )
+ return dm.root_dag_id or dm.dag_id
+ return dag_id
+
def init_role(self, role_name, perms):
"""
Initialize the role with actions and related resources.
@@ -340,7 +350,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
"""Checks if user has read or write access to some dags."""
if dag_id and dag_id != '~':
- return self.has_access(action, permissions.resource_name_for_dag(dag_id))
+ root_dag_id = self._get_root_dag_id(dag_id)
+ return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
user = g.user
if action == permissions.ACTION_CAN_READ:
@@ -349,17 +360,20 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def can_read_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG read access."""
- dag_resource_name = permissions.resource_name_for_dag(dag_id)
+ root_dag_id = self._get_root_dag_id(dag_id)
+ dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
def can_edit_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
- dag_resource_name = permissions.resource_name_for_dag(dag_id)
+ root_dag_id = self._get_root_dag_id(dag_id)
+ dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
def can_delete_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG delete access."""
- dag_resource_name = permissions.resource_name_for_dag(dag_id)
+ root_dag_id = self._get_root_dag_id(dag_id)
+ dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user)
def prefixed_dag_id(self, dag_id):
@@ -370,7 +384,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
DeprecationWarning,
stacklevel=2,
)
- return permissions.resource_name_for_dag(dag_id)
+ root_dag_id = self._get_root_dag_id(dag_id)
+ return permissions.resource_name_for_dag(root_dag_id)
def is_dag_resource(self, resource_name):
"""Determines if a resource belongs to a DAG or all DAGs."""
@@ -530,7 +545,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
dags = dagbag.dags.values()
for dag in dags:
- dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+ root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
+ dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
for action_name in self.DAG_ACTIONS:
if (action_name, dag_resource_name) not in perms:
self._merge_perm(action_name, dag_resource_name)
@@ -615,6 +631,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
:param access_control: a dict where each key is a rolename and
each value is a set() of action names (e.g. {'can_read'})
"""
+
dag_resource_name = permissions.resource_name_for_dag(dag_id)
def _get_or_create_dag_permission(action_name: str) -> Optional[Permission]:
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 8c90062600..7b8541ca81 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -192,7 +192,8 @@ def sample_dags(security_manager):
@pytest.fixture(scope="module")
def has_dag_perm(security_manager):
def _has_dag_perm(perm, dag_id, user):
- return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
+ root_dag_id = security_manager._get_root_dag_id(dag_id)
+ return security_manager.has_access(perm, permissions.resource_name_for_dag(root_dag_id), user)
return _has_dag_perm
@@ -351,7 +352,7 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
user.roles = security_manager.get_user_roles(user)
assert user.roles == {security_manager.get_public_role()}
- test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
+ test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"]
for dag_id in test_dag_ids:
with _create_dag_model_context(dag_id, session, security_manager):
@@ -588,7 +589,8 @@ def test_access_control_with_invalid_permission(app, security_manager):
for action in invalid_actions:
with pytest.raises(AirflowException) as ctx:
security_manager._sync_dag_view_permissions(
- 'access_control_test', access_control={rolename: {action}}
+ 'access_control_test',
+ access_control={rolename: {action}},
)
assert "invalid permissions" in str(ctx.value)
@@ -728,11 +730,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch,
assert ('can_edit', dag_resource_name) in all_perms
security_manager._sync_dag_view_permissions.assert_called_once_with(
- permissions.resource_name_for_dag('has_access_control'), access_control
+ permissions.resource_name_for_dag('has_access_control'),
+ access_control,
)
del dagbag_mock.dags["has_access_control"]
- with assert_queries_count(1): # one query to get all perms; dagbag is mocked
+ with assert_queries_count(2): # two query to get all perms; dagbag is mocked
+ # The extra query happens at permission check
security_manager.create_dag_specific_permissions()
@@ -782,10 +786,12 @@ def test_prefixed_dag_id_is_deprecated(security_manager):
security_manager.prefixed_dag_id("hello")
-def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms):
+def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session):
username = 'dag_permission_user'
role_name = 'dag_permission_role'
parent_dag_name = "parent_dag"
+ subdag_name = parent_dag_name + ".subdag"
+ subsubdag_name = parent_dag_name + ".subdag.subsubdag"
with app.app_context():
mock_roles = [
{
@@ -801,15 +807,57 @@ def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_
username=username,
role_name=role_name,
) as user:
+ dag1 = DagModel(dag_id=parent_dag_name)
+ dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name)
+ dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name)
+ session.add_all([dag1, dag2, dag3])
+ session.commit()
security_manager.bulk_sync_roles(mock_roles)
- security_manager._sync_dag_view_permissions(
- parent_dag_name, access_control={role_name: READ_WRITE}
- )
+ for dag in [dag1, dag2, dag3]:
+ security_manager._sync_dag_view_permissions(
+ parent_dag_name, access_control={role_name: READ_WRITE}
+ )
+
assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
assert_user_has_dag_perms(
perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user
)
+ session.query(DagModel).delete()
+
+
+def test_permissions_work_for_dags_with_dot_in_dagname(
+ app, security_manager, assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session
+):
+ username = 'dag_permission_user'
+ role_name = 'dag_permission_role'
+ dag_id = "dag_id_1"
+ dag_id_2 = "dag_id_1.with_dot"
+ with app.app_context():
+ mock_roles = [
+ {
+ 'role': role_name,
+ 'perms': [
+ (permissions.ACTION_CAN_READ, f"DAG:{dag_id}"),
+ (permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"),
+ ],
+ }
+ ]
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ ) as user:
+ dag1 = DagModel(dag_id=dag_id)
+ dag2 = DagModel(dag_id=dag_id_2)
+ session.add_all([dag1, dag2])
+ session.commit()
+ security_manager.bulk_sync_roles(mock_roles)
+ security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE})
+ security_manager.sync_perm_for_dag(dag2.dag_id, access_control={role_name: READ_WRITE})
+ assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
+ assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user)
+ session.query(DagModel).delete()
def test_fab_models_use_airflow_base_meta():