You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:19:50 UTC

[airflow] 05/45: Fix permission issue for dag that has dot in name (#23510)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d7b58db1588474b287669dc58ae2ca72bd35e139
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jun 8 08:47:26 2022 +0100

    Fix permission issue for dag that has dot in name (#23510)
    
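    The way airflow.security.permissions.resource_name_for_dag determines whether a DAG is a subdag is incorrect:
    it treats everything before the first dot in the dag_id as the parent DAG's id. As a result, if a regular
    (non-sub) DAG's dag_id contains a dot, its permissions are recorded against the wrong resource.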
    
    The current solution makes a database query every time we check permissions for a DAG whose dag_id contains
    a dot. It is not ideal, but I think it is better than the other options I considered. One was changing how
    subdags are named, which would hurt the user experience. Another was making a query during DAG parsing,
    which is undesirable; that is avoided by passing the root DAG's id to resource_name_for_dag.
    
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit cc35fcaf89eeff3d89e18088c2e68f01f8baad56)
---
 airflow/models/dagbag.py        |  8 +++--
 airflow/security/permissions.py | 19 ++++++------
 airflow/www/security.py         | 29 ++++++++++++++----
 tests/www/test_security.py      | 66 +++++++++++++++++++++++++++++++++++------
 4 files changed, 95 insertions(+), 27 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index c0ef0941b6..929842fd0d 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -641,6 +641,8 @@ class DagBag(LoggingMixin):
         from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag
         from airflow.www.fab_security.sqla.models import Action, Permission, Resource
 
+        root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
+
         def needs_perms(dag_id: str) -> bool:
             dag_resource_name = resource_name_for_dag(dag_id)
             for permission_name in DAG_ACTIONS:
@@ -655,9 +657,9 @@ class DagBag(LoggingMixin):
                     return True
             return False
 
-        if dag.access_control or needs_perms(dag.dag_id):
-            self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id)
+        if dag.access_control or needs_perms(root_dag_id):
+            self.log.debug("Syncing DAG permissions: %s to the DB", root_dag_id)
             from airflow.www.security import ApplessAirflowSecurityManager
 
             security_manager = ApplessAirflowSecurityManager(session=session)
-            security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
+            security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index 2d5c0b9399..2d02c773b4 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -66,14 +66,15 @@ DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit"
 DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}
 
 
-def resource_name_for_dag(dag_id):
-    """Returns the resource name for a DAG id."""
-    if dag_id == RESOURCE_DAG:
-        return dag_id
+def resource_name_for_dag(root_dag_id: str) -> str:
+    """Returns the resource name for a DAG id.
 
-    if dag_id.startswith(RESOURCE_DAG_PREFIX):
-        return dag_id
-
-    # To account for SubDags
-    root_dag_id = dag_id.split(".")[0]
+    Note that since a sub-DAG should follow the permission of its
+    parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
+    for a subdag. A normal dag should pass the ``DagModel.dag_id``.
+    """
+    if root_dag_id == RESOURCE_DAG:
+        return root_dag_id
+    if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
+        return root_dag_id
     return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 42188f0618..de6b0d646e 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -200,6 +200,16 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
             view.datamodel = CustomSQLAInterface(view.datamodel.obj)
         self.perms = None
 
+    def _get_root_dag_id(self, dag_id):
+        if '.' in dag_id:
+            dm = (
+                self.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
+                .filter(DagModel.dag_id == dag_id)
+                .first()
+            )
+            return dm.root_dag_id or dm.dag_id
+        return dag_id
+
     def init_role(self, role_name, perms):
         """
         Initialize the role with actions and related resources.
@@ -340,7 +350,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
     def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
         """Checks if user has read or write access to some dags."""
         if dag_id and dag_id != '~':
-            return self.has_access(action, permissions.resource_name_for_dag(dag_id))
+            root_dag_id = self._get_root_dag_id(dag_id)
+            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
 
         user = g.user
         if action == permissions.ACTION_CAN_READ:
@@ -349,17 +360,20 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
 
     def can_read_dag(self, dag_id, user=None) -> bool:
         """Determines whether a user has DAG read access."""
-        dag_resource_name = permissions.resource_name_for_dag(dag_id)
+        root_dag_id = self._get_root_dag_id(dag_id)
+        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
         return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
 
     def can_edit_dag(self, dag_id, user=None) -> bool:
         """Determines whether a user has DAG edit access."""
-        dag_resource_name = permissions.resource_name_for_dag(dag_id)
+        root_dag_id = self._get_root_dag_id(dag_id)
+        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
         return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
 
     def can_delete_dag(self, dag_id, user=None) -> bool:
         """Determines whether a user has DAG delete access."""
-        dag_resource_name = permissions.resource_name_for_dag(dag_id)
+        root_dag_id = self._get_root_dag_id(dag_id)
+        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
         return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user)
 
     def prefixed_dag_id(self, dag_id):
@@ -370,7 +384,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
             DeprecationWarning,
             stacklevel=2,
         )
-        return permissions.resource_name_for_dag(dag_id)
+        root_dag_id = self._get_root_dag_id(dag_id)
+        return permissions.resource_name_for_dag(root_dag_id)
 
     def is_dag_resource(self, resource_name):
         """Determines if a resource belongs to a DAG or all DAGs."""
@@ -530,7 +545,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
         dags = dagbag.dags.values()
 
         for dag in dags:
-            dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+            root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
+            dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
             for action_name in self.DAG_ACTIONS:
                 if (action_name, dag_resource_name) not in perms:
                     self._merge_perm(action_name, dag_resource_name)
@@ -615,6 +631,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
         :param access_control: a dict where each key is a rolename and
             each value is a set() of action names (e.g. {'can_read'})
         """
+
         dag_resource_name = permissions.resource_name_for_dag(dag_id)
 
         def _get_or_create_dag_permission(action_name: str) -> Optional[Permission]:
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 8c90062600..7b8541ca81 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -192,7 +192,8 @@ def sample_dags(security_manager):
 @pytest.fixture(scope="module")
 def has_dag_perm(security_manager):
     def _has_dag_perm(perm, dag_id, user):
-        return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
+        root_dag_id = security_manager._get_root_dag_id(dag_id)
+        return security_manager.has_access(perm, permissions.resource_name_for_dag(root_dag_id), user)
 
     return _has_dag_perm
 
@@ -351,7 +352,7 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
         user.roles = security_manager.get_user_roles(user)
         assert user.roles == {security_manager.get_public_role()}
 
-        test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
+        test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"]
 
         for dag_id in test_dag_ids:
             with _create_dag_model_context(dag_id, session, security_manager):
@@ -588,7 +589,8 @@ def test_access_control_with_invalid_permission(app, security_manager):
         for action in invalid_actions:
             with pytest.raises(AirflowException) as ctx:
                 security_manager._sync_dag_view_permissions(
-                    'access_control_test', access_control={rolename: {action}}
+                    'access_control_test',
+                    access_control={rolename: {action}},
                 )
             assert "invalid permissions" in str(ctx.value)
 
@@ -728,11 +730,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch,
         assert ('can_edit', dag_resource_name) in all_perms
 
     security_manager._sync_dag_view_permissions.assert_called_once_with(
-        permissions.resource_name_for_dag('has_access_control'), access_control
+        permissions.resource_name_for_dag('has_access_control'),
+        access_control,
     )
 
     del dagbag_mock.dags["has_access_control"]
-    with assert_queries_count(1):  # one query to get all perms; dagbag is mocked
+    with assert_queries_count(2):  # two query to get all perms; dagbag is mocked
+        # The extra query happens at permission check
         security_manager.create_dag_specific_permissions()
 
 
@@ -782,10 +786,12 @@ def test_prefixed_dag_id_is_deprecated(security_manager):
         security_manager.prefixed_dag_id("hello")
 
 
-def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms):
+def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session):
     username = 'dag_permission_user'
     role_name = 'dag_permission_role'
     parent_dag_name = "parent_dag"
+    subdag_name = parent_dag_name + ".subdag"
+    subsubdag_name = parent_dag_name + ".subdag.subsubdag"
     with app.app_context():
         mock_roles = [
             {
@@ -801,15 +807,57 @@ def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_
             username=username,
             role_name=role_name,
         ) as user:
+            dag1 = DagModel(dag_id=parent_dag_name)
+            dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name)
+            dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name)
+            session.add_all([dag1, dag2, dag3])
+            session.commit()
             security_manager.bulk_sync_roles(mock_roles)
-            security_manager._sync_dag_view_permissions(
-                parent_dag_name, access_control={role_name: READ_WRITE}
-            )
+            for dag in [dag1, dag2, dag3]:
+                security_manager._sync_dag_view_permissions(
+                    parent_dag_name, access_control={role_name: READ_WRITE}
+                )
+
             assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
             assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
             assert_user_has_dag_perms(
                 perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user
             )
+            session.query(DagModel).delete()
+
+
+def test_permissions_work_for_dags_with_dot_in_dagname(
+    app, security_manager, assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session
+):
+    username = 'dag_permission_user'
+    role_name = 'dag_permission_role'
+    dag_id = "dag_id_1"
+    dag_id_2 = "dag_id_1.with_dot"
+    with app.app_context():
+        mock_roles = [
+            {
+                'role': role_name,
+                'perms': [
+                    (permissions.ACTION_CAN_READ, f"DAG:{dag_id}"),
+                    (permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"),
+                ],
+            }
+        ]
+        with create_user_scope(
+            app,
+            username=username,
+            role_name=role_name,
+        ) as user:
+            dag1 = DagModel(dag_id=dag_id)
+            dag2 = DagModel(dag_id=dag_id_2)
+            session.add_all([dag1, dag2])
+            session.commit()
+            security_manager.bulk_sync_roles(mock_roles)
+            security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE})
+            security_manager.sync_perm_for_dag(dag2.dag_id, access_control={role_name: READ_WRITE})
+            assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
+            assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user)
+            session.query(DagModel).delete()
 
 
 def test_fab_models_use_airflow_base_meta():