You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2021/11/30 16:34:21 UTC
[airflow] branch main updated: Convert www.security test suite to pytest and remove residuals (#18961)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33a4502 Convert www.security test suite to pytest and remove residuals (#18961)
33a4502 is described below
commit 33a4502f68be6b25fbccb96ed1832c5b527bb02a
Author: Khalid Mammadov <xm...@hotmail.com>
AuthorDate: Tue Nov 30 16:33:55 2021 +0000
Convert www.security test suite to pytest and remove residuals (#18961)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
tests/test_utils/api_connexion_utils.py | 16 +-
tests/www/test_security.py | 1164 +++++++++++++++++--------------
tests/www/views/conftest.py | 65 +-
tests/www/views/test_views_base.py | 13 +-
4 files changed, 691 insertions(+), 567 deletions(-)
diff --git a/tests/test_utils/api_connexion_utils.py b/tests/test_utils/api_connexion_utils.py
index bee38c3..e8063f2 100644
--- a/tests/test_utils/api_connexion_utils.py
+++ b/tests/test_utils/api_connexion_utils.py
@@ -80,15 +80,23 @@ def create_role(app, name, permissions=None):
return role
+def set_user_single_role(app, username, role_name):
+ role = create_role(app, role_name)
+ user = app.appbuilder.sm.find_user(username)
+ if role not in user.roles:
+ user.roles = [role]
+ app.appbuilder.sm.update_user(user)
+
+
def delete_role(app, name):
- if app.appbuilder.sm.find_role(name):
- app.appbuilder.sm.delete_role(name)
+ if name not in EXISTING_ROLES:
+ if app.appbuilder.sm.find_role(name):
+ app.appbuilder.sm.delete_role(name)
def delete_roles(app):
for role in app.appbuilder.sm.get_all_roles():
- if role.name not in EXISTING_ROLES:
- app.appbuilder.sm.delete_role(role.name)
+ delete_role(app, role.name)
def delete_user(app, username):
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 66cc26d..efa53dd 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -15,9 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import contextlib
import logging
-import unittest
from unittest import mock
import pytest
@@ -25,15 +24,15 @@ from flask_appbuilder import SQLA, Model, expose, has_access
from flask_appbuilder.views import BaseView, ModelView
from sqlalchemy import Column, Date, Float, Integer, String
-from airflow import settings
from airflow.exceptions import AirflowException
from airflow.models import DagModel
from airflow.models.dag import DAG
from airflow.security import permissions
from airflow.www import app as application
from airflow.www.fab_security.sqla.models import assoc_permission_role
+from airflow.www.security import AirflowSecurityManager
from airflow.www.utils import CustomSQLAInterface
-from tests.test_utils import api_connexion_utils
+from tests.test_utils.api_connexion_utils import create_user_scope, delete_role, set_user_single_role
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.db import clear_db_dags, clear_db_runs
from tests.test_utils.mock_security_manager import MockSecurityManager
@@ -78,627 +77,738 @@ class SomeBaseView(BaseView):
return "action!"
-class TestSecurity(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- settings.configure_orm()
- cls.session = settings.Session
- cls.app = application.create_app(testing=True)
- cls.appbuilder = cls.app.appbuilder
- cls.app.config['WTF_CSRF_ENABLED'] = False
- cls.security_manager = cls.appbuilder.sm
- cls.delete_roles()
-
- def setUp(self):
- clear_db_runs()
- clear_db_dags()
- self.db = SQLA(self.app)
- self.appbuilder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews")
- self.appbuilder.add_view(SomeModelView, "SomeModelView", category="ModelViews")
-
- log.debug("Complete setup!")
-
- @classmethod
- def delete_roles(cls):
- for role_name in [
- 'team-a',
- 'MyRole1',
- 'MyRole5',
- 'Test_Role',
- 'MyRole3',
- 'MyRole2',
- 'dag_permission_role',
- ]:
- api_connexion_utils.delete_role(cls.app, role_name)
-
- def expect_user_is_in_role(self, user, rolename):
- self.security_manager.bulk_sync_roles([{'role': rolename, 'perms': []}])
- role = self.security_manager.find_role(rolename)
- if not role:
- self.security_manager.add_role(rolename)
- role = self.security_manager.find_role(rolename)
- user.roles = [role]
- self.security_manager.update_user(user)
-
- def assert_user_has_dag_perms(self, perms, dag_id, user=None):
- for perm in perms:
- assert self._has_dag_perm(perm, dag_id, user), f"User should have '{perm}' on DAG '{dag_id}'"
+def _clear_db_dag_and_runs():
+ clear_db_runs()
+ clear_db_dags()
- def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
- for perm in perms:
- assert not self._has_dag_perm(
- perm, dag_id, user
- ), f"User should not have '{perm}' on DAG '{dag_id}'"
-
- def _has_dag_perm(self, perm, dag_id, user):
- return self.security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
-
- def _create_dag(self, dag_id):
- dag_model = DagModel(dag_id=dag_id)
- self.session.add(dag_model)
- self.session.commit()
- self.security_manager.sync_perm_for_dag(dag_id, access_control=None)
-
- def tearDown(self):
- clear_db_runs()
- clear_db_dags()
- self.appbuilder = None
- self.app = None
- self.db = None
- log.debug("Complete teardown!")
-
- def test_init_role_baseview(self):
- role_name = 'MyRole7'
- role_perms = [('can_some_other_action', 'AnotherBaseView')]
- with pytest.warns(
- DeprecationWarning,
- match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.",
- ):
- self.security_manager.init_role(role_name, role_perms)
-
- role = self.appbuilder.sm.find_role(role_name)
- assert role is not None
- assert len(role_perms) == len(role.permissions)
-
- def test_bulk_sync_roles_baseview(self):
- role_name = 'MyRole3'
- role_perms = [('can_some_action', 'SomeBaseView')]
- self.security_manager.bulk_sync_roles([{'role': role_name, 'perms': role_perms}])
-
- role = self.appbuilder.sm.find_role(role_name)
- assert role is not None
- assert len(role_perms) == len(role.permissions)
-
- def test_bulk_sync_roles_modelview(self):
- role_name = 'MyRole2'
- role_perms = [
- ('can_list', 'SomeModelView'),
- ('can_show', 'SomeModelView'),
- ('can_add', 'SomeModelView'),
- (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
- (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
- ]
- mock_roles = [{'role': role_name, 'perms': role_perms}]
- self.security_manager.bulk_sync_roles(mock_roles)
- role = self.appbuilder.sm.find_role(role_name)
- assert role is not None
- assert len(role_perms) == len(role.permissions)
+def _delete_dag_permissions(dag_id, security_manager):
+ dag_resource_name = permissions.resource_name_for_dag(dag_id)
+ for dag_action_name in security_manager.DAG_ACTIONS:
+ security_manager.delete_permission(dag_action_name, dag_resource_name)
- # Check short circuit works
- with assert_queries_count(2): # One for Permission, one for roles
- self.security_manager.bulk_sync_roles(mock_roles)
- def test_update_and_verify_permission_role(self):
- role_name = 'Test_Role'
- role_perms = []
- mock_roles = [{'role': role_name, 'perms': role_perms}]
- self.security_manager.bulk_sync_roles(mock_roles)
- role = self.security_manager.find_role(role_name)
+def _create_dag_model(dag_id, session, security_manager):
+ dag_model = DagModel(dag_id=dag_id)
+ session.add(dag_model)
+ session.commit()
+ security_manager.sync_perm_for_dag(dag_id, access_control=None)
+ return dag_model
- perm = self.security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
- self.security_manager.add_permission_to_role(role, perm)
- role_perms_len = len(role.permissions)
- self.security_manager.bulk_sync_roles(mock_roles)
- new_role_perms_len = len(role.permissions)
+def _delete_dag_model(dag_model, session, security_manager):
+ session.delete(dag_model)
+ session.commit()
+ _delete_dag_permissions(dag_model.dag_id, security_manager)
- assert role_perms_len == new_role_perms_len
- assert new_role_perms_len == 1
- def test_verify_public_role_has_no_permissions(self):
- public = self.appbuilder.sm.find_role("Public")
+@contextlib.contextmanager
+def _create_dag_model_context(dag_id, session, security_manager):
+ dag = _create_dag_model(dag_id, session, security_manager)
+ yield dag
+ _delete_dag_model(dag, session, security_manager)
- assert public.permissions == []
- def test_verify_default_anon_user_has_no_accessible_dag_ids(self):
- with self.app.app_context():
- user = mock.MagicMock()
- user.is_anonymous = True
- self.app.config['AUTH_ROLE_PUBLIC'] = 'Public'
- assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()]
+@pytest.fixture(scope="module", autouse=True)
+def clear_db_after_suite():
+ yield None
+ _clear_db_dag_and_runs()
- self._create_dag("test_dag_id")
- self.security_manager.sync_roles()
- assert self.security_manager.get_accessible_dag_ids(user) == set()
+@pytest.fixture(scope="function", autouse=True)
+def clear_db_before_test():
+ _clear_db_dag_and_runs()
- def test_verify_default_anon_user_has_no_access_to_specific_dag(self):
- with self.app.app_context():
- user = mock.MagicMock()
- user.is_anonymous = True
- self.app.config['AUTH_ROLE_PUBLIC'] = 'Public'
- assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()]
- dag_id = "test_dag_id"
- self._create_dag(dag_id)
- self.app.appbuilder.sm.sync_roles()
+@pytest.fixture(scope="module")
+def app():
+ _app = application.create_app(testing=True)
+ _app.config['WTF_CSRF_ENABLED'] = False
+ return _app
- assert self.app.appbuilder.sm.can_read_dag(dag_id, user) is False
- assert self.app.appbuilder.sm.can_edit_dag(dag_id, user) is False
- assert self._has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is False
- assert self._has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is False
- def test_verify_anon_user_with_admin_role_has_all_dag_access(self):
- with self.app.app_context():
- self.app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
- user = mock.MagicMock()
- user.is_anonymous = True
+@pytest.fixture(scope="module")
+def app_builder(app):
+ app_builder = app.appbuilder
+ app_builder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews")
+ app_builder.add_view(SomeModelView, "SomeModelView", category="ModelViews")
+ return app.appbuilder
- assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()]
- test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
- for dag_id in test_dag_ids:
- self._create_dag(dag_id)
- self.security_manager.sync_roles()
+@pytest.fixture(scope="module")
+def security_manager(app_builder):
+ return app_builder.sm
- assert self.security_manager.get_accessible_dag_ids(user) == set(test_dag_ids)
- def test_verify_anon_user_with_admin_role_has_access_to_each_dag(self):
- with self.app.app_context():
- user = mock.MagicMock()
- user.is_anonymous = True
- self.app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
+@pytest.fixture(scope="module")
+def session(app_builder):
+ return app_builder.get_session
- # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set.
- user.roles = self.app.appbuilder.sm.get_user_roles(user)
- assert user.roles == [self.app.appbuilder.sm.get_public_role()]
- test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
+@pytest.fixture(scope="module")
+def db(app):
+ return SQLA(app)
- for dag_id in test_dag_ids:
- self._create_dag(dag_id)
- self.security_manager.sync_roles()
- for dag_id in test_dag_ids:
- assert self.app.appbuilder.sm.can_read_dag(dag_id, user) is True
- assert self.app.appbuilder.sm.can_edit_dag(dag_id, user) is True
- assert self._has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is True
- assert self._has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is True
+@pytest.fixture(scope="function")
+def role(request, app, security_manager):
+ params = request.param
+ _role = None
+ params['mock_roles'] = [{'role': params['name'], 'perms': params['permissions']}]
+ if params.get("create", True):
+ security_manager.bulk_sync_roles(params['mock_roles'])
+ _role = security_manager.find_role(params['name'])
+ yield _role, params
+ delete_role(app, params['name'])
- def test_get_user_roles(self):
- user = mock.MagicMock()
- user.is_anonymous = False
- roles = self.appbuilder.sm.find_role('Admin')
- user.roles = roles
- assert self.security_manager.get_user_roles(user) == roles
-
- def test_get_user_roles_for_anonymous_user(self):
- viewer_role_perms = {
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU),
- (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS),
+
+@pytest.fixture(scope="function")
+def mock_dag_models(request, session, security_manager):
+ dags_ids = request.param
+ dags = [_create_dag_model(dag_id, session, security_manager) for dag_id in dags_ids]
+
+ yield dags_ids
+
+ for dag in dags:
+ _delete_dag_model(dag, session, security_manager)
+
+
+@pytest.fixture(scope="function")
+def sample_dags(security_manager):
+ dags = [
+ DAG('has_access_control', access_control={'Public': {permissions.ACTION_CAN_READ}}),
+ DAG('no_access_control'),
+ ]
+
+ yield dags
+
+ for dag in dags:
+ _delete_dag_permissions(dag.dag_id, security_manager)
+
+
+@pytest.fixture(scope="module")
+def has_dag_perm(security_manager):
+ def _has_dag_perm(perm, dag_id, user):
+ return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
+
+ return _has_dag_perm
+
+
+@pytest.fixture(scope="module")
+def assert_user_has_dag_perms(has_dag_perm):
+ def _assert_user_has_dag_perms(perms, dag_id, user=None):
+ for perm in perms:
+ assert has_dag_perm(perm, dag_id, user), f"User should have '{perm}' on DAG '{dag_id}'"
+
+ return _assert_user_has_dag_perms
+
+
+@pytest.fixture(scope="module")
+def assert_user_does_not_have_dag_perms(has_dag_perm):
+ def _assert_user_does_not_have_dag_perms(dag_id, perms, user=None):
+ for perm in perms:
+ assert not has_dag_perm(perm, dag_id, user), f"User should not have '{perm}' on DAG '{dag_id}'"
+
+ return _assert_user_does_not_have_dag_perms
+
+
+@pytest.mark.parametrize(
+ "role",
+ [{"name": "MyRole7", "permissions": [('can_some_other_action', 'AnotherBaseView')], "create": False}],
+ indirect=True,
+)
+def test_init_role_baseview(app, security_manager, role):
+ _, params = role
+
+ with pytest.warns(
+ DeprecationWarning,
+ match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.",
+ ):
+ security_manager.init_role(params['name'], params['permissions'])
+
+ _role = security_manager.find_role(params['name'])
+ assert _role is not None
+ assert len(_role.permissions) == len(params['permissions'])
+
+
+@pytest.mark.parametrize(
+ "role",
+ [{"name": "MyRole3", "permissions": [('can_some_action', 'SomeBaseView')]}],
+ indirect=True,
+)
+def test_bulk_sync_roles_baseview(app, security_manager, role):
+ _role, params = role
+ assert _role is not None
+ assert len(_role.permissions) == len(params['permissions'])
+
+
+@pytest.mark.parametrize(
+ "role",
+ [
+ {
+ "name": "MyRole2",
+ "permissions": [
+ ('can_list', 'SomeModelView'),
+ ('can_show', 'SomeModelView'),
+ ('can_add', 'SomeModelView'),
+ (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
+ (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
+ ],
}
- self.app.config['AUTH_ROLE_PUBLIC'] = 'Viewer'
-
- with self.app.app_context():
- user = mock.MagicMock()
- user.is_anonymous = True
-
- perms_views = set()
- for role in self.security_manager.get_user_roles(user):
- perms_views.update({(perm.action.name, perm.resource.name) for perm in role.permissions})
- assert perms_views == viewer_role_perms
-
- @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
- def test_get_current_user_permissions(self, mock_get_user_roles):
- role_name = 'MyRole5'
- role_perm = 'can_some_action'
- role_vm = 'SomeBaseView'
- username = 'get_current_user_permissions'
-
- with self.app.app_context():
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- permissions=[
- (role_perm, role_vm),
- ],
- )
+ ],
+ indirect=True,
+)
+def test_bulk_sync_roles_modelview(app, security_manager, role):
+ _role, params = role
+ assert role is not None
+ assert len(_role.permissions) == len(params['permissions'])
+
+ # Check short circuit works
+ with assert_queries_count(2): # One for Permission, one for roles
+ security_manager.bulk_sync_roles(params['mock_roles'])
+
+
+@pytest.mark.parametrize(
+ "role",
+ [{"name": "Test_Role", "permissions": []}],
+ indirect=True,
+)
+def test_update_and_verify_permission_role(app, security_manager, role):
+ _role, params = role
+ perm = security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
+ security_manager.add_permission_to_role(_role, perm)
+ role_perms_len = len(_role.permissions)
+
+ security_manager.bulk_sync_roles(params['mock_roles'])
+ new_role_perms_len = len(_role.permissions)
+
+ assert role_perms_len == new_role_perms_len
+ assert new_role_perms_len == 1
+
+
+def test_verify_public_role_has_no_permissions(security_manager):
+ public = security_manager.find_role("Public")
+ assert public.permissions == []
+
+
+def test_verify_default_anon_user_has_no_accessible_dag_ids(app, session, security_manager):
+ with app.app_context():
+ user = mock.MagicMock()
+ user.is_anonymous = True
+ app.config['AUTH_ROLE_PUBLIC'] = 'Public'
+ assert security_manager.get_user_roles(user) == [security_manager.get_public_role()]
+
+ with _create_dag_model_context("test_dag_id", session, security_manager):
+ security_manager.sync_roles()
+
+ assert security_manager.get_accessible_dag_ids(user) == set()
+
+
+def test_verify_default_anon_user_has_no_access_to_specific_dag(app, session, security_manager, has_dag_perm):
+ with app.app_context():
+ user = mock.MagicMock()
+ user.is_anonymous = True
+ app.config['AUTH_ROLE_PUBLIC'] = 'Public'
+ assert security_manager.get_user_roles(user) == [security_manager.get_public_role()]
+
+ dag_id = "test_dag_id"
+ with _create_dag_model_context(dag_id, session, security_manager):
+ security_manager.sync_roles()
+
+ assert security_manager.can_read_dag(dag_id, user) is False
+ assert security_manager.can_edit_dag(dag_id, user) is False
+ assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is False
+ assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is False
+
+
+@pytest.mark.parametrize(
+ "mock_dag_models",
+ [["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]],
+ indirect=True,
+)
+def test_verify_anon_user_with_admin_role_has_all_dag_access(app, session, security_manager, mock_dag_models):
+ test_dag_ids = mock_dag_models
+ with app.app_context():
+ app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
+ user = mock.MagicMock()
+ user.is_anonymous = True
+
+ assert security_manager.get_user_roles(user) == [security_manager.get_public_role()]
+
+ security_manager.sync_roles()
+
+ assert security_manager.get_accessible_dag_ids(user) == set(test_dag_ids)
+
+
+def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
+ app, session, security_manager, has_dag_perm
+):
+ with app.app_context():
+ user = mock.MagicMock()
+ user.is_anonymous = True
+ app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
+
+ # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set.
+ user.roles = security_manager.get_user_roles(user)
+ assert user.roles == [security_manager.get_public_role()]
+
+ test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
+
+ for dag_id in test_dag_ids:
+ with _create_dag_model_context(dag_id, session, security_manager):
+ security_manager.sync_roles()
+
+ assert security_manager.can_read_dag(dag_id, user) is True
+ assert security_manager.can_edit_dag(dag_id, user) is True
+ assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is True
+ assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is True
+
+
+def test_get_user_roles(app_builder, security_manager):
+ user = mock.MagicMock()
+ user.is_anonymous = False
+ roles = app_builder.sm.find_role('Admin')
+ user.roles = roles
+ assert security_manager.get_user_roles(user) == roles
+
+
+def test_get_user_roles_for_anonymous_user(app, security_manager):
+ viewer_role_perms = {
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU),
+ (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS),
+ }
+ app.config['AUTH_ROLE_PUBLIC'] = 'Viewer'
+
+ with app.app_context():
+ user = mock.MagicMock()
+ user.is_anonymous = True
+
+ perms_views = set()
+ for role in security_manager.get_user_roles(user):
+ perms_views.update({(perm.action.name, perm.resource.name) for perm in role.permissions})
+ assert perms_views == viewer_role_perms
+
+
+def test_get_current_user_permissions(app, security_manager, monkeypatch):
+ role_perm = 'can_some_action'
+ role_vm = 'SomeBaseView'
+
+ from airflow.www.security import AirflowSecurityManager
+
+ with app.app_context():
+ with create_user_scope(
+ app,
+ username='get_current_user_permissions',
+ role_name='MyRole5',
+ permissions=[
+ (role_perm, role_vm),
+ ],
+ ) as user:
role = user.roles[0]
- mock_get_user_roles.return_value = [role]
+ monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [role])
- assert self.security_manager.get_current_user_permissions() == {(role_perm, role_vm)}
+ assert security_manager.get_current_user_permissions() == {(role_perm, role_vm)}
- mock_get_user_roles.return_value = []
- assert len(self.security_manager.get_current_user_permissions()) == 0
+ monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [])
+ assert len(security_manager.get_current_user_permissions()) == 0
- @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
- def test_current_user_has_permissions(self, mock_get_user_roles):
- with self.app.app_context():
- user = api_connexion_utils.create_user(
- self.app,
- username="current_user_has_permissions",
- role_name="current_user_has_permissions",
- permissions=[("can_some_action", "SomeBaseView")],
- )
+
+def test_current_user_has_permissions(app, security_manager, monkeypatch):
+ with app.app_context():
+ with create_user_scope(
+ app,
+ username="current_user_has_permissions",
+ role_name="current_user_has_permissions",
+ permissions=[("can_some_action", "SomeBaseView")],
+ ) as user:
role = user.roles[0]
- mock_get_user_roles.return_value = [role]
- assert self.security_manager.current_user_has_permissions()
+ monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [role])
+ assert security_manager.current_user_has_permissions()
# Role, but no permissions
role.permissions = []
- assert not self.security_manager.current_user_has_permissions()
+ assert not security_manager.current_user_has_permissions()
# No role
- mock_get_user_roles.return_value = []
- assert not self.security_manager.current_user_has_permissions()
+ monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [])
+ assert not security_manager.current_user_has_permissions()
- def test_get_accessible_dag_ids(self):
- role_name = 'MyRole1'
- permission_action = [permissions.ACTION_CAN_READ]
- dag_id = 'dag_id'
- username = "ElUser"
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- permissions=[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
- ],
- )
+def test_get_accessible_dag_ids(app, security_manager, session):
+ role_name = 'MyRole1'
+ permission_action = [permissions.ACTION_CAN_READ]
+ dag_id = 'dag_id'
+ username = "ElUser"
+
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ ],
+ ) as user:
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
- self.session.add(dag_model)
- self.session.commit()
+ session.add(dag_model)
+ session.commit()
- self.security_manager.sync_perm_for_dag( # type: ignore
+ security_manager.sync_perm_for_dag( # type: ignore
dag_id, access_control={role_name: permission_action}
)
- assert self.security_manager.get_accessible_dag_ids(user) == {'dag_id'}
+ assert security_manager.get_accessible_dag_ids(user) == {'dag_id'}
- def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(self):
- # In this test case,
- # get_readable_dag_ids() don't return DAGs to which the user has CAN_EDIT action
- username = "Monsieur User"
- role_name = "MyRole1"
- permission_action = [permissions.ACTION_CAN_EDIT]
- dag_id = "dag_id"
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- permissions=[
- (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
- ],
- )
+def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(app, security_manager, session):
+ # In this test case,
+ # get_readable_dag_ids() don't return DAGs to which the user has CAN_EDIT action
+ username = "Monsieur User"
+ role_name = "MyRole1"
+ permission_action = [permissions.ACTION_CAN_EDIT]
+ dag_id = "dag_id"
+
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ permissions=[
+ (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+ ],
+ ) as user:
dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
- self.session.add(dag_model)
- self.session.commit()
+ session.add(dag_model)
+ session.commit()
- self.security_manager.sync_perm_for_dag( # type: ignore
+ security_manager.sync_perm_for_dag( # type: ignore
dag_id, access_control={role_name: permission_action}
)
- assert self.security_manager.get_readable_dag_ids(user) == set()
+ assert security_manager.get_readable_dag_ids(user) == set()
- @mock.patch('airflow.www.security.AirflowSecurityManager._has_resource_access')
- def test_has_access(self, mock_has_resource_access):
- user = mock.MagicMock()
- user.is_anonymous = False
- mock_has_resource_access.return_value = True
- assert self.security_manager.has_access('perm', 'view', user)
-
- def test_sync_perm_for_dag_creates_permissions_on_resources(self):
- test_dag_id = 'TEST_DAG'
- prefixed_test_dag_id = f'DAG:{test_dag_id}'
- self.security_manager.sync_perm_for_dag(test_dag_id, access_control=None)
- assert (
- self.security_manager.get_permission(permissions.ACTION_CAN_READ, prefixed_test_dag_id)
- is not None
- )
- assert (
- self.security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id)
- is not None
+
+def test_has_access(security_manager, monkeypatch):
+ user = mock.MagicMock()
+ user.is_anonymous = False
+ from airflow.www.security import AirflowSecurityManager
+
+ monkeypatch.setattr(AirflowSecurityManager, "_has_resource_access", lambda a, b, c, d: True)
+ assert security_manager.has_access('perm', 'view', user)
+
+
+def test_sync_perm_for_dag_creates_permissions_on_resources(security_manager):
+ test_dag_id = 'TEST_DAG'
+ prefixed_test_dag_id = f'DAG:{test_dag_id}'
+ security_manager.sync_perm_for_dag(test_dag_id, access_control=None)
+ assert security_manager.get_permission(permissions.ACTION_CAN_READ, prefixed_test_dag_id) is not None
+ assert security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id) is not None
+
+
+def test_has_all_dag_access(security_manager, monkeypatch):
+ from airflow.www.security import AirflowSecurityManager
+
+ monkeypatch.setattr(AirflowSecurityManager, "_has_role", lambda a, b: True)
+ assert security_manager.has_all_dags_access()
+
+ monkeypatch.setattr(AirflowSecurityManager, "_has_role", lambda a, b: False)
+ monkeypatch.setattr(AirflowSecurityManager, "_has_perm", lambda a, b, c: False)
+ assert not security_manager.has_all_dags_access()
+
+ monkeypatch.setattr(AirflowSecurityManager, "_has_perm", lambda a, b, c: True)
+ assert security_manager.has_all_dags_access()
+
+
+def test_access_control_with_non_existent_role(security_manager):
+ with pytest.raises(AirflowException) as ctx:
+ security_manager._sync_dag_view_permissions(
+ dag_id='access-control-test',
+ access_control={
+ 'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+ },
)
+ assert "role does not exist" in str(ctx.value)
- @mock.patch('airflow.www.security.AirflowSecurityManager._has_perm')
- @mock.patch('airflow.www.security.AirflowSecurityManager._has_role')
- def test_has_all_dag_access(self, mock_has_role, mock_has_perm):
- mock_has_role.return_value = True
- assert self.security_manager.has_all_dags_access()
-
- mock_has_role.return_value = False
- mock_has_perm.return_value = False
- assert not self.security_manager.has_all_dags_access()
-
- mock_has_perm.return_value = True
- assert self.security_manager.has_all_dags_access()
-
- def test_access_control_with_non_existent_role(self):
- with pytest.raises(AirflowException) as ctx:
- self.security_manager._sync_dag_view_permissions(
- dag_id='access-control-test',
- access_control={
- 'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
- },
- )
- assert "role does not exist" in str(ctx.value)
-
- def test_all_dag_access_doesnt_give_non_dag_access(self):
- username = 'dag_access_user'
- role_name = 'dag_access_role'
- with self.app.app_context():
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- permissions=[
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
- (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
- ],
- )
- assert self.security_manager.has_access(
- permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user
- )
- assert not self.security_manager.has_access(
+
+def test_all_dag_access_doesnt_give_non_dag_access(app, security_manager):
+ username = 'dag_access_user'
+ role_name = 'dag_access_role'
+ with app.app_context():
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ ],
+ ) as user:
+ assert security_manager.has_access(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user)
+ assert not security_manager.has_access(
permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE, user
)
- def test_access_control_with_invalid_permission(self):
- invalid_actions = [
- 'can_varimport', # a real action, but not a member of DAG_ACTIONS
- 'can_eat_pudding', # clearly not a real action
- ]
- username = "LaUser"
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name='team-a',
- )
+
+def test_access_control_with_invalid_permission(app, security_manager):
+ invalid_actions = [
+ 'can_varimport', # a real action, but not a member of DAG_ACTIONS
+ 'can_eat_pudding', # clearly not a real action
+ ]
+ username = "LaUser"
+ rolename = "team-a"
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=rolename,
+ ):
for action in invalid_actions:
- self.expect_user_is_in_role(user, rolename='team-a')
with pytest.raises(AirflowException) as ctx:
- self.security_manager._sync_dag_view_permissions(
- 'access_control_test', access_control={'team-a': {action}}
+ security_manager._sync_dag_view_permissions(
+ 'access_control_test', access_control={rolename: {action}}
)
assert "invalid permissions" in str(ctx.value)
- def test_access_control_is_set_on_init(self):
- username = 'access_control_is_set_on_init'
- role_name = 'team-a'
- with self.app.app_context():
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- permissions=[],
- )
- self.expect_user_is_in_role(user, rolename='team-a')
- self.security_manager._sync_dag_view_permissions(
+
+def test_access_control_is_set_on_init(
+ app,
+ security_manager,
+ assert_user_has_dag_perms,
+ assert_user_does_not_have_dag_perms,
+):
+ username = 'access_control_is_set_on_init'
+ role_name = 'team-a'
+ negated_role = 'NOT-team-a'
+ with app.app_context():
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ permissions=[],
+ ) as user:
+ security_manager._sync_dag_view_permissions(
'access_control_test',
- access_control={'team-a': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]},
+ access_control={role_name: [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]},
)
- self.assert_user_has_dag_perms(
+ assert_user_has_dag_perms(
perms=[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ],
dag_id='access_control_test',
user=user,
)
- self.expect_user_is_in_role(user, rolename='NOT-team-a')
- self.assert_user_does_not_have_dag_perms(
+ security_manager.bulk_sync_roles([{'role': negated_role, 'perms': []}])
+ set_user_single_role(app, username, role_name=negated_role)
+ assert_user_does_not_have_dag_perms(
perms=[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ],
dag_id='access_control_test',
user=user,
)
- def test_access_control_stale_perms_are_revoked(self):
- username = 'access_control_stale_perms_are_revoked'
- role_name = 'team-a'
- with self.app.app_context():
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- permissions=[],
- )
- self.expect_user_is_in_role(user, rolename='team-a')
- self.security_manager._sync_dag_view_permissions(
+
+def test_access_control_stale_perms_are_revoked(
+ app,
+ security_manager,
+ assert_user_has_dag_perms,
+ assert_user_does_not_have_dag_perms,
+):
+ username = 'access_control_stale_perms_are_revoked'
+ role_name = 'team-a'
+ with app.app_context():
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ permissions=[],
+ ) as user:
+ set_user_single_role(app, username, role_name='team-a')
+ security_manager._sync_dag_view_permissions(
'access_control_test', access_control={'team-a': READ_WRITE}
)
- self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user)
+ assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user)
- self.security_manager._sync_dag_view_permissions(
+ security_manager._sync_dag_view_permissions(
'access_control_test', access_control={'team-a': READ_ONLY}
)
- self.assert_user_has_dag_perms(
+ assert_user_has_dag_perms(
perms=[permissions.ACTION_CAN_READ], dag_id='access_control_test', user=user
)
- self.assert_user_does_not_have_dag_perms(
+ assert_user_does_not_have_dag_perms(
perms=[permissions.ACTION_CAN_EDIT], dag_id='access_control_test', user=user
)
- def test_no_additional_dag_permission_views_created(self):
- ab_perm_role = assoc_permission_role
-
- self.security_manager.sync_roles()
- num_pv_before = self.db.session().query(ab_perm_role).count()
- self.security_manager.sync_roles()
- num_pv_after = self.db.session().query(ab_perm_role).count()
- assert num_pv_before == num_pv_after
-
- def test_override_role_vm(self):
- test_security_manager = MockSecurityManager(appbuilder=self.appbuilder)
- assert len(test_security_manager.VIEWER_VMS) == 1
- assert test_security_manager.VIEWER_VMS == {'Airflow'}
-
- def test_correct_roles_have_perms_to_read_config(self):
- roles_to_check = self.security_manager.get_all_roles()
- assert len(roles_to_check) >= 5
- for role in roles_to_check:
- if role.name in ["Admin", "Op"]:
- assert self.security_manager.permission_exists_in_one_or_more_roles(
- permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
- )
- else:
- assert not self.security_manager.permission_exists_in_one_or_more_roles(
- permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
- ), (
- f"{role.name} should not have {permissions.ACTION_CAN_READ} "
- f"on {permissions.RESOURCE_CONFIG}"
- )
- @mock.patch("airflow.www.security.DagBag")
- def test_create_dag_specific_permissions(self, dagbag_mock):
- access_control = {'Public': {permissions.ACTION_CAN_READ}}
- dags = [
- DAG('has_access_control', access_control=access_control),
- DAG('no_access_control'),
- ]
+def test_no_additional_dag_permission_views_created(db, security_manager):
+ ab_perm_role = assoc_permission_role
- collect_dags_from_db_mock = mock.Mock()
- dagbag = mock.Mock()
+ security_manager.sync_roles()
+ num_pv_before = db.session().query(ab_perm_role).count()
+ security_manager.sync_roles()
+ num_pv_after = db.session().query(ab_perm_role).count()
+ assert num_pv_before == num_pv_after
- dagbag.dags = {dag.dag_id: dag for dag in dags}
- dagbag.collect_dags_from_db = collect_dags_from_db_mock
- dagbag_mock.return_value = dagbag
- self.security_manager._sync_dag_view_permissions = mock.Mock()
+def test_override_role_vm(app_builder):
+ test_security_manager = MockSecurityManager(appbuilder=app_builder)
+ assert len(test_security_manager.VIEWER_VMS) == 1
+ assert test_security_manager.VIEWER_VMS == {'Airflow'}
- for dag in dags:
- dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
- all_perms = self.security_manager.get_all_permissions()
- assert ('can_read', dag_resource_name) not in all_perms
- assert ('can_edit', dag_resource_name) not in all_perms
- self.security_manager.create_dag_specific_permissions()
+def test_correct_roles_have_perms_to_read_config(security_manager):
+ roles_to_check = security_manager.get_all_roles()
+ assert len(roles_to_check) >= 5
+ for role in roles_to_check:
+ if role.name in ["Admin", "Op"]:
+ assert security_manager.permission_exists_in_one_or_more_roles(
+ permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
+ )
+ else:
+ assert not security_manager.permission_exists_in_one_or_more_roles(
+ permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
+ ), (
+ f"{role.name} should not have {permissions.ACTION_CAN_READ} "
+ f"on {permissions.RESOURCE_CONFIG}"
+ )
- dagbag_mock.assert_called_once_with(read_dags_from_db=True)
- collect_dags_from_db_mock.assert_called_once_with()
- for dag in dags:
- dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
- all_perms = self.security_manager.get_all_permissions()
- assert ('can_read', dag_resource_name) in all_perms
- assert ('can_edit', dag_resource_name) in all_perms
+def test_create_dag_specific_permissions(session, security_manager, monkeypatch, sample_dags):
- self.security_manager._sync_dag_view_permissions.assert_called_once_with(
- permissions.resource_name_for_dag('has_access_control'), access_control
- )
+ access_control = {'Public': {permissions.ACTION_CAN_READ}}
- del dagbag.dags["has_access_control"]
- with assert_queries_count(1): # one query to get all perms; dagbag is mocked
- self.security_manager.create_dag_specific_permissions()
+ collect_dags_from_db_mock = mock.Mock()
+ dagbag_mock = mock.Mock()
+ dagbag_mock.dags = {dag.dag_id: dag for dag in sample_dags}
+ dagbag_mock.collect_dags_from_db = collect_dags_from_db_mock
+ dagbag_class_mock = mock.Mock()
+ dagbag_class_mock.return_value = dagbag_mock
+ import airflow.www.security
- def test_get_all_permissions(self):
- with assert_queries_count(1):
- perms = self.security_manager.get_all_permissions()
+ monkeypatch.setitem(airflow.www.security.__dict__, "DagBag", dagbag_class_mock)
+ security_manager._sync_dag_view_permissions = mock.Mock()
- assert isinstance(perms, set)
- for perm in perms:
- assert isinstance(perm, tuple)
- assert len(perm) == 2
-
- assert ('can_read', 'Connections') in perms
-
- def test_get_all_non_dag_permissions(self):
- with assert_queries_count(1):
- pvs = self.security_manager._get_all_non_dag_permissions()
-
- assert isinstance(pvs, dict)
- for (perm_name, viewmodel_name), perm in pvs.items():
- assert isinstance(perm_name, str)
- assert isinstance(viewmodel_name, str)
- assert isinstance(perm, self.security_manager.permission_model)
-
- assert ('can_read', 'Connections') in pvs
-
- def test_get_all_roles_with_permissions(self):
- with assert_queries_count(1):
- roles = self.security_manager._get_all_roles_with_permissions()
-
- assert isinstance(roles, dict)
- for role_name, role in roles.items():
- assert isinstance(role_name, str)
- assert isinstance(role, self.security_manager.role_model)
-
- assert 'Admin' in roles
-
- def test_prefixed_dag_id_is_deprecated(self):
- with pytest.warns(
- DeprecationWarning,
- match=(
- "`prefixed_dag_id` has been deprecated. "
- "Please use `airflow.security.permissions.resource_name_for_dag` instead."
- ),
- ):
- self.security_manager.prefixed_dag_id("hello")
-
- def test_parent_dag_access_applies_to_subdag(self):
- username = 'dag_permission_user'
- role_name = 'dag_permission_role'
- parent_dag_name = "parent_dag"
- with self.app.app_context():
- mock_roles = [
- {
- 'role': role_name,
- 'perms': [
- (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"),
- (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"),
- ],
- }
- ]
- user = api_connexion_utils.create_user(
- self.app,
- username=username,
- role_name=role_name,
- )
- self.security_manager.bulk_sync_roles(mock_roles)
- self.security_manager._sync_dag_view_permissions(
+ for dag in sample_dags:
+ dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+ all_perms = security_manager.get_all_permissions()
+ assert ('can_read', dag_resource_name) not in all_perms
+ assert ('can_edit', dag_resource_name) not in all_perms
+
+ security_manager.create_dag_specific_permissions()
+
+ dagbag_class_mock.assert_called_once_with(read_dags_from_db=True)
+ collect_dags_from_db_mock.assert_called_once_with()
+
+ for dag in sample_dags:
+ dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+ all_perms = security_manager.get_all_permissions()
+ assert ('can_read', dag_resource_name) in all_perms
+ assert ('can_edit', dag_resource_name) in all_perms
+
+ security_manager._sync_dag_view_permissions.assert_called_once_with(
+ permissions.resource_name_for_dag('has_access_control'), access_control
+ )
+
+ del dagbag_mock.dags["has_access_control"]
+ with assert_queries_count(1): # one query to get all perms; dagbag is mocked
+ security_manager.create_dag_specific_permissions()
+
+
+def test_get_all_permissions(security_manager):
+ with assert_queries_count(1):
+ perms = security_manager.get_all_permissions()
+
+ assert isinstance(perms, set)
+ for perm in perms:
+ assert isinstance(perm, tuple)
+ assert len(perm) == 2
+
+ assert ('can_read', 'Connections') in perms
+
+
+def test_get_all_non_dag_permissions(security_manager):
+ with assert_queries_count(1):
+ pvs = security_manager._get_all_non_dag_permissions()
+
+ assert isinstance(pvs, dict)
+ for (perm_name, viewmodel_name), perm in pvs.items():
+ assert isinstance(perm_name, str)
+ assert isinstance(viewmodel_name, str)
+ assert isinstance(perm, security_manager.permission_model)
+
+ assert ('can_read', 'Connections') in pvs
+
+
+def test_get_all_roles_with_permissions(security_manager):
+ with assert_queries_count(1):
+ roles = security_manager._get_all_roles_with_permissions()
+
+ assert isinstance(roles, dict)
+ for role_name, role in roles.items():
+ assert isinstance(role_name, str)
+ assert isinstance(role, security_manager.role_model)
+
+ assert 'Admin' in roles
+
+
+def test_prefixed_dag_id_is_deprecated(security_manager):
+ with pytest.warns(
+ DeprecationWarning,
+ match=(
+ "`prefixed_dag_id` has been deprecated. "
+ "Please use `airflow.security.permissions.resource_name_for_dag` instead."
+ ),
+ ):
+ security_manager.prefixed_dag_id("hello")
+
+
+def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms):
+ username = 'dag_permission_user'
+ role_name = 'dag_permission_role'
+ parent_dag_name = "parent_dag"
+ with app.app_context():
+ mock_roles = [
+ {
+ 'role': role_name,
+ 'perms': [
+ (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"),
+ (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"),
+ ],
+ }
+ ]
+ with create_user_scope(
+ app,
+ username=username,
+ role_name=role_name,
+ ) as user:
+ security_manager.bulk_sync_roles(mock_roles)
+ security_manager._sync_dag_view_permissions(
parent_dag_name, access_control={role_name: READ_WRITE}
)
- self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
- self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
- self.assert_user_has_dag_perms(
+ assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
+ assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
+ assert_user_has_dag_perms(
perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user
)
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index 049742b..fab4ef4 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -25,7 +25,7 @@ import pytest
from airflow import settings
from airflow.models import DagBag
from airflow.www.app import create_app
-from tests.test_utils.api_connexion_utils import delete_roles
+from tests.test_utils.api_connexion_utils import delete_user
from tests.test_utils.decorators import dont_initialize_flask_app_submodules
from tests.test_utils.www import client_with_login
@@ -66,42 +66,47 @@ def app(examples_dag_bag):
app.jinja_env.undefined = jinja2.StrictUndefined
security_manager = app.appbuilder.sm
- if not security_manager.find_user(username='test'):
- security_manager.add_user(
- username='test',
- first_name='test',
- last_name='test',
- email='test@fab.org',
- role=security_manager.find_role('Admin'),
- password='test',
- )
- if not security_manager.find_user(username='test_user'):
- security_manager.add_user(
- username='test_user',
- first_name='test_user',
- last_name='test_user',
- email='test_user@fab.org',
- role=security_manager.find_role('User'),
- password='test_user',
- )
- if not security_manager.find_user(username='test_viewer'):
- security_manager.add_user(
- username='test_viewer',
- first_name='test_viewer',
- last_name='test_viewer',
- email='test_viewer@fab.org',
- role=security_manager.find_role('Viewer'),
- password='test_viewer',
- )
+
+ test_users = [
+ {
+ 'username': 'test_admin',
+ 'first_name': 'test_admin_first_name',
+ 'last_name': 'test_admin_last_name',
+ 'email': 'test_admin@fab.org',
+ 'role': security_manager.find_role('Admin'),
+ 'password': 'test_admin_password',
+ },
+ {
+ 'username': 'test_user',
+ 'first_name': 'test_user_first_name',
+ 'last_name': 'test_user_last_name',
+ 'email': 'test_user@fab.org',
+ 'role': security_manager.find_role('User'),
+ 'password': 'test_user_password',
+ },
+ {
+ 'username': 'test_viewer',
+ 'first_name': 'test_viewer_first_name',
+ 'last_name': 'test_viewer_last_name',
+ 'email': 'test_viewer@fab.org',
+ 'role': security_manager.find_role('Viewer'),
+ 'password': 'test_viewer_password',
+ },
+ ]
+
+ for user_dict in test_users:
+ if not security_manager.find_user(username=user_dict['username']):
+ security_manager.add_user(**user_dict)
yield app
- delete_roles(app)
+ for user_dict in test_users:
+ delete_user(app, user_dict['username'])
@pytest.fixture()
def admin_client(app):
- return client_with_login(app, username="test", password="test")
+ return client_with_login(app, username="test_admin", password="test_admin")
@pytest.fixture()
diff --git a/tests/www/views/test_views_base.py b/tests/www/views/test_views_base.py
index 807552b..cc99682 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -260,17 +260,18 @@ def test_views_post(admin_client, url, check_response):
@pytest.mark.parametrize(
- "url, client, content",
+ "url, client, content, username",
[
- ("resetmypassword/form", "viewer_client", "Password Changed"),
- ("resetpassword/form?pk=1", "admin_client", "Password Changed"),
- ("resetpassword/form?pk=1", "viewer_client", "Access is Denied"),
+ ("resetmypassword/form", "viewer_client", "Password Changed", "test_viewer"),
+ ("resetpassword/form?pk={}", "admin_client", "Password Changed", "test_admin"),
+ ("resetpassword/form?pk={}", "viewer_client", "Access is Denied", "test_viewer"),
],
ids=["my-viewer", "pk-admin", "pk-viewer"],
)
-def test_resetmypasswordview_edit(request, url, client, content):
+def test_resetmypasswordview_edit(app, request, url, client, content, username):
+ user = app.appbuilder.sm.find_user(username)
resp = request.getfixturevalue(client).post(
- url, data={'password': 'blah', 'conf_password': 'blah'}, follow_redirects=True
+ url.format(user.id), data={'password': 'blah', 'conf_password': 'blah'}, follow_redirects=True
)
check_content_in_response(content, resp)