You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/12/02 20:58:15 UTC

[airflow] branch master updated: Refactor and speed up "DAG:" prefix permissions migration (#12720)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 386f6b2  Refactor and speed up "DAG:" prefix permissions migration (#12720)
386f6b2 is described below

commit 386f6b2ecb86677d2b59c2192d14bac11aa0a23d
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Wed Dec 2 12:57:00 2020 -0800

    Refactor and speed up "DAG:" prefix permissions migration (#12720)
---
 .../849da589634d_prefix_dag_permissions.py         | 205 +++++++++++++--------
 1 file changed, 131 insertions(+), 74 deletions(-)

diff --git a/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
index 86edf44..556c1a5 100644
--- a/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
+++ b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
@@ -24,8 +24,11 @@ Create Date: 2020-10-01 17:25:10.006322
 
 """
 
+from flask_appbuilder import SQLA
+from flask_appbuilder.security.sqla.models import Permission, PermissionView, ViewMenu
+
+from airflow import settings
 from airflow.security import permissions
-from airflow.www.app import cached_app
 
 # revision identifiers, used by Alembic.
 revision = '849da589634d'
@@ -34,80 +37,134 @@ branch_labels = None
 depends_on = None
 
 
+def prefix_individual_dag_permissions(session):  # noqa: D103
+    dag_perms = ['can_dag_read', 'can_dag_edit']
+    prefix = "DAG:"
+    permission_view_menus = (
+        session.query(PermissionView)
+        .join(Permission)
+        .filter(Permission.name.in_(dag_perms))
+        .join(ViewMenu)
+        .filter(ViewMenu.name != 'all_dags')
+        .filter(ViewMenu.name.notlike(prefix + '%'))
+        .all()
+    )
+    view_menu_ids = {pvm.view_menu.id for pvm in permission_view_menus}
+    vm_query = session.query(ViewMenu).filter(ViewMenu.id.in_(view_menu_ids))
+    vm_query.update({ViewMenu.name: prefix + ViewMenu.name}, synchronize_session=False)
+    session.commit()
+
+
+def get_or_create_dag_resource(session):  # noqa: D103
+    dag_resource = get_resource_query(session, permissions.RESOURCE_DAG).first()
+    if dag_resource:
+        return dag_resource
+
+    dag_resource = ViewMenu()
+    dag_resource.name = permissions.RESOURCE_DAG
+    session.add(dag_resource)
+    session.commit()
+
+    return dag_resource
+
+
+def get_or_create_action(session, action_name):  # noqa: D103
+    action = get_action_query(session, action_name).first()
+    if action:
+        return action
+
+    action = Permission()
+    action.name = action_name
+    session.add(action)
+    session.commit()
+
+    return action
+
+
+def get_resource_query(session, resource_name):  # noqa: D103
+    return session.query(ViewMenu).filter(ViewMenu.name == resource_name)
+
+
+def get_action_query(session, action_name):  # noqa: D103
+    return session.query(Permission).filter(Permission.name == action_name)
+
+
+def get_pv_with_action_query(session, action):  # noqa: D103
+    return session.query(PermissionView).filter(PermissionView.permission == action)
+
+
+def get_pv_with_resource_query(session, resource):  # noqa: D103
+    return session.query(PermissionView).filter(PermissionView.view_menu_id == resource.id)
+
+
+def update_pv_action(session, pv_query, action):  # noqa: D103
+    pv_query.update({PermissionView.permission_id: action.id}, synchronize_session=False)
+    session.commit()
+
+
+def get_pv(session, resource, action):  # noqa: D103
+    return (
+        session.query(PermissionView)
+        .filter(PermissionView.view_menu == resource)
+        .filter(PermissionView.permission == action)
+        .first()
+    )
+
+
+def update_pv_resource(session, pv_query, resource):  # noqa: D103
+    for pv in pv_query.all():  # noqa: D103
+        if not get_pv(session, resource, pv.permission):  # noqa: D103
+            pv.view_menu = resource
+        else:
+            session.delete(pv)
+
+    session.commit()
+
+
+def migrate_to_new_dag_permissions(db):  # noqa: D103
+    # Prefix individual dag perms with `DAG:`
+    prefix_individual_dag_permissions(db.session)
+
+    # Update existing PVs to use `can_read` instead of `can_dag_read`
+    can_dag_read_action = get_action_query(db.session, 'can_dag_read').first()
+    old_can_dag_read_pvs = get_pv_with_action_query(db.session, can_dag_read_action)
+    can_read_action = get_or_create_action(db.session, 'can_read')
+    update_pv_action(db.session, old_can_dag_read_pvs, can_read_action)
+
+    # Update existing PVs to use `can_edit` instead of `can_dag_edit`
+    can_dag_edit_action = get_action_query(db.session, 'can_dag_edit').first()
+    old_can_dag_edit_pvs = get_pv_with_action_query(db.session, can_dag_edit_action)
+    can_edit_action = get_or_create_action(db.session, 'can_edit')
+    update_pv_action(db.session, old_can_dag_edit_pvs, can_edit_action)
+
+    # Update existing PVs for `all_dags` resource to use `DAGs` resource.
+    all_dags_resource = get_resource_query(db.session, 'all_dags').first()
+    if all_dags_resource:
+        old_all_dags_pv = get_pv_with_resource_query(db.session, all_dags_resource)
+        dag_resource = get_or_create_dag_resource(db.session)
+        update_pv_resource(db.session, old_all_dags_pv, dag_resource)
+
+        # Delete the `all_dags` resource
+        db.session.delete(all_dags_resource)
+
+    # Delete `can_dag_read` action
+    if can_dag_read_action:
+        db.session.delete(can_dag_read_action)
+
+    # Delete `can_dag_edit` action
+    if can_dag_edit_action:
+        db.session.delete(can_dag_edit_action)
+
+    db.session.commit()
+
+
 def upgrade():  # noqa: D103
-    permissions = ['can_dag_read', 'can_dag_edit']
-    view_menus = cached_app().appbuilder.sm.get_all_view_menu()
-    convert_permissions(permissions, view_menus, upgrade_action, upgrade_dag_id)
+    db = SQLA()
+    db.session = settings.Session
+    migrate_to_new_dag_permissions(db)
+    db.session.commit()
+    db.session.close()
 
 
 def downgrade():  # noqa: D103
-    permissions = ['can_read', 'can_edit']
-    vms = cached_app().appbuilder.sm.get_all_view_menu()
-    view_menus = [vm for vm in vms if (vm.name == permissions.RESOURCE_DAG or vm.name.startswith('DAG:'))]
-    convert_permissions(permissions, view_menus, downgrade_action, downgrade_dag_id)
-
-
-def upgrade_dag_id(dag_id):
-    """Adds the 'DAG:' prefix to a DAG view if appropriate."""
-    if dag_id == 'all_dags':
-        return permissions.RESOURCE_DAG
-    if dag_id.startswith("DAG:"):
-        return dag_id
-    return f"DAG:{dag_id}"
-
-
-def downgrade_dag_id(dag_id):
-    """Removes the 'DAG:' prefix from a DAG view name to return the DAG id."""
-    if dag_id == permissions.RESOURCE_DAG:
-        return 'all_dags'
-    if dag_id.startswith("DAG:"):
-        return dag_id[len("DAG:") :]
-    return dag_id
-
-
-def upgrade_action(action):
-    """Converts the a DAG permission name from the old style to the new style."""
-    if action == 'can_dag_read':
-        return 'can_read'
-    return 'can_edit'
-
-
-def downgrade_action(action):
-    """Converts the a DAG permission name from the old style to the new style."""
-    if action == 'can_read':
-        return 'can_dag_read'
-    return 'can_dag_edit'
-
-
-def convert_permissions(permissions, view_menus, convert_action, convert_dag_id):
-    """Creates new empty role in DB"""
-    appbuilder = cached_app().appbuilder  # pylint: disable=no-member
-    roles = appbuilder.sm.get_all_roles()
-    views_to_remove = set()
-    for permission_name in permissions:  # pylint: disable=too-many-nested-blocks
-        for view_menu in view_menus:
-            view_name = view_menu.name
-            old_pvm = appbuilder.sm.find_permission_view_menu(permission_name, view_name)
-            if not old_pvm:
-                continue
-
-            views_to_remove.add(view_name)
-            new_permission_name = convert_action(permission_name)
-            new_pvm = appbuilder.sm.add_permission_view_menu(new_permission_name, convert_dag_id(view_name))
-            for role in roles:
-                if appbuilder.sm.exist_permission_on_roles(view_name, permission_name, [role.id]):
-                    appbuilder.sm.add_permission_role(role, new_pvm)
-                    appbuilder.sm.del_permission_role(role, old_pvm)
-                    print(f"DELETING: {role.name}  ---->   {view_name}.{permission_name}")
-            appbuilder.sm.del_permission_view_menu(permission_name, view_name)
-            print(f"DELETING: perm_view  ---->   {view_name}.{permission_name}")
-    for view_name in views_to_remove:
-        if appbuilder.sm.find_view_menu(view_name):
-            appbuilder.sm.del_view_menu(view_name)
-            print(f"DELETING: view_menu  ---->   {view_name}")
-
-    if 'can_dag_read' in permissions:
-        for permission_name in permissions:
-            if appbuilder.sm.find_permission(permission_name):
-                appbuilder.sm.del_permission(permission_name)
-                print(f"DELETING: permission  ---->   {permission_name}")
+    pass