You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/12/02 20:58:15 UTC
[airflow] branch master updated: Refactor and speed up "DAG:"
prefix permissions migration (#12720)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 386f6b2 Refactor and speed up "DAG:" prefix permissions migration (#12720)
386f6b2 is described below
commit 386f6b2ecb86677d2b59c2192d14bac11aa0a23d
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Wed Dec 2 12:57:00 2020 -0800
Refactor and speed up "DAG:" prefix permissions migration (#12720)
---
.../849da589634d_prefix_dag_permissions.py | 205 +++++++++++++--------
1 file changed, 131 insertions(+), 74 deletions(-)
diff --git a/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
index 86edf44..556c1a5 100644
--- a/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
+++ b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
@@ -24,8 +24,11 @@ Create Date: 2020-10-01 17:25:10.006322
"""
+from flask_appbuilder import SQLA
+from flask_appbuilder.security.sqla.models import Permission, PermissionView, ViewMenu
+
+from airflow import settings
from airflow.security import permissions
-from airflow.www.app import cached_app
# revision identifiers, used by Alembic.
revision = '849da589634d'
@@ -34,80 +37,134 @@ branch_labels = None
depends_on = None
+def prefix_individual_dag_permissions(session): # noqa: D103
+ dag_perms = ['can_dag_read', 'can_dag_edit']
+ prefix = "DAG:"
+ permission_view_menus = (
+ session.query(PermissionView)
+ .join(Permission)
+ .filter(Permission.name.in_(dag_perms))
+ .join(ViewMenu)
+ .filter(ViewMenu.name != 'all_dags')
+ .filter(ViewMenu.name.notlike(prefix + '%'))
+ .all()
+ )
+ view_menu_ids = {pvm.view_menu.id for pvm in permission_view_menus}
+ vm_query = session.query(ViewMenu).filter(ViewMenu.id.in_(view_menu_ids))
+ vm_query.update({ViewMenu.name: prefix + ViewMenu.name}, synchronize_session=False)
+ session.commit()
+
+
+def get_or_create_dag_resource(session): # noqa: D103
+ dag_resource = get_resource_query(session, permissions.RESOURCE_DAG).first()
+ if dag_resource:
+ return dag_resource
+
+ dag_resource = ViewMenu()
+ dag_resource.name = permissions.RESOURCE_DAG
+ session.add(dag_resource)
+ session.commit()
+
+ return dag_resource
+
+
+def get_or_create_action(session, action_name): # noqa: D103
+ action = get_action_query(session, action_name).first()
+ if action:
+ return action
+
+ action = Permission()
+ action.name = action_name
+ session.add(action)
+ session.commit()
+
+ return action
+
+
+def get_resource_query(session, resource_name): # noqa: D103
+ return session.query(ViewMenu).filter(ViewMenu.name == resource_name)
+
+
+def get_action_query(session, action_name): # noqa: D103
+ return session.query(Permission).filter(Permission.name == action_name)
+
+
+def get_pv_with_action_query(session, action): # noqa: D103
+ return session.query(PermissionView).filter(PermissionView.permission == action)
+
+
+def get_pv_with_resource_query(session, resource): # noqa: D103
+ return session.query(PermissionView).filter(PermissionView.view_menu_id == resource.id)
+
+
+def update_pv_action(session, pv_query, action): # noqa: D103
+ pv_query.update({PermissionView.permission_id: action.id}, synchronize_session=False)
+ session.commit()
+
+
+def get_pv(session, resource, action): # noqa: D103
+ return (
+ session.query(PermissionView)
+ .filter(PermissionView.view_menu == resource)
+ .filter(PermissionView.permission == action)
+ .first()
+ )
+
+
+def update_pv_resource(session, pv_query, resource): # noqa: D103
+ for pv in pv_query.all(): # noqa: D103
+ if not get_pv(session, resource, pv.permission): # noqa: D103
+ pv.view_menu = resource
+ else:
+ session.delete(pv)
+
+ session.commit()
+
+
+def migrate_to_new_dag_permissions(db): # noqa: D103
+ # Prefix individual dag perms with `DAG:`
+ prefix_individual_dag_permissions(db.session)
+
+ # Update existing PVs to use `can_read` instead of `can_dag_read`
+ can_dag_read_action = get_action_query(db.session, 'can_dag_read').first()
+ old_can_dag_read_pvs = get_pv_with_action_query(db.session, can_dag_read_action)
+ can_read_action = get_or_create_action(db.session, 'can_read')
+ update_pv_action(db.session, old_can_dag_read_pvs, can_read_action)
+
+ # Update existing PVs to use `can_edit` instead of `can_dag_edit`
+ can_dag_edit_action = get_action_query(db.session, 'can_dag_edit').first()
+ old_can_dag_edit_pvs = get_pv_with_action_query(db.session, can_dag_edit_action)
+ can_edit_action = get_or_create_action(db.session, 'can_edit')
+ update_pv_action(db.session, old_can_dag_edit_pvs, can_edit_action)
+
+ # Update existing PVs for `all_dags` resource to use `DAGs` resource.
+ all_dags_resource = get_resource_query(db.session, 'all_dags').first()
+ if all_dags_resource:
+ old_all_dags_pv = get_pv_with_resource_query(db.session, all_dags_resource)
+ dag_resource = get_or_create_dag_resource(db.session)
+ update_pv_resource(db.session, old_all_dags_pv, dag_resource)
+
+ # Delete the `all_dags` resource
+ db.session.delete(all_dags_resource)
+
+ # Delete `can_dag_read` action
+ if can_dag_read_action:
+ db.session.delete(can_dag_read_action)
+
+ # Delete `can_dag_edit` action
+ if can_dag_edit_action:
+ db.session.delete(can_dag_edit_action)
+
+ db.session.commit()
+
+
def upgrade(): # noqa: D103
- permissions = ['can_dag_read', 'can_dag_edit']
- view_menus = cached_app().appbuilder.sm.get_all_view_menu()
- convert_permissions(permissions, view_menus, upgrade_action, upgrade_dag_id)
+ db = SQLA()
+ db.session = settings.Session
+ migrate_to_new_dag_permissions(db)
+ db.session.commit()
+ db.session.close()
def downgrade(): # noqa: D103
- permissions = ['can_read', 'can_edit']
- vms = cached_app().appbuilder.sm.get_all_view_menu()
- view_menus = [vm for vm in vms if (vm.name == permissions.RESOURCE_DAG or vm.name.startswith('DAG:'))]
- convert_permissions(permissions, view_menus, downgrade_action, downgrade_dag_id)
-
-
-def upgrade_dag_id(dag_id):
- """Adds the 'DAG:' prefix to a DAG view if appropriate."""
- if dag_id == 'all_dags':
- return permissions.RESOURCE_DAG
- if dag_id.startswith("DAG:"):
- return dag_id
- return f"DAG:{dag_id}"
-
-
-def downgrade_dag_id(dag_id):
- """Removes the 'DAG:' prefix from a DAG view name to return the DAG id."""
- if dag_id == permissions.RESOURCE_DAG:
- return 'all_dags'
- if dag_id.startswith("DAG:"):
- return dag_id[len("DAG:") :]
- return dag_id
-
-
-def upgrade_action(action):
- """Converts the a DAG permission name from the old style to the new style."""
- if action == 'can_dag_read':
- return 'can_read'
- return 'can_edit'
-
-
-def downgrade_action(action):
- """Converts the a DAG permission name from the old style to the new style."""
- if action == 'can_read':
- return 'can_dag_read'
- return 'can_dag_edit'
-
-
-def convert_permissions(permissions, view_menus, convert_action, convert_dag_id):
- """Creates new empty role in DB"""
- appbuilder = cached_app().appbuilder # pylint: disable=no-member
- roles = appbuilder.sm.get_all_roles()
- views_to_remove = set()
- for permission_name in permissions: # pylint: disable=too-many-nested-blocks
- for view_menu in view_menus:
- view_name = view_menu.name
- old_pvm = appbuilder.sm.find_permission_view_menu(permission_name, view_name)
- if not old_pvm:
- continue
-
- views_to_remove.add(view_name)
- new_permission_name = convert_action(permission_name)
- new_pvm = appbuilder.sm.add_permission_view_menu(new_permission_name, convert_dag_id(view_name))
- for role in roles:
- if appbuilder.sm.exist_permission_on_roles(view_name, permission_name, [role.id]):
- appbuilder.sm.add_permission_role(role, new_pvm)
- appbuilder.sm.del_permission_role(role, old_pvm)
- print(f"DELETING: {role.name} ----> {view_name}.{permission_name}")
- appbuilder.sm.del_permission_view_menu(permission_name, view_name)
- print(f"DELETING: perm_view ----> {view_name}.{permission_name}")
- for view_name in views_to_remove:
- if appbuilder.sm.find_view_menu(view_name):
- appbuilder.sm.del_view_menu(view_name)
- print(f"DELETING: view_menu ----> {view_name}")
-
- if 'can_dag_read' in permissions:
- for permission_name in permissions:
- if appbuilder.sm.find_permission(permission_name):
- appbuilder.sm.del_permission(permission_name)
- print(f"DELETING: permission ----> {permission_name}")
+ pass