You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2021/11/30 16:34:21 UTC

[airflow] branch main updated: Convert www.security test suite to pytest and remove residuals (#18961)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 33a4502  Convert www.security test suite to pytest and remove residuals (#18961)
33a4502 is described below

commit 33a4502f68be6b25fbccb96ed1832c5b527bb02a
Author: Khalid Mammadov <xm...@hotmail.com>
AuthorDate: Tue Nov 30 16:33:55 2021 +0000

    Convert www.security test suite to pytest and remove residuals (#18961)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 tests/test_utils/api_connexion_utils.py |   16 +-
 tests/www/test_security.py              | 1164 +++++++++++++++++--------------
 tests/www/views/conftest.py             |   65 +-
 tests/www/views/test_views_base.py      |   13 +-
 4 files changed, 691 insertions(+), 567 deletions(-)

diff --git a/tests/test_utils/api_connexion_utils.py b/tests/test_utils/api_connexion_utils.py
index bee38c3..e8063f2 100644
--- a/tests/test_utils/api_connexion_utils.py
+++ b/tests/test_utils/api_connexion_utils.py
@@ -80,15 +80,23 @@ def create_role(app, name, permissions=None):
     return role
 
 
+def set_user_single_role(app, username, role_name):
+    role = create_role(app, role_name)
+    user = app.appbuilder.sm.find_user(username)
+    if user.roles != [role]:  # also collapse a multi-role user down to exactly this one role
+        user.roles = [role]
+        app.appbuilder.sm.update_user(user)
+
+
 def delete_role(app, name):
-    if app.appbuilder.sm.find_role(name):
-        app.appbuilder.sm.delete_role(name)
+    if name not in EXISTING_ROLES:
+        if app.appbuilder.sm.find_role(name):
+            app.appbuilder.sm.delete_role(name)
 
 
 def delete_roles(app):
     for role in app.appbuilder.sm.get_all_roles():
-        if role.name not in EXISTING_ROLES:
-            app.appbuilder.sm.delete_role(role.name)
+        delete_role(app, role.name)
 
 
 def delete_user(app, username):
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 66cc26d..efa53dd 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -15,9 +15,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import contextlib
 import logging
-import unittest
 from unittest import mock
 
 import pytest
@@ -25,15 +24,15 @@ from flask_appbuilder import SQLA, Model, expose, has_access
 from flask_appbuilder.views import BaseView, ModelView
 from sqlalchemy import Column, Date, Float, Integer, String
 
-from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.models import DagModel
 from airflow.models.dag import DAG
 from airflow.security import permissions
 from airflow.www import app as application
 from airflow.www.fab_security.sqla.models import assoc_permission_role
+from airflow.www.security import AirflowSecurityManager
 from airflow.www.utils import CustomSQLAInterface
-from tests.test_utils import api_connexion_utils
+from tests.test_utils.api_connexion_utils import create_user_scope, delete_role, set_user_single_role
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 from tests.test_utils.mock_security_manager import MockSecurityManager
@@ -78,627 +77,738 @@ class SomeBaseView(BaseView):
         return "action!"
 
 
-class TestSecurity(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        settings.configure_orm()
-        cls.session = settings.Session
-        cls.app = application.create_app(testing=True)
-        cls.appbuilder = cls.app.appbuilder
-        cls.app.config['WTF_CSRF_ENABLED'] = False
-        cls.security_manager = cls.appbuilder.sm
-        cls.delete_roles()
-
-    def setUp(self):
-        clear_db_runs()
-        clear_db_dags()
-        self.db = SQLA(self.app)
-        self.appbuilder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews")
-        self.appbuilder.add_view(SomeModelView, "SomeModelView", category="ModelViews")
-
-        log.debug("Complete setup!")
-
-    @classmethod
-    def delete_roles(cls):
-        for role_name in [
-            'team-a',
-            'MyRole1',
-            'MyRole5',
-            'Test_Role',
-            'MyRole3',
-            'MyRole2',
-            'dag_permission_role',
-        ]:
-            api_connexion_utils.delete_role(cls.app, role_name)
-
-    def expect_user_is_in_role(self, user, rolename):
-        self.security_manager.bulk_sync_roles([{'role': rolename, 'perms': []}])
-        role = self.security_manager.find_role(rolename)
-        if not role:
-            self.security_manager.add_role(rolename)
-            role = self.security_manager.find_role(rolename)
-        user.roles = [role]
-        self.security_manager.update_user(user)
-
-    def assert_user_has_dag_perms(self, perms, dag_id, user=None):
-        for perm in perms:
-            assert self._has_dag_perm(perm, dag_id, user), f"User should have '{perm}' on DAG '{dag_id}'"
+def _clear_db_dag_and_runs():
+    clear_db_runs()
+    clear_db_dags()
 
-    def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
-        for perm in perms:
-            assert not self._has_dag_perm(
-                perm, dag_id, user
-            ), f"User should not have '{perm}' on DAG '{dag_id}'"
-
-    def _has_dag_perm(self, perm, dag_id, user):
-        return self.security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
-
-    def _create_dag(self, dag_id):
-        dag_model = DagModel(dag_id=dag_id)
-        self.session.add(dag_model)
-        self.session.commit()
-        self.security_manager.sync_perm_for_dag(dag_id, access_control=None)
-
-    def tearDown(self):
-        clear_db_runs()
-        clear_db_dags()
-        self.appbuilder = None
-        self.app = None
-        self.db = None
-        log.debug("Complete teardown!")
-
-    def test_init_role_baseview(self):
-        role_name = 'MyRole7'
-        role_perms = [('can_some_other_action', 'AnotherBaseView')]
-        with pytest.warns(
-            DeprecationWarning,
-            match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.",
-        ):
-            self.security_manager.init_role(role_name, role_perms)
-
-        role = self.appbuilder.sm.find_role(role_name)
-        assert role is not None
-        assert len(role_perms) == len(role.permissions)
-
-    def test_bulk_sync_roles_baseview(self):
-        role_name = 'MyRole3'
-        role_perms = [('can_some_action', 'SomeBaseView')]
-        self.security_manager.bulk_sync_roles([{'role': role_name, 'perms': role_perms}])
-
-        role = self.appbuilder.sm.find_role(role_name)
-        assert role is not None
-        assert len(role_perms) == len(role.permissions)
-
-    def test_bulk_sync_roles_modelview(self):
-        role_name = 'MyRole2'
-        role_perms = [
-            ('can_list', 'SomeModelView'),
-            ('can_show', 'SomeModelView'),
-            ('can_add', 'SomeModelView'),
-            (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
-            (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
-        ]
-        mock_roles = [{'role': role_name, 'perms': role_perms}]
-        self.security_manager.bulk_sync_roles(mock_roles)
 
-        role = self.appbuilder.sm.find_role(role_name)
-        assert role is not None
-        assert len(role_perms) == len(role.permissions)
+def _delete_dag_permissions(dag_id, security_manager):
+    dag_resource_name = permissions.resource_name_for_dag(dag_id)
+    for dag_action_name in security_manager.DAG_ACTIONS:
+        security_manager.delete_permission(dag_action_name, dag_resource_name)
 
-        # Check short circuit works
-        with assert_queries_count(2):  # One for Permission, one for roles
-            self.security_manager.bulk_sync_roles(mock_roles)
 
-    def test_update_and_verify_permission_role(self):
-        role_name = 'Test_Role'
-        role_perms = []
-        mock_roles = [{'role': role_name, 'perms': role_perms}]
-        self.security_manager.bulk_sync_roles(mock_roles)
-        role = self.security_manager.find_role(role_name)
+def _create_dag_model(dag_id, session, security_manager):
+    dag_model = DagModel(dag_id=dag_id)
+    session.add(dag_model)
+    session.commit()
+    security_manager.sync_perm_for_dag(dag_id, access_control=None)
+    return dag_model
 
-        perm = self.security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
-        self.security_manager.add_permission_to_role(role, perm)
-        role_perms_len = len(role.permissions)
 
-        self.security_manager.bulk_sync_roles(mock_roles)
-        new_role_perms_len = len(role.permissions)
+def _delete_dag_model(dag_model, session, security_manager):
+    session.delete(dag_model)
+    session.commit()
+    _delete_dag_permissions(dag_model.dag_id, security_manager)
 
-        assert role_perms_len == new_role_perms_len
-        assert new_role_perms_len == 1
 
-    def test_verify_public_role_has_no_permissions(self):
-        public = self.appbuilder.sm.find_role("Public")
+@contextlib.contextmanager
+def _create_dag_model_context(dag_id, session, security_manager):
+    dag = _create_dag_model(dag_id, session, security_manager)
+    yield dag
+    _delete_dag_model(dag, session, security_manager)
 
-        assert public.permissions == []
 
-    def test_verify_default_anon_user_has_no_accessible_dag_ids(self):
-        with self.app.app_context():
-            user = mock.MagicMock()
-            user.is_anonymous = True
-            self.app.config['AUTH_ROLE_PUBLIC'] = 'Public'
-            assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()]
+@pytest.fixture(scope="module", autouse=True)
+def clear_db_after_suite():
+    yield None
+    _clear_db_dag_and_runs()
 
-            self._create_dag("test_dag_id")
-            self.security_manager.sync_roles()
 
-            assert self.security_manager.get_accessible_dag_ids(user) == set()
+@pytest.fixture(scope="function", autouse=True)
+def clear_db_before_test():
+    _clear_db_dag_and_runs()
 
-    def test_verify_default_anon_user_has_no_access_to_specific_dag(self):
-        with self.app.app_context():
-            user = mock.MagicMock()
-            user.is_anonymous = True
-            self.app.config['AUTH_ROLE_PUBLIC'] = 'Public'
-            assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()]
 
-            dag_id = "test_dag_id"
-            self._create_dag(dag_id)
-            self.app.appbuilder.sm.sync_roles()
+@pytest.fixture(scope="module")
+def app():
+    _app = application.create_app(testing=True)
+    _app.config['WTF_CSRF_ENABLED'] = False
+    return _app
 
-            assert self.app.appbuilder.sm.can_read_dag(dag_id, user) is False
-            assert self.app.appbuilder.sm.can_edit_dag(dag_id, user) is False
-            assert self._has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is False
-            assert self._has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is False
 
-    def test_verify_anon_user_with_admin_role_has_all_dag_access(self):
-        with self.app.app_context():
-            self.app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
-            user = mock.MagicMock()
-            user.is_anonymous = True
+@pytest.fixture(scope="module")
+def app_builder(app):
+    app_builder = app.appbuilder
+    app_builder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews")
+    app_builder.add_view(SomeModelView, "SomeModelView", category="ModelViews")
+    return app.appbuilder
 
-            assert self.app.appbuilder.sm.get_user_roles(user) == [self.app.appbuilder.sm.get_public_role()]
 
-            test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
-            for dag_id in test_dag_ids:
-                self._create_dag(dag_id)
-            self.security_manager.sync_roles()
+@pytest.fixture(scope="module")
+def security_manager(app_builder):
+    return app_builder.sm
 
-            assert self.security_manager.get_accessible_dag_ids(user) == set(test_dag_ids)
 
-    def test_verify_anon_user_with_admin_role_has_access_to_each_dag(self):
-        with self.app.app_context():
-            user = mock.MagicMock()
-            user.is_anonymous = True
-            self.app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
+@pytest.fixture(scope="module")
+def session(app_builder):
+    return app_builder.get_session
 
-            # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set.
-            user.roles = self.app.appbuilder.sm.get_user_roles(user)
-            assert user.roles == [self.app.appbuilder.sm.get_public_role()]
 
-            test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
+@pytest.fixture(scope="module")
+def db(app):
+    return SQLA(app)
 
-            for dag_id in test_dag_ids:
-                self._create_dag(dag_id)
-            self.security_manager.sync_roles()
 
-            for dag_id in test_dag_ids:
-                assert self.app.appbuilder.sm.can_read_dag(dag_id, user) is True
-                assert self.app.appbuilder.sm.can_edit_dag(dag_id, user) is True
-                assert self._has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is True
-                assert self._has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is True
+@pytest.fixture(scope="function")
+def role(request, app, security_manager):
+    params = request.param
+    _role = None
+    params['mock_roles'] = [{'role': params['name'], 'perms': params['permissions']}]
+    if params.get("create", True):
+        security_manager.bulk_sync_roles(params['mock_roles'])
+        _role = security_manager.find_role(params['name'])
+    yield _role, params
+    delete_role(app, params['name'])
 
-    def test_get_user_roles(self):
-        user = mock.MagicMock()
-        user.is_anonymous = False
-        roles = self.appbuilder.sm.find_role('Admin')
-        user.roles = roles
-        assert self.security_manager.get_user_roles(user) == roles
-
-    def test_get_user_roles_for_anonymous_user(self):
-        viewer_role_perms = {
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD),
-            (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
-            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU),
-            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS),
+
+@pytest.fixture(scope="function")
+def mock_dag_models(request, session, security_manager):
+    dags_ids = request.param
+    dags = [_create_dag_model(dag_id, session, security_manager) for dag_id in dags_ids]
+
+    yield dags_ids
+
+    for dag in dags:
+        _delete_dag_model(dag, session, security_manager)
+
+
+@pytest.fixture(scope="function")
+def sample_dags(security_manager):
+    dags = [
+        DAG('has_access_control', access_control={'Public': {permissions.ACTION_CAN_READ}}),
+        DAG('no_access_control'),
+    ]
+
+    yield dags
+
+    for dag in dags:
+        _delete_dag_permissions(dag.dag_id, security_manager)
+
+
+@pytest.fixture(scope="module")
+def has_dag_perm(security_manager):
+    def _has_dag_perm(perm, dag_id, user):
+        return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
+
+    return _has_dag_perm
+
+
+@pytest.fixture(scope="module")
+def assert_user_has_dag_perms(has_dag_perm):
+    def _assert_user_has_dag_perms(perms, dag_id, user=None):
+        for perm in perms:
+            assert has_dag_perm(perm, dag_id, user), f"User should have '{perm}' on DAG '{dag_id}'"
+
+    return _assert_user_has_dag_perms
+
+
+@pytest.fixture(scope="module")
+def assert_user_does_not_have_dag_perms(has_dag_perm):
+    def _assert_user_does_not_have_dag_perms(dag_id, perms, user=None):
+        for perm in perms:
+            assert not has_dag_perm(perm, dag_id, user), f"User should not have '{perm}' on DAG '{dag_id}'"
+
+    return _assert_user_does_not_have_dag_perms
+
+
+@pytest.mark.parametrize(
+    "role",
+    [{"name": "MyRole7", "permissions": [('can_some_other_action', 'AnotherBaseView')], "create": False}],
+    indirect=True,
+)
+def test_init_role_baseview(app, security_manager, role):
+    _, params = role
+
+    with pytest.warns(
+        DeprecationWarning,
+        match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.",
+    ):
+        security_manager.init_role(params['name'], params['permissions'])
+
+    _role = security_manager.find_role(params['name'])
+    assert _role is not None
+    assert len(_role.permissions) == len(params['permissions'])
+
+
+@pytest.mark.parametrize(
+    "role",
+    [{"name": "MyRole3", "permissions": [('can_some_action', 'SomeBaseView')]}],
+    indirect=True,
+)
+def test_bulk_sync_roles_baseview(app, security_manager, role):
+    _role, params = role
+    assert _role is not None
+    assert len(_role.permissions) == len(params['permissions'])
+
+
+@pytest.mark.parametrize(
+    "role",
+    [
+        {
+            "name": "MyRole2",
+            "permissions": [
+                ('can_list', 'SomeModelView'),
+                ('can_show', 'SomeModelView'),
+                ('can_add', 'SomeModelView'),
+                (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
+                (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
+            ],
         }
-        self.app.config['AUTH_ROLE_PUBLIC'] = 'Viewer'
-
-        with self.app.app_context():
-            user = mock.MagicMock()
-            user.is_anonymous = True
-
-            perms_views = set()
-            for role in self.security_manager.get_user_roles(user):
-                perms_views.update({(perm.action.name, perm.resource.name) for perm in role.permissions})
-            assert perms_views == viewer_role_perms
-
-    @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
-    def test_get_current_user_permissions(self, mock_get_user_roles):
-        role_name = 'MyRole5'
-        role_perm = 'can_some_action'
-        role_vm = 'SomeBaseView'
-        username = 'get_current_user_permissions'
-
-        with self.app.app_context():
-            user = api_connexion_utils.create_user(
-                self.app,
-                username=username,
-                role_name=role_name,
-                permissions=[
-                    (role_perm, role_vm),
-                ],
-            )
+    ],
+    indirect=True,
+)
+def test_bulk_sync_roles_modelview(app, security_manager, role):
+    _role, params = role
+    assert _role is not None  # check the synced role itself, not the (role, params) fixture tuple
+    assert len(_role.permissions) == len(params['permissions'])
+
+    # The `role` fixture already synced this role once, so syncing again must short-circuit
+    with assert_queries_count(2):  # One for Permission, one for roles
+        security_manager.bulk_sync_roles(params['mock_roles'])
+
+
+@pytest.mark.parametrize(
+    "role",
+    [{"name": "Test_Role", "permissions": []}],
+    indirect=True,
+)
+def test_update_and_verify_permission_role(app, security_manager, role):
+    _role, params = role
+    perm = security_manager.get_permission(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_ROLE)
+    security_manager.add_permission_to_role(_role, perm)
+    role_perms_len = len(_role.permissions)
+
+    security_manager.bulk_sync_roles(params['mock_roles'])
+    new_role_perms_len = len(_role.permissions)
+
+    assert role_perms_len == new_role_perms_len
+    assert new_role_perms_len == 1
+
+
+def test_verify_public_role_has_no_permissions(security_manager):
+    public = security_manager.find_role("Public")
+    assert public.permissions == []
+
+
+def test_verify_default_anon_user_has_no_accessible_dag_ids(app, session, security_manager):
+    with app.app_context():
+        user = mock.MagicMock()
+        user.is_anonymous = True
+        app.config['AUTH_ROLE_PUBLIC'] = 'Public'
+        assert security_manager.get_user_roles(user) == [security_manager.get_public_role()]
+
+        with _create_dag_model_context("test_dag_id", session, security_manager):
+            security_manager.sync_roles()
+
+            assert security_manager.get_accessible_dag_ids(user) == set()
+
+
+def test_verify_default_anon_user_has_no_access_to_specific_dag(app, session, security_manager, has_dag_perm):
+    with app.app_context():
+        user = mock.MagicMock()
+        user.is_anonymous = True
+        app.config['AUTH_ROLE_PUBLIC'] = 'Public'
+        assert security_manager.get_user_roles(user) == [security_manager.get_public_role()]
+
+        dag_id = "test_dag_id"
+        with _create_dag_model_context(dag_id, session, security_manager):
+            security_manager.sync_roles()
+
+            assert security_manager.can_read_dag(dag_id, user) is False
+            assert security_manager.can_edit_dag(dag_id, user) is False
+            assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is False
+            assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is False
+
+
+@pytest.mark.parametrize(
+    "mock_dag_models",
+    [["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]],
+    indirect=True,
+)
+def test_verify_anon_user_with_admin_role_has_all_dag_access(app, session, security_manager, mock_dag_models):
+    test_dag_ids = mock_dag_models
+    with app.app_context():
+        app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
+        user = mock.MagicMock()
+        user.is_anonymous = True
+
+        assert security_manager.get_user_roles(user) == [security_manager.get_public_role()]
+
+        security_manager.sync_roles()
+
+        assert security_manager.get_accessible_dag_ids(user) == set(test_dag_ids)
+
+
+def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
+    app, session, security_manager, has_dag_perm
+):
+    with app.app_context():
+        user = mock.MagicMock()
+        user.is_anonymous = True
+        app.config['AUTH_ROLE_PUBLIC'] = 'Admin'
+
+        # Call `.get_user_roles` because `user` is a mock and its `roles` attribute needs to be set.
+        user.roles = security_manager.get_user_roles(user)
+        assert user.roles == [security_manager.get_public_role()]
+
+        test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
+
+        for dag_id in test_dag_ids:
+            with _create_dag_model_context(dag_id, session, security_manager):
+                security_manager.sync_roles()
+
+                assert security_manager.can_read_dag(dag_id, user) is True
+                assert security_manager.can_edit_dag(dag_id, user) is True
+                assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is True
+                assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is True
+
+
+def test_get_user_roles(app_builder, security_manager):
+    user = mock.MagicMock()
+    user.is_anonymous = False
+    roles = app_builder.sm.find_role('Admin')
+    user.roles = roles
+    assert security_manager.get_user_roles(user) == roles
+
+
+def test_get_user_roles_for_anonymous_user(app, security_manager):
+    viewer_role_perms = {
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_DEPENDENCIES),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_CODE),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_JOB),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_SLA_MISS),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_LOG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PASSWORD),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PASSWORD),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_MY_PROFILE),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_MY_PROFILE),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_BROWSE_MENU),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_DEPENDENCIES),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_JOB),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_PLUGIN),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_SLA_MISS),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_TASK_INSTANCE),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS_MENU),
+        (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DOCS),
+    }
+    app.config['AUTH_ROLE_PUBLIC'] = 'Viewer'
+
+    with app.app_context():
+        user = mock.MagicMock()
+        user.is_anonymous = True
+
+        perms_views = set()
+        for role in security_manager.get_user_roles(user):
+            perms_views.update({(perm.action.name, perm.resource.name) for perm in role.permissions})
+        assert perms_views == viewer_role_perms
+
+
+def test_get_current_user_permissions(app, security_manager, monkeypatch):
+    role_perm = 'can_some_action'
+    role_vm = 'SomeBaseView'
+
+    from airflow.www.security import AirflowSecurityManager
+
+    with app.app_context():
+        with create_user_scope(
+            app,
+            username='get_current_user_permissions',
+            role_name='MyRole5',
+            permissions=[
+                (role_perm, role_vm),
+            ],
+        ) as user:
             role = user.roles[0]
