You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/29 15:59:44 UTC

[airflow] branch v2-0-test updated (8a2a33a -> c97a5d9)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 8a2a33a  Scheduler: Remove TIs from starved pools from the critical path. (#14476)
     new b51a9c6  Speed up webserver start when there are many DAGs (#14993)
     new c97a5d9  Faster default role syncing during webserver start (#15017)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/www/security.py    | 87 +++++++++++++++++++++++++++++++++-----------
 tests/www/test_security.py | 90 ++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 150 insertions(+), 27 deletions(-)

[airflow] 01/02: Speed up webserver start when there are many DAGs (#14993)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b51a9c618ae9c13e7f67b800cc09ec3446f45ecb
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Thu Mar 25 09:37:46 2021 -0600

    Speed up webserver start when there are many DAGs (#14993)
    
    This fixes a short circuit in `create_dag_specific_permissions` to
    avoid needlessly querying permissions for every single DAG, and changes
    `get_all_permissions` to run 1 query instead of many.
    
    With ~5k DAGs, these changes speed up `create_dag_specific_permissions`
    by more than 65 seconds each call (on my machine), and since that method
    is called twice before the webserver actually responds to requests, this
    effectively speeds up the webserver startup by over 2 minutes.
    
    (cherry picked from commit 35fbb726498e3090258a89c7819b2ff3266948f6)
---
 airflow/www/security.py    | 22 +++++++++++-----------
 tests/www/test_security.py | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/airflow/www/security.py b/airflow/www/security.py
index 5201ef6..8a2b10b 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -457,17 +457,17 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
 
         self.get_session.commit()
 
-    def get_all_permissions(self):
+    def get_all_permissions(self) -> Set[Tuple[str, str]]:
         """Returns all permissions as a set of tuples with the perm name and view menu name"""
-        perms = set()
-        for permission_view in self.get_session.query(self.permissionview_model).all():
-            if permission_view.permission and permission_view.view_menu:
-                perms.add((permission_view.permission.name, permission_view.view_menu.name))
-
-        return perms
+        return set(
+            self.get_session.query(self.permissionview_model)
+            .join(self.permission_model)
+            .join(self.viewmenu_model)
+            .with_entities(self.permission_model.name, self.viewmenu_model.name)
+            .all()
+        )
 
-    @provide_session
-    def create_dag_specific_permissions(self, session=None):
+    def create_dag_specific_permissions(self) -> None:
         """
         Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs.
 
@@ -475,7 +475,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
         """
         perms = self.get_all_permissions()
         rows = (
-            session.query(models.DagModel.dag_id)
+            self.get_session.query(models.DagModel.dag_id)
             .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
             .all()
         )
@@ -484,7 +484,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
             dag_id = row[0]
             for perm_name in self.DAG_PERMS:
                 dag_resource_name = self.prefixed_dag_id(dag_id)
-                if dag_resource_name and perm_name and (dag_resource_name, perm_name) not in perms:
+                if (perm_name, dag_resource_name) not in perms:
                     self._merge_perm(perm_name, dag_resource_name)
 
     def update_admin_perm_view(self):
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 9179be2..2ae2608 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -33,6 +33,7 @@ from airflow.security import permissions
 from airflow.www import app as application
 from airflow.www.utils import CustomSQLAInterface
 from tests.test_utils import fab_utils
+from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 from tests.test_utils.mock_security_manager import MockSecurityManager
 
@@ -542,3 +543,34 @@ class TestSecurity(unittest.TestCase):
                     f"{role.name} should not have {permissions.ACTION_CAN_READ} "
                     f"on {permissions.RESOURCE_CONFIG}"
                 )
+
+    def test_create_dag_specific_permissions(self):
+        dag_id = 'some_dag_id'
+        dag_permission_name = self.security_manager.prefixed_dag_id(dag_id)
+        assert ('can_read', dag_permission_name) not in self.security_manager.get_all_permissions()
+
+        dag_model = DagModel(
+            dag_id=dag_id, fileloc='/tmp/dag_.py', schedule_interval='2 2 * * *', is_paused=True
+        )
+        self.session.add(dag_model)
+        self.session.commit()
+
+        self.security_manager.create_dag_specific_permissions()
+        self.session.commit()
+
+        assert ('can_read', dag_permission_name) in self.security_manager.get_all_permissions()
+
+        # Make sure we short circuit when the perms already exist
+        with assert_queries_count(2):  # One query to get DagModels, one query to get all perms
+            self.security_manager.create_dag_specific_permissions()
+
+    def test_get_all_permissions(self):
+        with assert_queries_count(1):
+            perms = self.security_manager.get_all_permissions()
+
+        assert isinstance(perms, set)
+        for perm in perms:
+            assert isinstance(perm, tuple)
+            assert len(perm) == 2
+
+        assert ('can_read', 'Connections') in perms

[airflow] 02/02: Faster default role syncing during webserver start (#15017)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c97a5d908a0513bbfa62809eefc72dedb8d822ce
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Mar 29 09:58:01 2021 -0600

    Faster default role syncing during webserver start (#15017)
    
    This makes a handful of bigger queries instead of many queries when
    syncing the default Airflow roles. On my machine with 5k DAGs, this led
    to a reduction of 1 second in startup time (bonus, makes tests faster
    too).
    
    (cherry picked from commit 1627323a197bba2c4fbd71816a9a6bd3f78c1657)
---
 airflow/www/security.py    | 67 ++++++++++++++++++++++++++++++++++++++--------
 tests/www/test_security.py | 58 ++++++++++++++++++++++++++++++++++-----
 2 files changed, 108 insertions(+), 17 deletions(-)

diff --git a/airflow/www/security.py b/airflow/www/security.py
index 8a2b10b..a6db620 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -17,7 +17,8 @@
 # under the License.
 #
 
-from typing import Optional, Sequence, Set, Tuple
+import warnings
+from typing import Dict, Optional, Sequence, Set, Tuple
 
 from flask import current_app, g
 from flask_appbuilder.security.sqla import models as sqla_models
@@ -174,16 +175,34 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
     def init_role(self, role_name, perms):
         """
         Initialize the role with the permissions and related view-menus.
-
         :param role_name:
         :param perms:
         :return:
         """
-        role = self.find_role(role_name)
-        if not role:
-            role = self.add_role(role_name)
+        warnings.warn(
+            "`init_role` has been deprecated. Please use `bulk_sync_roles` instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        self.bulk_sync_roles([{'role': role_name, 'perms': perms}])
+
+    def bulk_sync_roles(self, roles):
+        """Sync the provided roles and permissions."""
+        existing_roles = self._get_all_roles_with_permissions()
+        pvs = self._get_all_non_dag_permissionviews()
+
+        for config in roles:
+            role_name = config['role']
+            perms = config['perms']
+            role = existing_roles.get(role_name) or self.add_role(role_name)
+
+            for perm_name, view_name in perms:
+                perm_view = pvs.get((perm_name, view_name)) or self.add_permission_view_menu(
+                    perm_name, view_name
+                )
 
-        self.add_permissions(role, set(perms))
+                if perm_view not in role.permissions:
+                    self.add_permission_role(role, perm_view)
 
     def add_permissions(self, role, perms):
         """Adds resource permissions to a given role."""
@@ -467,6 +486,34 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
             .all()
         )
 
+    def _get_all_non_dag_permissionviews(self) -> Dict[Tuple[str, str], PermissionView]:
+        """
+        Returns a dict keyed by (perm name, view menu name) whose values are the matching perm views,
+        covering all perm views except those that are for specific DAGs.
+        """
+        return {
+            (perm_name, viewmodel_name): viewmodel
+            for perm_name, viewmodel_name, viewmodel in (
+                self.get_session.query(self.permissionview_model)
+                .join(self.permission_model)
+                .join(self.viewmenu_model)
+                .filter(~self.viewmenu_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                .with_entities(
+                    self.permission_model.name, self.viewmenu_model.name, self.permissionview_model
+                )
+                .all()
+            )
+        }
+
+    def _get_all_roles_with_permissions(self) -> Dict[str, Role]:
+        """Returns a dict with a key of role name and value of role with eagerly loaded permissions"""
+        return {
+            r.name: r
+            for r in (
+                self.get_session.query(self.role_model).options(joinedload(self.role_model.permissions)).all()
+            )
+        }
+
     def create_dag_specific_permissions(self) -> None:
         """
         Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs.
@@ -526,11 +573,9 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
         self.create_perm_vm_for_all_dag()
         self.create_dag_specific_permissions()
 
-        # Create default user role.
-        for config in self.ROLE_CONFIGS:
-            role = config['role']
-            perms = config['perms']
-            self.init_role(role, perms)
+        # Sync the default roles (Admin, Viewer, User, Op, Public) with related permissions
+        self.bulk_sync_roles(self.ROLE_CONFIGS)
+
         self.add_homepage_access_to_custom_roles()
         # init existing roles, the rest role could be created through UI.
         self.update_admin_perm_view()
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 2ae2608..46a1d6b 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -103,7 +103,7 @@ class TestSecurity(unittest.TestCase):
             fab_utils.delete_role(cls.app, role_name)
 
     def expect_user_is_in_role(self, user, rolename):
-        self.security_manager.init_role(rolename, [])
+        self.security_manager.bulk_sync_roles([{'role': rolename, 'perms': []}])
         role = self.security_manager.find_role(rolename)
         if not role:
             self.security_manager.add_role(rolename)
@@ -141,14 +141,28 @@ class TestSecurity(unittest.TestCase):
         log.debug("Complete teardown!")
 
     def test_init_role_baseview(self):
+        role_name = 'MyRole7'
+        role_perms = [('can_some_other_action', 'AnotherBaseView')]
+        with pytest.warns(
+            DeprecationWarning,
+            match="`init_role` has been deprecated\\. Please use `bulk_sync_roles` instead\\.",
+        ):
+            self.security_manager.init_role(role_name, role_perms)
+
+        role = self.appbuilder.sm.find_role(role_name)
+        assert role is not None
+        assert len(role_perms) == len(role.permissions)
+
+    def test_bulk_sync_roles_baseview(self):
         role_name = 'MyRole3'
         role_perms = [('can_some_action', 'SomeBaseView')]
-        self.security_manager.init_role(role_name, perms=role_perms)
+        self.security_manager.bulk_sync_roles([{'role': role_name, 'perms': role_perms}])
+
         role = self.appbuilder.sm.find_role(role_name)
         assert role is not None
         assert len(role_perms) == len(role.permissions)
 
-    def test_init_role_modelview(self):
+    def test_bulk_sync_roles_modelview(self):
         role_name = 'MyRole2'
         role_perms = [
             ('can_list', 'SomeModelView'),
@@ -157,24 +171,33 @@ class TestSecurity(unittest.TestCase):
             (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
             (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
         ]
-        self.security_manager.init_role(role_name, role_perms)
+        mock_roles = [{'role': role_name, 'perms': role_perms}]
+        self.security_manager.bulk_sync_roles(mock_roles)
+
         role = self.appbuilder.sm.find_role(role_name)
         assert role is not None
         assert len(role_perms) == len(role.permissions)
 
+        # Check short circuit works
+        with assert_queries_count(2):  # One for permissionview, one for roles
+            self.security_manager.bulk_sync_roles(mock_roles)
+
     def test_update_and_verify_permission_role(self):
         role_name = 'Test_Role'
-        self.security_manager.init_role(role_name, [])
+        role_perms = []
+        mock_roles = [{'role': role_name, 'perms': role_perms}]
+        self.security_manager.bulk_sync_roles(mock_roles)
         role = self.security_manager.find_role(role_name)
 
         perm = self.security_manager.find_permission_view_menu(permissions.ACTION_CAN_EDIT, 'RoleModelView')
         self.security_manager.add_permission_role(role, perm)
         role_perms_len = len(role.permissions)
 
-        self.security_manager.init_role(role_name, [])
+        self.security_manager.bulk_sync_roles(mock_roles)
         new_role_perms_len = len(role.permissions)
 
         assert role_perms_len == new_role_perms_len
+        assert new_role_perms_len == 1
 
     def test_verify_public_role_has_no_permissions(self):
         public = self.appbuilder.sm.find_role("Public")
@@ -574,3 +597,26 @@ class TestSecurity(unittest.TestCase):
             assert len(perm) == 2
 
         assert ('can_read', 'Connections') in perms
+
+    def test_get_all_non_dag_permissionviews(self):
+        with assert_queries_count(1):
+            pvs = self.security_manager._get_all_non_dag_permissionviews()
+
+        assert isinstance(pvs, dict)
+        for (perm_name, viewmodel_name), perm_view in pvs.items():
+            assert isinstance(perm_name, str)
+            assert isinstance(viewmodel_name, str)
+            assert isinstance(perm_view, self.security_manager.permissionview_model)
+
+        assert ('can_read', 'Connections') in pvs
+
+    def test_get_all_roles_with_permissions(self):
+        with assert_queries_count(1):
+            roles = self.security_manager._get_all_roles_with_permissions()
+
+        assert isinstance(roles, dict)
+        for role_name, role in roles.items():
+            assert isinstance(role_name, str)
+            assert isinstance(role, self.security_manager.role_model)
+
+        assert 'Admin' in roles