-            mock_get_user_roles.return_value = [role]
+            monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [role])
 
-            assert self.security_manager.get_current_user_permissions() == {(role_perm, role_vm)}
+            assert security_manager.get_current_user_permissions() == {(role_perm, role_vm)}
 
-            mock_get_user_roles.return_value = []
-            assert len(self.security_manager.get_current_user_permissions()) == 0
+            monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [])
+            assert len(security_manager.get_current_user_permissions()) == 0
 
-    @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
-    def test_current_user_has_permissions(self, mock_get_user_roles):
-        with self.app.app_context():
-            user = api_connexion_utils.create_user(
-                self.app,
-                username="current_user_has_permissions",
-                role_name="current_user_has_permissions",
-                permissions=[("can_some_action", "SomeBaseView")],
-            )
+
+def test_current_user_has_permissions(app, security_manager, monkeypatch):
+    with app.app_context():
+        with create_user_scope(
+            app,
+            username="current_user_has_permissions",
+            role_name="current_user_has_permissions",
+            permissions=[("can_some_action", "SomeBaseView")],
+        ) as user:
             role = user.roles[0]
-            mock_get_user_roles.return_value = [role]
-            assert self.security_manager.current_user_has_permissions()
+            monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [role])
+            assert security_manager.current_user_has_permissions()
 
             # Role, but no permissions
             role.permissions = []
-            assert not self.security_manager.current_user_has_permissions()
+            assert not security_manager.current_user_has_permissions()
 
             # No role
-            mock_get_user_roles.return_value = []
-            assert not self.security_manager.current_user_has_permissions()
+            monkeypatch.setattr(AirflowSecurityManager, "get_user_roles", lambda x: [])
+            assert not security_manager.current_user_has_permissions()
 
-    def test_get_accessible_dag_ids(self):
-        role_name = 'MyRole1'
-        permission_action = [permissions.ACTION_CAN_READ]
-        dag_id = 'dag_id'
-        username = "ElUser"
 
-        user = api_connexion_utils.create_user(
-            self.app,
-            username=username,
-            role_name=role_name,
-            permissions=[
-                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-            ],
-        )
+def test_get_accessible_dag_ids(app, security_manager, session):
+    role_name = 'MyRole1'
+    permission_action = [permissions.ACTION_CAN_READ]
+    dag_id = 'dag_id'
+    username = "ElUser"
+
+    with create_user_scope(
+        app,
+        username=username,
+        role_name=role_name,
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ],
+    ) as user:
 
         dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
-        self.session.add(dag_model)
-        self.session.commit()
+        session.add(dag_model)
+        session.commit()
 
-        self.security_manager.sync_perm_for_dag(  # type: ignore
+        security_manager.sync_perm_for_dag(  # type: ignore
             dag_id, access_control={role_name: permission_action}
         )
 
-        assert self.security_manager.get_accessible_dag_ids(user) == {'dag_id'}
+        assert security_manager.get_accessible_dag_ids(user) == {'dag_id'}
 
-    def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(self):
-        # In this test case,
-        # get_readable_dag_ids() don't return DAGs to which the user has CAN_EDIT action
-        username = "Monsieur User"
-        role_name = "MyRole1"
-        permission_action = [permissions.ACTION_CAN_EDIT]
-        dag_id = "dag_id"
 
-        user = api_connexion_utils.create_user(
-            self.app,
-            username=username,
-            role_name=role_name,
-            permissions=[
-                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-            ],
-        )
+def test_dont_get_inaccessible_dag_ids_for_dag_resource_permission(app, security_manager, session):
+    # In this test case,
+    # get_readable_dag_ids() doesn't return DAGs to which the user has only the CAN_EDIT action
+    username = "Monsieur User"
+    role_name = "MyRole1"
+    permission_action = [permissions.ACTION_CAN_EDIT]
+    dag_id = "dag_id"
+
+    with create_user_scope(
+        app,
+        username=username,
+        role_name=role_name,
+        permissions=[
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        ],
+    ) as user:
 
         dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", schedule_interval="2 2 * * *")
-        self.session.add(dag_model)
-        self.session.commit()
+        session.add(dag_model)
+        session.commit()
 
-        self.security_manager.sync_perm_for_dag(  # type: ignore
+        security_manager.sync_perm_for_dag(  # type: ignore
             dag_id, access_control={role_name: permission_action}
         )
 
-        assert self.security_manager.get_readable_dag_ids(user) == set()
+        assert security_manager.get_readable_dag_ids(user) == set()
 
-    @mock.patch('airflow.www.security.AirflowSecurityManager._has_resource_access')
-    def test_has_access(self, mock_has_resource_access):
-        user = mock.MagicMock()
-        user.is_anonymous = False
-        mock_has_resource_access.return_value = True
-        assert self.security_manager.has_access('perm', 'view', user)
-
-    def test_sync_perm_for_dag_creates_permissions_on_resources(self):
-        test_dag_id = 'TEST_DAG'
-        prefixed_test_dag_id = f'DAG:{test_dag_id}'
-        self.security_manager.sync_perm_for_dag(test_dag_id, access_control=None)
-        assert (
-            self.security_manager.get_permission(permissions.ACTION_CAN_READ, prefixed_test_dag_id)
-            is not None
-        )
-        assert (
-            self.security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id)
-            is not None
+
+def test_has_access(security_manager, monkeypatch):
+    user = mock.MagicMock()
+    user.is_anonymous = False
+    from airflow.www.security import AirflowSecurityManager
+
+    monkeypatch.setattr(AirflowSecurityManager, "_has_resource_access", lambda a, b, c, d: True)
+    assert security_manager.has_access('perm', 'view', user)
+
+
+def test_sync_perm_for_dag_creates_permissions_on_resources(security_manager):
+    test_dag_id = 'TEST_DAG'
+    prefixed_test_dag_id = f'DAG:{test_dag_id}'
+    security_manager.sync_perm_for_dag(test_dag_id, access_control=None)
+    assert security_manager.get_permission(permissions.ACTION_CAN_READ, prefixed_test_dag_id) is not None
+    assert security_manager.get_permission(permissions.ACTION_CAN_EDIT, prefixed_test_dag_id) is not None
+
+
+def test_has_all_dag_access(security_manager, monkeypatch):
+    from airflow.www.security import AirflowSecurityManager
+
+    monkeypatch.setattr(AirflowSecurityManager, "_has_role", lambda a, b: True)
+    assert security_manager.has_all_dags_access()
+
+    monkeypatch.setattr(AirflowSecurityManager, "_has_role", lambda a, b: False)
+    monkeypatch.setattr(AirflowSecurityManager, "_has_perm", lambda a, b, c: False)
+    assert not security_manager.has_all_dags_access()
+
+    monkeypatch.setattr(AirflowSecurityManager, "_has_perm", lambda a, b, c: True)
+    assert security_manager.has_all_dags_access()
+
+
+def test_access_control_with_non_existent_role(security_manager):
+    with pytest.raises(AirflowException) as ctx:
+        security_manager._sync_dag_view_permissions(
+            dag_id='access-control-test',
+            access_control={
+                'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+            },
         )
+    assert "role does not exist" in str(ctx.value)
 
-    @mock.patch('airflow.www.security.AirflowSecurityManager._has_perm')
-    @mock.patch('airflow.www.security.AirflowSecurityManager._has_role')
-    def test_has_all_dag_access(self, mock_has_role, mock_has_perm):
-        mock_has_role.return_value = True
-        assert self.security_manager.has_all_dags_access()
-
-        mock_has_role.return_value = False
-        mock_has_perm.return_value = False
-        assert not self.security_manager.has_all_dags_access()
-
-        mock_has_perm.return_value = True
-        assert self.security_manager.has_all_dags_access()
-
-    def test_access_control_with_non_existent_role(self):
-        with pytest.raises(AirflowException) as ctx:
-            self.security_manager._sync_dag_view_permissions(
-                dag_id='access-control-test',
-                access_control={
-                    'this-role-does-not-exist': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
-                },
-            )
-        assert "role does not exist" in str(ctx.value)
-
-    def test_all_dag_access_doesnt_give_non_dag_access(self):
-        username = 'dag_access_user'
-        role_name = 'dag_access_role'
-        with self.app.app_context():
-            user = api_connexion_utils.create_user(
-                self.app,
-                username=username,
-                role_name=role_name,
-                permissions=[
-                    (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                    (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                ],
-            )
-            assert self.security_manager.has_access(
-                permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user
-            )
-            assert not self.security_manager.has_access(
+
+def test_all_dag_access_doesnt_give_non_dag_access(app, security_manager):
+    username = 'dag_access_user'
+    role_name = 'dag_access_role'
+    with app.app_context():
+        with create_user_scope(
+            app,
+            username=username,
+            role_name=role_name,
+            permissions=[
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            ],
+        ) as user:
+            assert security_manager.has_access(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG, user)
+            assert not security_manager.has_access(
                 permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE, user
             )
 
-    def test_access_control_with_invalid_permission(self):
-        invalid_actions = [
-            'can_varimport',  # a real action, but not a member of DAG_ACTIONS
-            'can_eat_pudding',  # clearly not a real action
-        ]
-        username = "LaUser"
-        user = api_connexion_utils.create_user(
-            self.app,
-            username=username,
-            role_name='team-a',
-        )
+
+def test_access_control_with_invalid_permission(app, security_manager):
+    invalid_actions = [
+        'can_varimport',  # a real action, but not a member of DAG_ACTIONS
+        'can_eat_pudding',  # clearly not a real action
+    ]
+    username = "LaUser"
+    rolename = "team-a"
+    with create_user_scope(
+        app,
+        username=username,
+        role_name=rolename,
+    ):
         for action in invalid_actions:
-            self.expect_user_is_in_role(user, rolename='team-a')
             with pytest.raises(AirflowException) as ctx:
-                self.security_manager._sync_dag_view_permissions(
-                    'access_control_test', access_control={'team-a': {action}}
+                security_manager._sync_dag_view_permissions(
+                    'access_control_test', access_control={rolename: {action}}
                 )
             assert "invalid permissions" in str(ctx.value)
 
-    def test_access_control_is_set_on_init(self):
-        username = 'access_control_is_set_on_init'
-        role_name = 'team-a'
-        with self.app.app_context():
-            user = api_connexion_utils.create_user(
-                self.app,
-                username=username,
-                role_name=role_name,
-                permissions=[],
-            )
-            self.expect_user_is_in_role(user, rolename='team-a')
-            self.security_manager._sync_dag_view_permissions(
+
+def test_access_control_is_set_on_init(
+    app,
+    security_manager,
+    assert_user_has_dag_perms,
+    assert_user_does_not_have_dag_perms,
+):
+    username = 'access_control_is_set_on_init'
+    role_name = 'team-a'
+    negated_role = 'NOT-team-a'
+    with app.app_context():
+        with create_user_scope(
+            app,
+            username=username,
+            role_name=role_name,
+            permissions=[],
+        ) as user:
+            security_manager._sync_dag_view_permissions(
                 'access_control_test',
-                access_control={'team-a': [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]},
+                access_control={role_name: [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]},
             )
-            self.assert_user_has_dag_perms(
+            assert_user_has_dag_perms(
                 perms=[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ],
                 dag_id='access_control_test',
                 user=user,
             )
 
-            self.expect_user_is_in_role(user, rolename='NOT-team-a')
-            self.assert_user_does_not_have_dag_perms(
+            security_manager.bulk_sync_roles([{'role': negated_role, 'perms': []}])
+            set_user_single_role(app, username, role_name=negated_role)
+            assert_user_does_not_have_dag_perms(
                 perms=[permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ],
                 dag_id='access_control_test',
                 user=user,
             )
 
-    def test_access_control_stale_perms_are_revoked(self):
-        username = 'access_control_stale_perms_are_revoked'
-        role_name = 'team-a'
-        with self.app.app_context():
-            user = api_connexion_utils.create_user(
-                self.app,
-                username=username,
-                role_name=role_name,
-                permissions=[],
-            )
-            self.expect_user_is_in_role(user, rolename='team-a')
-            self.security_manager._sync_dag_view_permissions(
+
+def test_access_control_stale_perms_are_revoked(
+    app,
+    security_manager,
+    assert_user_has_dag_perms,
+    assert_user_does_not_have_dag_perms,
+):
+    username = 'access_control_stale_perms_are_revoked'
+    role_name = 'team-a'
+    with app.app_context():
+        with create_user_scope(
+            app,
+            username=username,
+            role_name=role_name,
+            permissions=[],
+        ) as user:
+            set_user_single_role(app, username, role_name='team-a')
+            security_manager._sync_dag_view_permissions(
                 'access_control_test', access_control={'team-a': READ_WRITE}
             )
-            self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user)
+            assert_user_has_dag_perms(perms=READ_WRITE, dag_id='access_control_test', user=user)
 
-            self.security_manager._sync_dag_view_permissions(
+            security_manager._sync_dag_view_permissions(
                 'access_control_test', access_control={'team-a': READ_ONLY}
             )
-            self.assert_user_has_dag_perms(
+            assert_user_has_dag_perms(
                 perms=[permissions.ACTION_CAN_READ], dag_id='access_control_test', user=user
             )
-            self.assert_user_does_not_have_dag_perms(
+            assert_user_does_not_have_dag_perms(
                 perms=[permissions.ACTION_CAN_EDIT], dag_id='access_control_test', user=user
             )
 
-    def test_no_additional_dag_permission_views_created(self):
-        ab_perm_role = assoc_permission_role
-
-        self.security_manager.sync_roles()
-        num_pv_before = self.db.session().query(ab_perm_role).count()
-        self.security_manager.sync_roles()
-        num_pv_after = self.db.session().query(ab_perm_role).count()
-        assert num_pv_before == num_pv_after
-
-    def test_override_role_vm(self):
-        test_security_manager = MockSecurityManager(appbuilder=self.appbuilder)
-        assert len(test_security_manager.VIEWER_VMS) == 1
-        assert test_security_manager.VIEWER_VMS == {'Airflow'}
-
-    def test_correct_roles_have_perms_to_read_config(self):
-        roles_to_check = self.security_manager.get_all_roles()
-        assert len(roles_to_check) >= 5
-        for role in roles_to_check:
-            if role.name in ["Admin", "Op"]:
-                assert self.security_manager.permission_exists_in_one_or_more_roles(
-                    permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
-                )
-            else:
-                assert not self.security_manager.permission_exists_in_one_or_more_roles(
-                    permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
-                ), (
-                    f"{role.name} should not have {permissions.ACTION_CAN_READ} "
-                    f"on {permissions.RESOURCE_CONFIG}"
-                )
 
-    @mock.patch("airflow.www.security.DagBag")
-    def test_create_dag_specific_permissions(self, dagbag_mock):
-        access_control = {'Public': {permissions.ACTION_CAN_READ}}
-        dags = [
-            DAG('has_access_control', access_control=access_control),
-            DAG('no_access_control'),
-        ]
+def test_no_additional_dag_permission_views_created(db, security_manager):
+    ab_perm_role = assoc_permission_role
 
-        collect_dags_from_db_mock = mock.Mock()
-        dagbag = mock.Mock()
+    security_manager.sync_roles()
+    num_pv_before = db.session().query(ab_perm_role).count()
+    security_manager.sync_roles()
+    num_pv_after = db.session().query(ab_perm_role).count()
+    assert num_pv_before == num_pv_after
 
-        dagbag.dags = {dag.dag_id: dag for dag in dags}
-        dagbag.collect_dags_from_db = collect_dags_from_db_mock
-        dagbag_mock.return_value = dagbag
 
-        self.security_manager._sync_dag_view_permissions = mock.Mock()
+def test_override_role_vm(app_builder):
+    test_security_manager = MockSecurityManager(appbuilder=app_builder)
+    assert len(test_security_manager.VIEWER_VMS) == 1
+    assert test_security_manager.VIEWER_VMS == {'Airflow'}
 
-        for dag in dags:
-            dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
-            all_perms = self.security_manager.get_all_permissions()
-            assert ('can_read', dag_resource_name) not in all_perms
-            assert ('can_edit', dag_resource_name) not in all_perms
 
-        self.security_manager.create_dag_specific_permissions()
+def test_correct_roles_have_perms_to_read_config(security_manager):
+    roles_to_check = security_manager.get_all_roles()
+    assert len(roles_to_check) >= 5
+    for role in roles_to_check:
+        if role.name in ["Admin", "Op"]:
+            assert security_manager.permission_exists_in_one_or_more_roles(
+                permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
+            )
+        else:
+            assert not security_manager.permission_exists_in_one_or_more_roles(
+                permissions.RESOURCE_CONFIG, permissions.ACTION_CAN_READ, [role.id]
+            ), (
+                f"{role.name} should not have {permissions.ACTION_CAN_READ} "
+                f"on {permissions.RESOURCE_CONFIG}"
+            )
 
-        dagbag_mock.assert_called_once_with(read_dags_from_db=True)
-        collect_dags_from_db_mock.assert_called_once_with()
 
-        for dag in dags:
-            dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
-            all_perms = self.security_manager.get_all_permissions()
-            assert ('can_read', dag_resource_name) in all_perms
-            assert ('can_edit', dag_resource_name) in all_perms
+def test_create_dag_specific_permissions(session, security_manager, monkeypatch, sample_dags):
 
-        self.security_manager._sync_dag_view_permissions.assert_called_once_with(
-            permissions.resource_name_for_dag('has_access_control'), access_control
-        )
+    access_control = {'Public': {permissions.ACTION_CAN_READ}}
 
-        del dagbag.dags["has_access_control"]
-        with assert_queries_count(1):  # one query to get all perms; dagbag is mocked
-            self.security_manager.create_dag_specific_permissions()
+    collect_dags_from_db_mock = mock.Mock()
+    dagbag_mock = mock.Mock()
+    dagbag_mock.dags = {dag.dag_id: dag for dag in sample_dags}
+    dagbag_mock.collect_dags_from_db = collect_dags_from_db_mock
+    dagbag_class_mock = mock.Mock()
+    dagbag_class_mock.return_value = dagbag_mock
+    import airflow.www.security
 
-    def test_get_all_permissions(self):
-        with assert_queries_count(1):
-            perms = self.security_manager.get_all_permissions()
+    monkeypatch.setitem(airflow.www.security.__dict__, "DagBag", dagbag_class_mock)
+    security_manager._sync_dag_view_permissions = mock.Mock()
 
-        assert isinstance(perms, set)
-        for perm in perms:
-            assert isinstance(perm, tuple)
-            assert len(perm) == 2
-
-        assert ('can_read', 'Connections') in perms
-
-    def test_get_all_non_dag_permissions(self):
-        with assert_queries_count(1):
-            pvs = self.security_manager._get_all_non_dag_permissions()
-
-        assert isinstance(pvs, dict)
-        for (perm_name, viewmodel_name), perm in pvs.items():
-            assert isinstance(perm_name, str)
-            assert isinstance(viewmodel_name, str)
-            assert isinstance(perm, self.security_manager.permission_model)
-
-        assert ('can_read', 'Connections') in pvs
-
-    def test_get_all_roles_with_permissions(self):
-        with assert_queries_count(1):
-            roles = self.security_manager._get_all_roles_with_permissions()
-
-        assert isinstance(roles, dict)
-        for role_name, role in roles.items():
-            assert isinstance(role_name, str)
-            assert isinstance(role, self.security_manager.role_model)
-
-        assert 'Admin' in roles
-
-    def test_prefixed_dag_id_is_deprecated(self):
-        with pytest.warns(
-            DeprecationWarning,
-            match=(
-                "`prefixed_dag_id` has been deprecated. "
-                "Please use `airflow.security.permissions.resource_name_for_dag` instead."
-            ),
-        ):
-            self.security_manager.prefixed_dag_id("hello")
-
-    def test_parent_dag_access_applies_to_subdag(self):
-        username = 'dag_permission_user'
-        role_name = 'dag_permission_role'
-        parent_dag_name = "parent_dag"
-        with self.app.app_context():
-            mock_roles = [
-                {
-                    'role': role_name,
-                    'perms': [
-                        (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"),
-                        (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"),
-                    ],
-                }
-            ]
-            user = api_connexion_utils.create_user(
-                self.app,
-                username=username,
-                role_name=role_name,
-            )
-            self.security_manager.bulk_sync_roles(mock_roles)
-            self.security_manager._sync_dag_view_permissions(
+    for dag in sample_dags:
+        dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+        all_perms = security_manager.get_all_permissions()
+        assert ('can_read', dag_resource_name) not in all_perms
+        assert ('can_edit', dag_resource_name) not in all_perms
+
+    security_manager.create_dag_specific_permissions()
+
+    dagbag_class_mock.assert_called_once_with(read_dags_from_db=True)
+    collect_dags_from_db_mock.assert_called_once_with()
+
+    for dag in sample_dags:
+        dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+        all_perms = security_manager.get_all_permissions()
+        assert ('can_read', dag_resource_name) in all_perms
+        assert ('can_edit', dag_resource_name) in all_perms
+
+    security_manager._sync_dag_view_permissions.assert_called_once_with(
+        permissions.resource_name_for_dag('has_access_control'), access_control
+    )
+
+    del dagbag_mock.dags["has_access_control"]
+    with assert_queries_count(1):  # one query to get all perms; dagbag is mocked
+        security_manager.create_dag_specific_permissions()
+
+
+def test_get_all_permissions(security_manager):
+    with assert_queries_count(1):
+        perms = security_manager.get_all_permissions()
+
+    assert isinstance(perms, set)
+    for perm in perms:
+        assert isinstance(perm, tuple)
+        assert len(perm) == 2
+
+    assert ('can_read', 'Connections') in perms
+
+
+def test_get_all_non_dag_permissions(security_manager):
+    with assert_queries_count(1):
+        pvs = security_manager._get_all_non_dag_permissions()
+
+    assert isinstance(pvs, dict)
+    for (perm_name, viewmodel_name), perm in pvs.items():
+        assert isinstance(perm_name, str)
+        assert isinstance(viewmodel_name, str)
+        assert isinstance(perm, security_manager.permission_model)
+
+    assert ('can_read', 'Connections') in pvs
+
+
+def test_get_all_roles_with_permissions(security_manager):
+    with assert_queries_count(1):
+        roles = security_manager._get_all_roles_with_permissions()
+
+    assert isinstance(roles, dict)
+    for role_name, role in roles.items():
+        assert isinstance(role_name, str)
+        assert isinstance(role, security_manager.role_model)
+
+    assert 'Admin' in roles
+
+
+def test_prefixed_dag_id_is_deprecated(security_manager):
+    with pytest.warns(
+        DeprecationWarning,
+        match=(
+            "`prefixed_dag_id` has been deprecated. "
+            "Please use `airflow.security.permissions.resource_name_for_dag` instead."
+        ),
+    ):
+        security_manager.prefixed_dag_id("hello")
+
+
+def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms):
+    username = 'dag_permission_user'
+    role_name = 'dag_permission_role'
+    parent_dag_name = "parent_dag"
+    with app.app_context():
+        mock_roles = [
+            {
+                'role': role_name,
+                'perms': [
+                    (permissions.ACTION_CAN_READ, f"DAG:{parent_dag_name}"),
+                    (permissions.ACTION_CAN_EDIT, f"DAG:{parent_dag_name}"),
+                ],
+            }
+        ]
+        with create_user_scope(
+            app,
+            username=username,
+            role_name=role_name,
+        ) as user:
+            security_manager.bulk_sync_roles(mock_roles)
+            security_manager._sync_dag_view_permissions(
                 parent_dag_name, access_control={role_name: READ_WRITE}
             )
-            self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
-            self.assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
-            self.assert_user_has_dag_perms(
+            assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
+            assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
+            assert_user_has_dag_perms(
                 perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user
             )
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index 049742b..fab4ef4 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -25,7 +25,7 @@ import pytest
 from airflow import settings
 from airflow.models import DagBag
 from airflow.www.app import create_app
-from tests.test_utils.api_connexion_utils import delete_roles
+from tests.test_utils.api_connexion_utils import delete_user
 from tests.test_utils.decorators import dont_initialize_flask_app_submodules
 from tests.test_utils.www import client_with_login
 
@@ -66,42 +66,47 @@ def app(examples_dag_bag):
     app.jinja_env.undefined = jinja2.StrictUndefined
 
     security_manager = app.appbuilder.sm
-    if not security_manager.find_user(username='test'):
-        security_manager.add_user(
-            username='test',
-            first_name='test',
-            last_name='test',
-            email='test@fab.org',
-            role=security_manager.find_role('Admin'),
-            password='test',
-        )
-    if not security_manager.find_user(username='test_user'):
-        security_manager.add_user(
-            username='test_user',
-            first_name='test_user',
-            last_name='test_user',
-            email='test_user@fab.org',
-            role=security_manager.find_role('User'),
-            password='test_user',
-        )
-    if not security_manager.find_user(username='test_viewer'):
-        security_manager.add_user(
-            username='test_viewer',
-            first_name='test_viewer',
-            last_name='test_viewer',
-            email='test_viewer@fab.org',
-            role=security_manager.find_role('Viewer'),
-            password='test_viewer',
-        )
+
+    test_users = [
+        {
+            'username': 'test_admin',
+            'first_name': 'test_admin_first_name',
+            'last_name': 'test_admin_last_name',
+            'email': 'test_admin@fab.org',
+            'role': security_manager.find_role('Admin'),
+            'password': 'test_admin_password',
+        },
+        {
+            'username': 'test_user',
+            'first_name': 'test_user_first_name',
+            'last_name': 'test_user_last_name',
+            'email': 'test_user@fab.org',
+            'role': security_manager.find_role('User'),
+            'password': 'test_user_password',
+        },
+        {
+            'username': 'test_viewer',
+            'first_name': 'test_viewer_first_name',
+            'last_name': 'test_viewer_last_name',
+            'email': 'test_viewer@fab.org',
+            'role': security_manager.find_role('Viewer'),
+            'password': 'test_viewer_password',
+        },
+    ]
+
+    for user_dict in test_users:
+        if not security_manager.find_user(username=user_dict['username']):
+            security_manager.add_user(**user_dict)
 
     yield app
 
-    delete_roles(app)
+    for user_dict in test_users:
+        delete_user(app, user_dict['username'])
 
 
 @pytest.fixture()
 def admin_client(app):
-    return client_with_login(app, username="test", password="test")
+    return client_with_login(app, username="test_admin", password="test_admin")
 
 
 @pytest.fixture()
diff --git a/tests/www/views/test_views_base.py b/tests/www/views/test_views_base.py
index 807552b..cc99682 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -260,17 +260,18 @@ def test_views_post(admin_client, url, check_response):
 
 
 @pytest.mark.parametrize(
-    "url, client, content",
+    "url, client, content, username",
     [
-        ("resetmypassword/form", "viewer_client", "Password Changed"),
-        ("resetpassword/form?pk=1", "admin_client", "Password Changed"),
-        ("resetpassword/form?pk=1", "viewer_client", "Access is Denied"),
+        ("resetmypassword/form", "viewer_client", "Password Changed", "test_viewer"),
+        ("resetpassword/form?pk={}", "admin_client", "Password Changed", "test_admin"),
+        ("resetpassword/form?pk={}", "viewer_client", "Access is Denied", "test_viewer"),
     ],
     ids=["my-viewer", "pk-admin", "pk-viewer"],
 )
-def test_resetmypasswordview_edit(request, url, client, content):
+def test_resetmypasswordview_edit(app, request, url, client, content, username):
+    user = app.appbuilder.sm.find_user(username)
     resp = request.getfixturevalue(client).post(
-        url, data={'password': 'blah', 'conf_password': 'blah'}, follow_redirects=True
+        url.format(user.id), data={'password': 'blah', 'conf_password': 'blah'}, follow_redirects=True
     )
     check_content_in_response(content, resp)