Posted to commits@airflow.apache.org by "vincbeck (via GitHub)" <gi...@apache.org> on 2023/09/15 16:59:43 UTC

[GitHub] [airflow] vincbeck opened a new pull request, #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

vincbeck opened a new pull request, #34317:
URL: https://github.com/apache/airflow/pull/34317

   As part of #32205.
   
   This PR sits on top of #33213, please review only the last commit.
   
   #33213 introduced the `is_authorized_` APIs. This PR uses these new APIs across the Airflow code. I mostly focused on the REST API, but I also applied them in a few other places.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355974367


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):

Review Comment:
   1. Should we raise a deprecation warning if someone uses the decorator?
   
   2. Should we raise an exception here if a function decorated with it runs while the auth manager is not FAB?
   
   This would indicate to people that, if they want to use their own plugins, they should switch to the other methods.
   
   We could also copy that decorator into the FAB auth manager and make sure that the remaining Airflow embedded APIs (user/roles) do not raise deprecation warnings. It is only a few lines to copy, and this way only users' code would generate deprecation warnings.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1358254738


##########
airflow/www/security_manager.py:
##########
@@ -738,24 +633,9 @@ def create_perm_vm_for_all_dag(self) -> None:
     def check_authorization(
         self,
         perms: Sequence[tuple[str, str]] | None = None,
-        dag_id: str | None = None,
     ) -> bool:
         """Checks that the logged in user has the specified permissions."""
         if not perms:
             return True
 
-        for perm in perms:
-            if perm in (
-                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
-            ):
-                can_access_all_dags = self.has_access(*perm)
-                if not can_access_all_dags:
-                    action = perm[0]
-                    if not self.can_access_some_dags(action, dag_id):
-                        return False
-            elif not self.has_access(*perm):
-                return False
-
-        return True
+        return all(self.has_access(*perm) for perm in perms)

Review Comment:
   Done





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1359507817


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -65,22 +79,24 @@
         CLICommand,
     )
 
-_MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
+MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
     "POST": ACTION_CAN_CREATE,
     "GET": ACTION_CAN_READ,
     "PUT": ACTION_CAN_EDIT,
     "DELETE": ACTION_CAN_DELETE,
 }
 
-_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE = {
-    DagAccessEntity.AUDIT_LOG: RESOURCE_AUDIT_LOG,
-    DagAccessEntity.CODE: RESOURCE_DAG_CODE,
-    DagAccessEntity.DATASET: RESOURCE_DATASET,
-    DagAccessEntity.DEPENDENCIES: RESOURCE_DAG_DEPENDENCIES,
-    DagAccessEntity.RUN: RESOURCE_DAG_RUN,
-    DagAccessEntity.TASK_INSTANCE: RESOURCE_TASK_INSTANCE,
-    DagAccessEntity.TASK_LOGS: RESOURCE_TASK_LOG,
-    DagAccessEntity.XCOM: RESOURCE_XCOM,
+_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = {
+    DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,),
+    DagAccessEntity.CODE: (RESOURCE_DAG_CODE,),
+    DagAccessEntity.DEPENDENCIES: (RESOURCE_DAG_DEPENDENCIES,),
+    DagAccessEntity.IMPORT_ERRORS: (RESOURCE_IMPORT_ERROR,),
+    DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
+    DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),

Review Comment:
   Maybe we should leave a comment here, saying that this is for backwards compatibility due to the originally wrong/misleading use of `TASK_INSTANCE` in FAB permissions?





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #34317:
URL: https://github.com/apache/airflow/pull/34317#issuecomment-1749648918

   > I think this is OK (a lot to read, won’t be surprised if I missed anything). Some coding things…
   
   Thanks a lot for the review!




[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1339177650


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -209,7 +209,9 @@ def is_authorized_dag(
             resource_type = self._get_fab_resource_type(access_entity)
 
             if method == "GET":
-                return self._is_authorized(method=method, resource_type=resource_type, user=user)
+                return self._is_authorized_dag(
+                    method="GET", details=details, user=user
+                ) and self._is_authorized(method=method, resource_type=resource_type, user=user)

Review Comment:
   Done





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355923325


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   BTW, this also reflects the change in the permission model after the AIP-56 implementation. We are no longer "resource" driven; we are "action" driven. The auth manager should not check whether you have access to specific **resources**; instead, it checks whether you are allowed to execute a specific **action**.
   
   Rather than enumerating the resources necessary to execute the action, the auth manager simply answers the question "can this user execute this action?". For example:
   
   * Is the user able to "modify" this task_instance (and all that it implies)?
   * Is the user able to "read" this dag run (and all that it implies)? 
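
To make the "action-driven" idea concrete, here is a small sketch assuming a toy auth manager backed by an in-memory grant set. `ToyAuthManager` and its grant tuples are hypothetical; only the `is_authorized_dag` name and its keyword arguments mirror what the diffs in this thread show, and the two-member `DagAccessEntity` enum is a cut-down stand-in.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class DagAccessEntity(Enum):
    """Cut-down stand-in for the entity enum referenced in the PR."""

    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"


@dataclass(frozen=True)
class DagDetails:
    id: str | None = None


class ToyAuthManager:
    """Hypothetical manager: answers 'may this user perform this action?'."""

    def __init__(self, grants: set[tuple[str, str, DagAccessEntity, str | None]]):
        # Each grant is (user, method, entity, dag_id); purely illustrative.
        self._grants = grants

    def is_authorized_dag(
        self,
        *,
        method: str,
        access_entity: DagAccessEntity,
        details: DagDetails,
        user: str,
    ) -> bool:
        # The caller states the action; how it maps onto stored permissions
        # is the manager's private business.
        return (user, method, access_entity, details.id) in self._grants
```

The caller never lists permission tuples; it only names the method and the entity, which is what lets a non-FAB manager answer the same question with a completely different backing store.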
   





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355917769


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   After looking at the comments and code below, I have some doubts and a different idea.
   
   I think the original permissions for the task endpoints were simply wrong:
   
   ```python
   @security.requires_access(
       [
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
       ],
   )
   ```
   
   It really should be:
   
   ```python
   @security.requires_access(
       [
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK),
       ],
   )
   ```
   
   I know we do not have "RESOURCE_TASK", but this is really how it should have been done initially. Those endpoints access neither task instances nor dag runs; they only involve the `task` definition. I think we wrongly had only one `task_instance` resource where we should have had two different resources.
   
   IMHO, we have the opportunity to fix it (and that will allow us to avoid adding the tuple handling from my proposal above).
   
   Why don't we simplify it all by using these entities:
   
   * `DagAccessEntity.DAG`
   * `DagAccessEntity.TASK`
   * `DagAccessEntity.RUN`
   * `DagAccessEntity.TASK_INSTANCE` 
   
   We should only need to specify the TASK_INSTANCE entity. In the FAB implementation we could map:
   
   * DagEntity.TASK => resource.TASK_INSTANCE
   * DagEntity.TASK_INSTANCE => resource.DAG_RUN + resource.TASK_INSTANCE
   
   The fact that we sometimes specified "read" for DAG_RUN and "write" for TASK_INSTANCE resources was, I think, a mistake. I can't imagine a situation where we would want to prevent a user from writing to DAG_RUN while allowing that user to update TASK_INSTANCE. So this would automatically fix the bad permissions we originally had for those endpoints.
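
The proposed mapping can be sketched as a dictionary from entity to a tuple of FAB resources, with a check that requires the action on every mapped resource. This is an illustration under assumptions: the resource strings, `ENTITY_TO_RESOURCES`, and `is_authorized` are placeholders, not the FAB auth manager's actual code.

```python
from enum import Enum

# Placeholder resource names, for illustration only.
RESOURCE_DAG_RUN = "DAG Runs"
RESOURCE_TASK_INSTANCE = "Task Instances"


class DagAccessEntity(Enum):
    RUN = "RUN"
    TASK = "TASK"
    TASK_INSTANCE = "TASK_INSTANCE"


# One entity may require several FAB resources, hence the tuples.
ENTITY_TO_RESOURCES = {
    DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
    # TASK maps onto the existing task-instance resource for backwards compatibility.
    DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),
    DagAccessEntity.TASK_INSTANCE: (RESOURCE_DAG_RUN, RESOURCE_TASK_INSTANCE),
}


def is_authorized(action, entity, user_permissions):
    """Return True only if the user holds `action` on every mapped resource."""
    return all(
        (action, resource) in user_permissions
        for resource in ENTITY_TO_RESOURCES[entity]
    )
```

Because the same action is applied to every resource in the tuple, a mixed requirement such as "read DAG_RUN but edit TASK_INSTANCE" can no longer be expressed, which is the simplification the comment argues for.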






Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347982536


##########
airflow/www/security_manager.py:
##########
@@ -268,125 +271,53 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
+
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
+
+        return {
+            dag_id
+            for dag_id in dag_ids
+            if (
+                "GET" in methods
+                and get_auth_manager().is_authorized_dag(
+                    method="GET", details=DagDetails(id=dag_id), user=user
+                )
+            )
+            or (
+                "PUT" in methods
+                and get_auth_manager().is_authorized_dag(
+                    method="PUT", details=DagDetails(id=dag_id), user=user
                 )

Review Comment:
   Agreed. I created a local function `_is_permitted_dag_id` which, I hope, makes it easier to read :)
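
For readers without the final commit at hand, this is one possible shape of such a refactoring, not necessarily what was merged. The sketch assumes the authorization check is injected as a plain callback (`is_authorized_dag(method, dag_id)`, where `dag_id=None` means "all DAGs"); that callback and the `dag_ids` parameter are hypothetical simplifications of the session query and auth manager calls in the diff above.

```python
from __future__ import annotations

from typing import Callable, Iterable, Optional

# Hypothetical callback: (method, dag_id or None for "all DAGs") -> bool
AuthCheck = Callable[[str, Optional[str]], bool]


def get_permitted_dag_ids(
    *,
    dag_ids: Iterable[str],
    is_authorized_dag: AuthCheck,
    methods: list[str] | None = None,
) -> set[str]:
    """Return the DAG ids the user may read and/or edit."""
    if not methods:
        methods = ["PUT", "GET"]
    all_dag_ids = set(dag_ids)

    def _is_permitted_dag_id(method: str, dag_id: str | None) -> bool:
        # Only consult the auth manager for methods the caller asked about.
        return method in methods and is_authorized_dag(method, dag_id)

    # Shortcut: global access to all DAGs for any requested method.
    if _is_permitted_dag_id("GET", None) or _is_permitted_dag_id("PUT", None):
        return all_dag_ids

    # Otherwise fall back to a per-DAG check.
    return {
        dag_id
        for dag_id in all_dag_ids
        if _is_permitted_dag_id("GET", dag_id) or _is_permitted_dag_id("PUT", dag_id)
    }
```

The helper folds the repeated "method requested and method authorized" condition into one place, so the set comprehension reads as a single question per DAG.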





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355567508


##########
airflow/api_connexion/endpoints/task_endpoint.py:
##########
@@ -22,21 +22,16 @@
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
+from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.exceptions import TaskNotFound
-from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
 
 if TYPE_CHECKING:
     from airflow import DAG
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Ah, I see! Yes, I currently don't check DAG run permissions for the task instance endpoints because I thought it was not necessary. But it seems it is. Let me add that back.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck merged PR #34317:
URL: https://github.com/apache/airflow/pull/34317




Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357188653


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -461,13 +436,8 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
     )
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)

Review Comment:
   I removed `@security.requires_access_dag("GET", DagAccessEntity.RUN)`





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1359506835


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (

Review Comment:
   Fine with me. But we should not use "appbuilder.sm" here; it should be the "get_readable_dag_ids" method from "get_auth_manager".





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357150789


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (

Review Comment:
   I don't think it should be in the auth manager. This behavior is common to all auth managers. When a function is decorated with `@requires_access_dag("GET")` and no `dag_id` is provided, it means "does the user have access to read at least one DAG". For instance, the index page (where DAGs are listed) is decorated that way.
   
   That said, in the case of `GET` (`PUT` works similarly), checking whether the user has access to at least one DAG is agnostic to auth managers, am I wrong? The auth manager's role is to return the list of DAGs accessible by the user, but the overall logic "does the user have access to read at least one DAG" is generic to Airflow, in my view.
   
   WDYT?
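
A minimal sketch of the fallback described above, written as a standalone function so that it does not depend on any particular auth manager. The function name and the three callables passed in are hypothetical stand-ins for `is_authorized_dag`, `get_readable_dag_ids` and `get_editable_dag_ids`; this is an illustration under those assumptions, not the code from the PR.

```python
from typing import Callable, Iterable, Optional


def is_authorized_for_dags(
    method: str,
    dag_id: Optional[str],
    is_authorized_dag: Callable[[str, Optional[str]], bool],
    get_readable_dag_ids: Callable[[], Iterable[str]],
    get_editable_dag_ids: Callable[[], Iterable[str]],
) -> bool:
    """Hypothetical helper: auth-manager-agnostic "at least one DAG" check."""
    # With a dag_id: is the user authorized for that DAG?
    # Without a dag_id: is the user authorized for all DAGs?
    access = is_authorized_dag(method, dag_id)
    if dag_id or access:
        return access

    # No dag_id and no global access: fall back to "at least one DAG".
    if method == "GET":
        return any(True for _ in get_readable_dag_ids())
    if method == "PUT":
        return any(True for _ in get_editable_dag_ids())
    return False
```

Only the two lookups at the end depend on how permissions are stored; the surrounding control flow stays the same whichever auth manager supplies them.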





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355917769


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Looking at the comments/code below, I have some doubts and a different idea.
   
   I think the original permissions for task endpoints were just wrong:
   
   ```python
   @security.requires_access(
       [
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
       ],
   )
   ```
   
   It really should be:
   
   ```python
   @security.requires_access(
       [
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK),
       ],
   )
   ```
   
   I know we do not have "RESOURCE_TASK" - but this is really how it should have been done initially. Those endpoints access neither task instances nor DAG runs - they just deal with the `task` definition. I think we wrongly had only one `TASK_INSTANCE` resource where we should have had two different resources.
   
   IMHO - we have the opportunity to fix it (and that will allow us to avoid adding the tuple handling from my proposal above).
   
   Why don't we simplify it all by having:
   
   * `DagAccessEntity.DAG`
   * `DagAccessEntity.TASK`
   * `DagAccessEntity.RUN`
   * `DagAccessEntity.TASK_INSTANCE` 
   
   We should only need to specify the TASK_INSTANCE entity where we previously had `resource.DAG_RUN` + `resource.TASK_INSTANCE`.
   
   In the FAB implementation we could map:
   
   * DagAccessEntity.TASK => resource.TASK_INSTANCE and 
   * DagAccessEntity.TASK_INSTANCE => resource.DAG_RUN + resource.TASK_INSTANCE
   
   The fact that we sometimes specified "read" for the DAG_RUN resource and "write" for the TASK_INSTANCE resource was, I think, a mistake. I can't imagine a situation where we would want to prevent a user from writing to DAG_RUN while allowing them to update TASK_INSTANCE. So this would automatically fix the bad permissions we originally had for those endpoints.
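
The proposed mapping could be sketched roughly as follows. Only the `TASK` and `TASK_INSTANCE` rows come from the comment above; the `DAG` and `RUN` rows, the resource name strings and the `fab_permissions` helper are assumptions added for illustration.

```python
from enum import Enum
from typing import Dict, List, Tuple


class DagAccessEntity(Enum):
    DAG = "DAG"
    TASK = "TASK"
    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"


# Placeholder resource names (assumed, for illustration only).
RESOURCE_DAG = "DAGs"
RESOURCE_DAG_RUN = "DAG Runs"
RESOURCE_TASK_INSTANCE = "Task Instances"

# Entity -> FAB resources. The DAG and RUN rows are assumptions; the TASK
# and TASK_INSTANCE rows follow the mapping proposed in the comment.
FAB_RESOURCES: Dict[DagAccessEntity, Tuple[str, ...]] = {
    DagAccessEntity.DAG: (RESOURCE_DAG,),
    DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),
    DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
    DagAccessEntity.TASK_INSTANCE: (RESOURCE_DAG_RUN, RESOURCE_TASK_INSTANCE),
}


def fab_permissions(action: str, entity: DagAccessEntity) -> List[Tuple[str, str]]:
    """Hypothetical helper: expand one entity into (action, resource) pairs."""
    return [(action, resource) for resource in FAB_RESOURCES[entity]]
```

With this shape, a single `TASK_INSTANCE` entity expands to the same action on both the DAG run and task instance resources, so the mixed read/write combinations mentioned above cannot be expressed.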





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357158265


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I see. I was also worried about the performance issue. The only concern I have with this solution is that we will then have to implement such a function for every resource we want to be tenant-aware in the future (connections, variables, ...). All auth managers will have to implement methods like `get_permitted_connections`.
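
One possible way to limit that burden, sketched under assumptions: a base class provides a generic filter built on a per-item check, and a concrete auth manager overrides it only where a bulk lookup is worth optimizing. The class and method names below are hypothetical and are not part of the PR.

```python
from typing import Iterable, Set


class BaseAuthManagerSketch:
    """Hypothetical base class; not the real Airflow auth manager API."""

    def is_authorized(self, method: str, resource_type: str, resource_id: str) -> bool:
        raise NotImplementedError

    def get_permitted_ids(
        self, method: str, resource_type: str, candidate_ids: Iterable[str]
    ) -> Set[str]:
        # Generic default: one authorization check per candidate. Works for
        # any resource type, but is slow when there are many candidates.
        return {
            resource_id
            for resource_id in candidate_ids
            if self.is_authorized(method, resource_type, resource_id)
        }


class PrefixAuthManager(BaseAuthManagerSketch):
    """Toy manager: a user may access resources whose id starts with a prefix."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def is_authorized(self, method: str, resource_type: str, resource_id: str) -> bool:
        return resource_id.startswith(self.prefix)
```

The trade-off raised in the thread still applies: the default needs the full candidate list in memory, so a manager that cares about performance would override `get_permitted_ids` for the resource types that matter.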





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347821224


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -76,12 +77,7 @@
 RESOURCE_EVENT_PREFIX = "dag_run"
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
-    ],
-)
+@security.requires_access_dag("DELETE", DagAccessEntity.RUN)

Review Comment:
   I guess it really depends on style, but the reason I did that is that I like having a 1:1 mapping between the `@security.requires_access_*` decorators and the APIs in the auth manager.





[GitHub] [airflow] vincbeck closed pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck closed pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API
URL: https://github.com/apache/airflow/pull/34317




[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1338731975


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -180,7 +180,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
     return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
 
 
-@security.requires_access([(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG)])
+@security.requires_access_dag("DELETE")

Review Comment:
   1. The difference is purely semantic. As of today, a permission is represented by a pair of an action and a resource. You can find the list of actions and resources [here](https://github.com/apache/airflow/blob/main/airflow/security/permissions.py). In AIP-56, we decided to get rid of this representation and come up with a more formal/generic way to represent permissions. The current list of resources is also too big and we want to make it simpler. I would recommend reading this [comment thread](https://github.com/apache/airflow/pull/33213#discussion_r1304881823), which might help you understand the different decisions we took along the way.
   2. We decided to go with a `Literal` instead of an `enum` (see [here](https://github.com/apache/airflow/pull/33213#discussion_r1318202392)). So yes, it is a string, but constrained to a specific set of values.
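
To illustrate the second point, here is a small sketch of a `Literal`-based method type. The exact set of values is an assumption made for this example, and `ensure_resource_method` is a hypothetical helper: a `Literal` is only enforced by static type checkers, so a runtime check has to be written explicitly if one is wanted.

```python
from typing import Literal, get_args

# Assumed set of allowed values, for illustration only.
ResourceMethod = Literal["GET", "POST", "PUT", "DELETE"]


def ensure_resource_method(value: str) -> str:
    """Hypothetical runtime guard mirroring what a type checker enforces statically."""
    allowed = get_args(ResourceMethod)
    if value not in allowed:
        raise ValueError(f"Unsupported method {value!r}; expected one of {allowed}")
    return value
```

Callers keep passing plain strings such as `"DELETE"`, which is what makes the decorators read like `@security.requires_access_dag("DELETE")` rather than requiring an enum import.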





[GitHub] [airflow] o-nikolas commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340496770


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   > I dont think this is perfect so if you think something is off here, please call it out :).
   
   I don't think I have enough context to call out whether something is off here or not. But your explanation does make sense!
   
   > @security.requires_access_dag("GET") would check if the user has access to all DAGs but here this is not what we want to check.
   
   Is this what the decorator is doing for something like `get_dag(...)` above? In that case we only want one specific dag, but we're checking if the user has access to all DAGs?
   
   > While writing this comment I am wondering if `@security.requires_access_dag("GET")` should mean "does the user have access to one DAG" instead of "does the user have access to all DAGs". Thoughts? I dont think checking whether the user has access to all DAGs in Airflow makes sense.
   
   I _think_ this makes more sense. And is more of the common case. But surely there are cases where an action is being done on multiple dags? (but maybe not, you would know better than I).
   
   
   





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1338734563


##########
airflow/api_connexion/endpoints/xcom_endpoint.py:
##########
@@ -39,14 +39,7 @@
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.XCOM)

Review Comment:
   Here we don't need to specify `DAG` because we are using `@security.requires_access_dag`. For the rest, I don't think it is needed because the only thing we do here is fetch XComs. This makes things simpler.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347964115


##########
airflow/www/security_manager.py:
##########
@@ -738,24 +639,13 @@ def create_perm_vm_for_all_dag(self) -> None:
     def check_authorization(
         self,
         perms: Sequence[tuple[str, str]] | None = None,
-        dag_id: str | None = None,
     ) -> bool:
         """Checks that the logged in user has the specified permissions."""
         if not perms:
             return True
 
         for perm in perms:
-            if perm in (
-                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
-            ):
-                can_access_all_dags = self.has_access(*perm)
-                if not can_access_all_dags:
-                    action = perm[0]
-                    if not self.can_access_some_dags(action, dag_id):
-                        return False
-            elif not self.has_access(*perm):
+            if not self.has_access(*perm):
                 return False
 
         return True

Review Comment:
   Love that





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355994191


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I think that if we leave it like this, it might incur a performance hit for FAB users who only have access to some DAGs. In the current implementation, reading all DAGs into a set is only done when the user's role has permission to read all DAGs.
   
   It also hampers any future multi-tenant implementation of the auth manager, because it leaves no way to optimize this via the auth manager: every user or tenant will always have to read all dag_ids into memory when they want to see the DAGs page. This will become a problem very quickly in any multi-tenant environment.
   
   I **think** the solution should be different. Rather than implementing the whole loop here, `get_permitted_dag_ids` should be implemented in the `auth_manager`.
   
   Here we can do some basic checks for all DAGs, but getting the list of allowed DAGs should then be delegated to the auth manager.
   
   ```python
           if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
               "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
           ):
               return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
   
           return get_auth_manager().get_permitted_dag_ids(methods=methods, user=user)
   ```
   
   Then each auth manager could implement an optimized version of `get_permitted_dag_ids`. For FAB, it would be the original implementation based on resources, but for others it could be different. Conceptually, something like this, for example:
   
   ```python
   tenant = get_tenant(user)
   return {dag.dag_id for dag in session.execute(select(DagModel.dag_id).filter_by(tenant=tenant))}
   ```
   
   It is no problem for an auth manager to have two ways of checking permissions:
   
   1) when you have a dag_id, you can check it individually
   2) when you need the list, you have a method that returns the list based on the same criteria but implemented without using the individual check
   
   IMHO this is the only way to implement a performant version of such a permission check.
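
The two ways of checking permissions can be illustrated with a minimal, self-contained sketch. `ToyAuthManager`, its `readable` mapping and `permitted_by_loop` are hypothetical names invented for this illustration; they are not Airflow's real classes, and a real auth manager would query its own permission store instead of a dict.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyAuthManager:
    """Hypothetical stand-in for an auth manager (not Airflow's real class)."""

    # Maps a user name to the DAG ids that user may read.
    readable: dict[str, set[str]] = field(default_factory=dict)
    # Counts individual checks, to make the cost of looping visible.
    individual_checks: int = 0

    def is_authorized_dag(self, *, dag_id: str, user: str) -> bool:
        """Way 1: check a single, known dag_id."""
        self.individual_checks += 1
        return dag_id in self.readable.get(user, set())

    def get_permitted_dag_ids(self, *, user: str) -> set[str]:
        """Way 2: return the whole set without using the individual check."""
        return set(self.readable.get(user, set()))


def permitted_by_loop(manager: ToyAuthManager, all_dag_ids: set[str], user: str) -> set[str]:
    """The per-DAG loop: one individual check for every DAG in the system."""
    return {d for d in all_dag_ids if manager.is_authorized_dag(dag_id=d, user=user)}


manager = ToyAuthManager(readable={"alice": {"dag_a", "dag_c"}})
all_dag_ids = {"dag_a", "dag_b", "dag_c", "dag_d"}

looped = permitted_by_loop(manager, all_dag_ids, "alice")
bulk = manager.get_permitted_dag_ids(user="alice")
```

Both calls yield the same set, but the loop has to load every dag_id and performs one check per DAG, while the bulk method can be answered by a single query scoped to the user or tenant.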



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #34317:
URL: https://github.com/apache/airflow/pull/34317#issuecomment-1755733216

   Comments are addressed and CI is green, reviews would be appreciated :)




Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355579389


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   Done :)





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355941755


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   It does indeed seem strange to pass the auth manager as a Jinja global (especially since it is only used for checking "can_edit").
   
   I think it could easily be fixed for all of those views by injecting "can_edit" in `baseviews.py` based on whether `dag` is already present in the arguments (all the views that extend dag_html must have `dag` present).
   
   Something like:
   
   ```python
       def render_template(self, template, **kwargs):
           """
           Use this method on your own endpoints, will pass the extra_args
           to the templates.
   
           :param template: The template relative path
           :param kwargs: arguments to be passed to the template
           """
           kwargs["base_template"] = self.appbuilder.base_template
           kwargs["appbuilder"] = self.appbuilder 
   
           # THIS
           if "dag" in kwargs:
               kwargs["can_edit"] = get_auth_manager().is_authorized_dag(
                   method="PUT", details=DagDetails(id=kwargs["dag"].dag_id)
               )
   
           return render_template(
               template, **dict(list(kwargs.items()) + list(self.extra_args.items()))
           )
   ```
   
   Or it could be added to each view individually in extra_args.





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1341658428


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   I just pushed the changes to address it





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340301025


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -171,34 +193,53 @@ def is_authorized_dag(
         entity (e.g. DAG runs).
         2. ``dag_access`` is provided which means the user wants to access a sub entity of the DAG
         (e.g. DAG runs).
-            a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity
-            b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity
+            a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity.
+            b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity.
+
+            However, if no specific DAG is targeted, just check the sub entity.
 
         :param method: The method to authorize.
-        :param dag_access_entity: The dag access entity.
-        :param dag_details: The dag details.
+        :param access_entity: The dag access entity.
+        :param details: The dag details.
         :param user: The user.
         """
-        if not dag_access_entity:
+        if not access_entity:
             # Scenario 1
-            return self._is_authorized_dag(method=method, dag_details=dag_details, user=user)
+            return self._is_authorized_dag(method=method, details=details, user=user)
         else:
             # Scenario 2
-            resource_type = self._get_fab_resource_type(dag_access_entity)
+            resource_type = self._get_fab_resource_type(access_entity)
             dag_method: ResourceMethod = "GET" if method == "GET" else "PUT"
 
-            return self._is_authorized_dag(
-                method=dag_method, dag_details=dag_details, user=user
-            ) and self._is_authorized(method=method, resource_type=resource_type, user=user)
+            if (details and details.id) and not self._is_authorized_dag(
+                method=dag_method, details=details, user=user
+            ):
+                return False
+
+            return self._is_authorized(method=method, resource_type=resource_type, user=user)
 
-    def is_authorized_dataset(self, *, method: ResourceMethod, user: BaseUser | None = None) -> bool:
+    def is_authorized_dataset(
+        self, *, method: ResourceMethod, details: DatasetDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
         return self._is_authorized(method=method, resource_type=RESOURCE_DATASET, user=user)
 
-    def is_authorized_variable(self, *, method: ResourceMethod, user: BaseUser | None = None) -> bool:
+    def is_authorized_pool(
+        self, *, method: ResourceMethod, details: PoolDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
+        return self._is_authorized(method=method, resource_type=RESOURCE_POOL, user=user)
+
+    def is_authorized_variable(
+        self, *, method: ResourceMethod, details: VariableDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
         return self._is_authorized(method=method, resource_type=RESOURCE_VARIABLE, user=user)
 
     def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
-        return self._is_authorized(method="GET", resource_type=RESOURCE_WEBSITE, user=user)
+        return (
+            self._is_authorized(method="GET", resource_type=RESOURCE_PLUGIN, user=user)
+            or self._is_authorized(method="GET", resource_type=RESOURCE_PROVIDER, user=user)
+            or self._is_authorized(method="GET", resource_type=RESOURCE_TRIGGER, user=user)

Review Comment:
   Good question. Long story short, we decided to group FAB resource types into broader categories. FAB defines too many resource types for permissions to be defined conveniently, so we created a category (here called website) that groups `RESOURCE_PLUGIN`, `RESOURCE_PROVIDER`, `RESOURCE_TRIGGER` and `RESOURCE_WEBSITE`. You can see the discussion on this topic in this [comment thread](https://github.com/apache/airflow/pull/33213#discussion_r1308050422).
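
As a rough sketch of the grouping idea, the category check is an `or` over the grouped resource types, as in the diff above. The string values of the constants and the `granted` set of (method, resource) pairs are assumptions made for this illustration, not the FAB auth manager's actual data model.

```python
from __future__ import annotations

# Placeholder resource names, for illustration only.
RESOURCE_PLUGIN = "Plugins"
RESOURCE_PROVIDER = "Providers"
RESOURCE_TRIGGER = "Triggers"
RESOURCE_WEBSITE = "Website"

# The "website" category groups several fine-grained FAB resource types.
WEBSITE_CATEGORY = (RESOURCE_PLUGIN, RESOURCE_PROVIDER, RESOURCE_TRIGGER, RESOURCE_WEBSITE)


def is_authorized_website(granted: set[tuple[str, str]]) -> bool:
    """Return True if the user may read at least one resource of the category.

    ``granted`` is a hypothetical set of (method, resource) pairs held by the user.
    """
    return any(("GET", resource) in granted for resource in WEBSITE_CATEGORY)
```

With this shape, callers only ask about the category, and the auth manager decides how that maps onto its own resource types.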





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347839635


##########
airflow/api_connexion/endpoints/plugin_endpoint.py:
##########
@@ -22,13 +22,12 @@
 from airflow.api_connexion.parameters import check_limit, format_parameters
 from airflow.api_connexion.schemas.plugin_schema import PluginCollection, plugin_collection_schema
 from airflow.plugins_manager import get_plugin_info
-from airflow.security import permissions
 
 if TYPE_CHECKING:
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN)])
+@security.requires_access_website()

Review Comment:
   Yes. [We decided to group](https://github.com/apache/airflow/pull/33213#discussion_r1308050422) `RESOURCE_PLUGIN`, `RESOURCE_PROVIDER`, `RESOURCE_TRIGGER` and `RESOURCE_WEBSITE` into one category. However, I do specify the page (like we do with DAGs) as a parameter in the follow-up PR. I have not opened that PR yet because I stopped creating PRs on top of one another, but I'll add that precision there, so that the auth manager can decide whether it grants fine-grained access at the page level or just at the category level.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357045707


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Strongly agree on this one. I tried to achieve this: as you said, just declare the action the user is trying to perform, and the auth manager is then responsible for checking the permissions needed to perform that action. The new resource type `TASK` makes this simpler. I don't like having to check `DAG_RUN` and `TASK_INSTANCE` at the same time.
   
   > Why don't we simplify it all by
   > 
   > DagAccessEntity.DAG
   > DagAccessEntity.TASK
   > DagAccessEntity.RUN
   > DagAccessEntity.TASK_INSTANCE
   
   My approach is that if no `DagAccessEntity` is defined, the check is at the DAG level. My thinking is that `DagAccessEntity` is there to give more information about which entity/information of a DAG the user is trying to access. If none is specified, the access is to the DAG itself. Would you rather be explicit here?
   
   Also, just to confirm: we still need the other values in `DagAccessEntity`, right? Such as `AUDIT_LOG`, `TASK_LOGS`, ...
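
The "no entity means DAG level" convention can be sketched as follows. The helper `required_checks` and the plain `"DAG"` resource label are hypothetical; the rule it encodes (GET maps to read on the DAG, anything else maps to edit on the DAG plus the method on the sub entity) follows the docstring of `is_authorized_dag` in the FAB auth manager diff of this PR.

```python
from __future__ import annotations

from enum import Enum


class DagAccessEntity(Enum):
    """Illustrative subset of the DAG sub entities named in this thread."""

    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"
    AUDIT_LOG = "AUDIT_LOG"
    TASK_LOGS = "TASK_LOGS"


def required_checks(method: str, access_entity: DagAccessEntity | None = None) -> list[tuple[str, str]]:
    """Return the (method, resource) checks implied by a request.

    Hypothetical helper: with no access entity, the request targets the DAG itself.
    """
    if access_entity is None:
        # Scenario 1: access to the DAG itself.
        return [(method, "DAG")]
    # Scenario 2: read on the DAG for GET, edit on the DAG otherwise,
    # plus the requested method on the sub entity.
    dag_method = "GET" if method == "GET" else "PUT"
    return [(dag_method, "DAG"), (method, access_entity.name)]
```

An explicit `DagAccessEntity.DAG` member would make the first branch visible at the call site, at the cost of one more value that every caller has to pass.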
   





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355894015


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Finally got to review it after the break (and this and next week will have a lot of focus on those).
   
   I think this really calls for a small refactoring:
   
   ```python
   def requires_access_dag(
       method: ResourceMethod, access_entities: tuple[DagAccessEntity, ...] | DagAccessEntity | None = None
   ) -> Callable[[T], T]: ...
   ```
   
   and then:
   
   ```python
   @security.requires_access_dag("GET", (DagAccessEntity.RUN, DagAccessEntity.TASK_INSTANCE))
   ```
   
   and convert `access_entity` to `access_entities: tuple[DagAccessEntity, ...]` (we can convert a single access entity to a one-element tuple if we want to keep the decorators simple for most cases).
   
   Not sure whether to do it now or as a follow-up to this PR, but I am quite sure this will be much better from a performance point of view (one callback instead of two), and it will be pretty much impossible to optimize if we have two decorators.
   
   And since it will change the callback, it's likely better to do it now.
   
   UPDATE: See below for a better (I think) idea.
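
A minimal sketch of the tuple-based decorator proposed above, runnable without Airflow. The `is_authorized` keyword argument is a hypothetical injection point standing in for the auth manager call, and `PermissionDenied` is a local placeholder exception; neither is part of the actual `requires_access_dag` signature.

```python
from __future__ import annotations

from enum import Enum
from functools import wraps
from typing import Callable


class DagAccessEntity(Enum):
    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"


class PermissionDenied(Exception):
    """Local placeholder for the API's permission error."""


def requires_access_dag(
    method: str,
    access_entities: tuple[DagAccessEntity, ...] | DagAccessEntity | None = None,
    *,
    is_authorized: Callable[[str, tuple[DagAccessEntity, ...], str | None], bool],
):
    # Normalize to a tuple so that a single entity stays convenient to write.
    if access_entities is None:
        entities: tuple[DagAccessEntity, ...] = ()
    elif isinstance(access_entities, DagAccessEntity):
        entities = (access_entities,)
    else:
        entities = tuple(access_entities)

    def decorator(func):
        @wraps(func)
        def decorated(*args, **kwargs):
            # One callback for all entities instead of one per stacked decorator.
            if is_authorized(method, entities, kwargs.get("dag_id")):
                return func(*args, **kwargs)
            raise PermissionDenied()

        return decorated

    return decorator


calls = []


def fake_is_authorized(method, entities, dag_id):
    """Records each authorization call and always grants access."""
    calls.append((method, entities, dag_id))
    return True


@requires_access_dag(
    "GET", (DagAccessEntity.RUN, DagAccessEntity.TASK_INSTANCE), is_authorized=fake_is_authorized
)
def get_task_instance(*, dag_id: str) -> str:
    return f"task instances of {dag_id}"


result = get_task_instance(dag_id="example")
```

Because the authorization callback receives all entities at once, an auth manager is free to resolve them with a single lookup.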
   





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355927810


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (

Review Comment:
   Is there a reason why this check is here and not in the FAB implementation? I believe the FAB auth manager should be able to check it on its own in the `is_authorized_dag` method, so there is no need to do this check in the decorator. Or maybe I am missing something?
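
One way the "access to any DAG" fallback could live inside the auth manager rather than the decorator is sketched below. `ToyFabLikeManager`, `global_methods` and `per_dag` are hypothetical names for this illustration; the real FAB auth manager resolves permissions from roles and resources, not from in-memory dicts.

```python
from __future__ import annotations


class ToyFabLikeManager:
    """Hypothetical manager that handles the no-dag_id case itself."""

    def __init__(self, global_methods: set[str], per_dag: dict[str, set[str]]):
        # Methods granted on all DAGs.
        self.global_methods = global_methods
        # Methods granted per individual DAG id.
        self.per_dag = per_dag

    def is_authorized_dag(self, *, method: str, dag_id: str | None = None) -> bool:
        if method in self.global_methods:
            return True
        if dag_id is not None:
            return method in self.per_dag.get(dag_id, set())
        # No specific DAG targeted: authorized if the method is granted on any DAG.
        return any(method in methods for methods in self.per_dag.values())


fab_like = ToyFabLikeManager(global_methods=set(), per_dag={"dag_a": {"GET"}, "dag_b": {"GET", "PUT"}})
```

The decorator then only forwards `method` and `dag_id`, and each auth manager decides what a missing `dag_id` means.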





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355907725


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -646,13 +606,8 @@ def patch_task_instance(
     return task_instance_reference_schema.dump(ti)
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)

Review Comment:
   Same here -> this call can potentially create dag_runs.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1359505166


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I think there is no other choice. One solution is to have a default implementation of "get_permitted" in the base auth manager that iterates over all the dags, with a note that each auth manager should aim to implement an optimised version. This would allow us to have our cake and eat it too.
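
A minimal sketch of that fallback, assuming hypothetical class names and a simplified signature (`BaseAuthManagerSketch`, `PrefixAuthManager`, `TableAuthManager` and the `all_dag_ids` parameter are illustrative and not part of Airflow): the base class supplies a correct but per-DAG default, and a concrete manager may override it with a bulk lookup.

```python
from __future__ import annotations

from typing import Iterable


class BaseAuthManagerSketch:
    """Hypothetical stand-in for a base auth manager; not the real Airflow class."""

    def is_authorized_dag(self, *, method: str, dag_id: str, user: str) -> bool:
        raise NotImplementedError

    def get_permitted_dag_ids(self, *, method: str, user: str, all_dag_ids: Iterable[str]) -> set[str]:
        # Default implementation: correct for any manager, but costs one check per DAG.
        return {
            dag_id
            for dag_id in all_dag_ids
            if self.is_authorized_dag(method=method, dag_id=dag_id, user=user)
        }


class PrefixAuthManager(BaseAuthManagerSketch):
    """Toy manager that relies on the default: users may read DAGs prefixed with their name."""

    def is_authorized_dag(self, *, method: str, dag_id: str, user: str) -> bool:
        return method == "GET" and dag_id.startswith(f"{user}_")


class TableAuthManager(BaseAuthManagerSketch):
    """Toy manager that overrides the bulk lookup with an optimised version."""

    def __init__(self, grants: dict[tuple[str, str], set[str]]):
        self.grants = grants  # (user, method) -> permitted DAG ids

    def is_authorized_dag(self, *, method: str, dag_id: str, user: str) -> bool:
        return dag_id in self.grants.get((user, method), set())

    def get_permitted_dag_ids(self, *, method: str, user: str, all_dag_ids: Iterable[str]) -> set[str]:
        # One lookup instead of one authorization check per DAG.
        return self.grants.get((user, method), set()) & set(all_dag_ids)
```

Both managers answer the same question; only the second avoids calling `is_authorized_dag` once per DAG.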





[GitHub] [airflow] o-nikolas commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340500642


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -171,34 +193,53 @@ def is_authorized_dag(
         entity (e.g. DAG runs).
         2. ``dag_access`` is provided which means the user wants to access a sub entity of the DAG
         (e.g. DAG runs).
-            a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity
-            b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity
+            a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity.
+            b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity.
+
+            However, if no specific DAG is targeted, just check the sub entity.
 
         :param method: The method to authorize.
-        :param dag_access_entity: The dag access entity.
-        :param dag_details: The dag details.
+        :param access_entity: The dag access entity.
+        :param details: The dag details.
         :param user: The user.
         """
-        if not dag_access_entity:
+        if not access_entity:
             # Scenario 1
-            return self._is_authorized_dag(method=method, dag_details=dag_details, user=user)
+            return self._is_authorized_dag(method=method, details=details, user=user)
         else:
             # Scenario 2
-            resource_type = self._get_fab_resource_type(dag_access_entity)
+            resource_type = self._get_fab_resource_type(access_entity)
             dag_method: ResourceMethod = "GET" if method == "GET" else "PUT"
 
-            return self._is_authorized_dag(
-                method=dag_method, dag_details=dag_details, user=user
-            ) and self._is_authorized(method=method, resource_type=resource_type, user=user)
+            if (details and details.id) and not self._is_authorized_dag(
+                method=dag_method, details=details, user=user
+            ):
+                return False
+
+            return self._is_authorized(method=method, resource_type=resource_type, user=user)
 
-    def is_authorized_dataset(self, *, method: ResourceMethod, user: BaseUser | None = None) -> bool:
+    def is_authorized_dataset(
+        self, *, method: ResourceMethod, details: DatasetDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
         return self._is_authorized(method=method, resource_type=RESOURCE_DATASET, user=user)
 
-    def is_authorized_variable(self, *, method: ResourceMethod, user: BaseUser | None = None) -> bool:
+    def is_authorized_pool(
+        self, *, method: ResourceMethod, details: PoolDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
+        return self._is_authorized(method=method, resource_type=RESOURCE_POOL, user=user)
+
+    def is_authorized_variable(
+        self, *, method: ResourceMethod, details: VariableDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
         return self._is_authorized(method=method, resource_type=RESOURCE_VARIABLE, user=user)
 
     def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
-        return self._is_authorized(method="GET", resource_type=RESOURCE_WEBSITE, user=user)
+        return (
+            self._is_authorized(method="GET", resource_type=RESOURCE_PLUGIN, user=user)
+            or self._is_authorized(method="GET", resource_type=RESOURCE_PROVIDER, user=user)
+            or self._is_authorized(method="GET", resource_type=RESOURCE_TRIGGER, user=user)

Review Comment:
   Yeah, grouping in general makes sense to me. But I suppose in this case I don't see the logical grouping between plugin, provider, trigger and website. But I have much less context so I'll trust Jarek and you on this one :) 





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340515262


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   > Is this what the decorator is doing for something like get_dag(...) above? In that case we only want one specific dag, but we're checking if the user has access to all DAGs?
   
   If a dag id is passed as a parameter (or in the request), then the authorization check is done only against this specific DAG. For now, this is always the case: every time `@security.requires_access_dag("GET")` is used, a DAG id is provided. This tells me that we do not necessarily need to check whether a user has access to all DAGs. Thanks, you helped me figure it out :)
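
A minimal sketch of that behaviour, under stated assumptions: the `READABLE` table, the local `PermissionDenied` class and the `user` keyword argument are hypothetical stand-ins (the real decorator resolves the user from the request context and asks the auth manager). It only illustrates that, once a `dag_id` is present, the check targets that one DAG.

```python
from functools import wraps


class PermissionDenied(Exception):
    """Stand-in for the API's permission error."""


# Hypothetical permission table: user -> DAG ids the user may read.
READABLE = {"alice": {"etl"}}


def requires_access_dag(method):
    def decorator(func):
        @wraps(func)
        def decorated(*args, user, **kwargs):
            dag_id = kwargs.get("dag_id")
            # With a dag_id, only that DAG is checked; "all DAGs" is never consulted.
            if method == "GET" and dag_id in READABLE.get(user, set()):
                return func(*args, **kwargs)
            raise PermissionDenied()

        return decorated

    return decorator


@requires_access_dag("GET")
def get_dag(*, dag_id):
    return {"dag_id": dag_id}
```

Calling `get_dag(dag_id="etl", user="alice")` goes through, while any other DAG id raises `PermissionDenied`.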





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355994191


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I think if we leave it like this, it might incur a performance hit for FAB users who have access to only some dags. In the current implementation, reading all dags as a set is only done when the user's role grants permission to read all dags.
   
   It also hampers any future multi-tenant implementation of an auth manager, because it leaves no way to optimize this via the auth_manager. Every user of every tenant would have to read all dag_ids into memory each time they want to see the DAGs page. This will become a problem very quickly for any multi-tenant environment.
   
   I **think** the solution for that should be different. Rather than implementing the whole loop here, `get_permitted_dag_ids` should be implemented in the `auth_manager`.
   
   Here we can do some basic checks for all dags, but then getting the list of allowed dags should be delegated to the auth_manager.
   
   ```python
           if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
               "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
           ):
               return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
   
           return get_auth_manager().get_permitted_dag_ids(methods=methods, user=user)
   ```
   
   Then each auth manager could implement an optimized version of `get_permitted_dag_ids`. For FAB, it would be the original implementation with resources, but for others it could be different. Conceptually, for example:
   
   ```python
   tenant = get_tenant(user)
   return {dag.dag_id for dag in session.execute(select(DagModel.dag_id).filter_by(tenant=tenant))}
   ```
   
   It is no problem for an auth manager to have two ways of checking permissions:
   
   1) when you have a dag_id, you can check it individually
   2) when you need the list, you have a method that returns the list based on the same criteria, but implemented without using the individual check (bulk).
   
   IMHO this is the only way to implement a performant version of such a permission check.
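
As a rough illustration of the bulk path for a FAB-style manager, here is a sketch modelled on the resource loop removed in the diff above. The function name, the plain `(action, resource)` tuples and the two local constants are assumptions that stand in for the `permissions` module values and the ORM objects.

```python
from __future__ import annotations

from typing import Iterable

RESOURCE_DAG = "DAGs"         # stand-in for permissions.RESOURCE_DAG
RESOURCE_DAG_PREFIX = "DAG:"  # stand-in for permissions.RESOURCE_DAG_PREFIX


def permitted_dag_ids(
    role_permissions: Iterable[tuple[str, str]],
    actions: set[str],
    all_dag_ids: Iterable[str],
) -> set[str]:
    """Bulk lookup: derive permitted DAG ids from (action, resource) pairs."""
    resources = set()
    for action, resource in role_permissions:
        if action not in actions:
            continue
        if resource == RESOURCE_DAG:
            # Permission on the global DAG resource grants every DAG.
            return set(all_dag_ids)
        if resource.startswith(RESOURCE_DAG_PREFIX):
            resources.add(resource[len(RESOURCE_DAG_PREFIX):])
        else:
            resources.add(resource)
    # Keep only names that correspond to existing DAGs.
    return resources & set(all_dag_ids)
```

The permissions are walked once, and no per-DAG authorization call is made, which is the point of having a separate bulk method.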





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1358577475


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   It seems `dag.html` is also used to render an individual DAG in a view where there are multiple DAGs. In that case there is no `kwargs['dag']`, only `kwargs['dags']`. I reverted the changes, unless there is a better solution.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355894015


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Finally got to review it after the break (and this and next week will have a lot of focus on those).
   
   I think this really calls for a small refactoring:
   
   ```python
   def requires_access_dag(
       method: ResourceMethod, access_entities: tuple[DagAccessEntity, ...] | DagAccessEntity | None = None
   ) -> Callable[[T], T]: ...
   ```
   
   and then:
   
   ```python
   @security.requires_access_dag("GET", (DagAccessEntity.RUN, DagAccessEntity.TASK_INSTANCE))
   ```
   
   and convert `access_entity` to `access_entities: tuple[DagAccessEntity, ...]` (we can convert a single entity to a one-element tuple).
   
   Not sure whether to do it now or as a follow-up to this PR, but I am quite sure this will be much better from a performance point of view (one callback instead of two), and it will be pretty much impossible to optimize if we have two decorators.
   
   And since it will change the callback, it's likely better to do it now.
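
A minimal sketch of what such a decorator could look like, not the PR's implementation. The injected `is_authorized` callable, the local `DagAccessEntity` enum and the local `PermissionDenied` class are hypothetical, chosen so the example runs on its own.

```python
from __future__ import annotations

from enum import Enum
from functools import wraps


class DagAccessEntity(Enum):
    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"


class PermissionDenied(Exception):
    """Stand-in for the API's permission error."""


def requires_access_dag(method, access_entities=None, *, is_authorized):
    # Normalize None, a single entity, or a tuple of entities into one tuple.
    if access_entities is None:
        entities = ()
    elif isinstance(access_entities, DagAccessEntity):
        entities = (access_entities,)
    else:
        entities = tuple(access_entities)

    def decorator(func):
        @wraps(func)
        def decorated(*args, **kwargs):
            # A single callback covers every entity, so the check can be batched.
            if is_authorized(method, entities, kwargs.get("dag_id")):
                return func(*args, **kwargs)
            raise PermissionDenied()

        return decorated

    return decorator
```

Because the endpoint carries one decorator, the authorization callback runs once per request and sees all requested entities together.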
   





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355907009


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -527,13 +497,8 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     )
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   This is also wrong. I personally think the case where you can "read" dag_run but "update" TASK_INSTANCE is unrealistic and actually wrong. 
   
   When you look deeply into `set_task_instance_state`, you will find that in some cases it can lead to creating dag_runs (for subdags).
   
   ```
   set_state -> _iter_subdag_run_ids
   ```
   
   From `_iter_subdag_run_ids`:
   
   ```python
               if isinstance(current_task, SubDagOperator) or current_task.task_type == "SubDagOperator":
                   # this works as a kind of integrity check
                   # it creates missing dag runs for subdag operators,
                   # maybe this should be moved to dagrun.verify_integrity
                   dag_runs = _create_dagruns(
                       current_task.subdag,
                       infos=confirmed_infos,
                       state=DagRunState.RUNNING,
                       run_type=DagRunType.BACKFILL_JOB,
                   )
   
                   verify_dagruns(dag_runs, commit, state, session, current_task)
                   dags.append(current_task.subdag)
                   yield current_task.subdag.dag_id
   
   ```





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355927810


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (

Review Comment:
   Is there a reason why this check is here and not in the FAB implementation? I believe the FAB auth manager should be able to check it on its own in the `is_authorized_dag` method, so there is no need to do this check in the decorator. Or maybe I am missing something?
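
One way to picture that suggestion, as a sketch under stated assumptions: `FabLikeAuthManager`, its `grants` mapping and the `"*"` wildcard are hypothetical and only illustrate moving the "access to any DAG" fallback out of the decorator and into `is_authorized_dag`.

```python
from __future__ import annotations


class FabLikeAuthManager:
    """Hypothetical manager; `grants` maps method -> DAG ids, with "*" meaning all DAGs."""

    def __init__(self, grants: dict[str, set[str]]):
        self.grants = grants

    def is_authorized_dag(self, *, method: str, dag_id: str | None = None) -> bool:
        allowed = self.grants.get(method, set())
        if "*" in allowed:
            # Access to all DAGs covers both the targeted and untargeted cases.
            return True
        if dag_id:
            # A specific DAG is targeted: check only that DAG.
            return dag_id in allowed
        # No specific DAG targeted: authorized if the user can access at least one DAG.
        return bool(allowed)
```

With the fallback inside the manager, the decorator only has to call `is_authorized_dag` and act on the boolean it gets back.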





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1359507417


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (

Review Comment:
   Also, that plays nicely with having `get_readable_dag_ids` as a separate method (with a default implementation that iterates over all DAGs and checks access to each one individually); see https://github.com/apache/airflow/pull/34317#discussion_r1355994191
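
A minimal sketch of such a default implementation, assuming a base class with a per-DAG check that concrete managers implement. `BaseManagerSketch` and `PrefixManager` are hypothetical names for illustration and do not reflect the actual `BaseAuthManager` interface.

```python
from typing import Iterable, Set


class BaseManagerSketch:
    """Hypothetical base class, not Airflow's ``BaseAuthManager``."""

    def is_authorized_dag(self, *, method: str, dag_id: str) -> bool:
        raise NotImplementedError

    def get_readable_dag_ids(self, all_dag_ids: Iterable[str]) -> Set[str]:
        # Default: one authorization check per DAG. A manager that can answer
        # in bulk (for example with a single query) would override this method.
        return {
            dag_id
            for dag_id in all_dag_ids
            if self.is_authorized_dag(method="GET", dag_id=dag_id)
        }


class PrefixManager(BaseManagerSketch):
    """Toy manager: the user may read DAGs whose id starts with ``team_a_``."""

    def is_authorized_dag(self, *, method: str, dag_id: str) -> bool:
        return method == "GET" and dag_id.startswith("team_a_")
```

The default costs one check per DAG, which is why keeping it overridable matters for managers that already store per-user DAG lists.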





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1361283427


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -65,22 +79,24 @@
         CLICommand,
     )
 
-_MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
+MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
     "POST": ACTION_CAN_CREATE,
     "GET": ACTION_CAN_READ,
     "PUT": ACTION_CAN_EDIT,
     "DELETE": ACTION_CAN_DELETE,
 }
 
-_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE = {
-    DagAccessEntity.AUDIT_LOG: RESOURCE_AUDIT_LOG,
-    DagAccessEntity.CODE: RESOURCE_DAG_CODE,
-    DagAccessEntity.DATASET: RESOURCE_DATASET,
-    DagAccessEntity.DEPENDENCIES: RESOURCE_DAG_DEPENDENCIES,
-    DagAccessEntity.RUN: RESOURCE_DAG_RUN,
-    DagAccessEntity.TASK_INSTANCE: RESOURCE_TASK_INSTANCE,
-    DagAccessEntity.TASK_LOGS: RESOURCE_TASK_LOG,
-    DagAccessEntity.XCOM: RESOURCE_XCOM,
+_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = {
+    DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,),
+    DagAccessEntity.CODE: (RESOURCE_DAG_CODE,),
+    DagAccessEntity.DEPENDENCIES: (RESOURCE_DAG_DEPENDENCIES,),
+    DagAccessEntity.IMPORT_ERRORS: (RESOURCE_IMPORT_ERROR,),
+    DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
+    DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),

Review Comment:
   Done





[GitHub] [airflow] vincbeck commented on pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #34317:
URL: https://github.com/apache/airflow/pull/34317#issuecomment-1736166530

   Tests are passing




Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347963150


##########
airflow/www/views.py:
##########
@@ -3908,9 +3909,11 @@ class DagFilter(BaseFilter):
     """Filter using DagIDs."""
 
     def apply(self, query, func):
-        if get_airflow_app().appbuilder.sm.has_all_dags_access(g.user):
+        if get_auth_manager().is_authorized_dag(
+            method="GET", user=g.user
+        ) or get_auth_manager().is_authorized_dag(method="PUT", user=g.user):
             return query

Review Comment:
   Sounds good to me 👍 








[GitHub] [airflow] o-nikolas commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1339330087


##########
airflow/api_connexion/security.py:
##########
@@ -55,3 +68,166 @@ def decorated(*args, **kwargs):
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs):
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource?

Review Comment:
   minor nit, but why the question mark at the end here?



##########
airflow/auth/managers/base_auth_manager.py:
##########
@@ -145,12 +151,30 @@ def is_authorized_dataset(
         self,
         *,
         method: ResourceMethod,
+        details: DatasetDetails | None = None,
         user: BaseUser | None = None,
     ) -> bool:
         """
         Return whether the user is authorized to perform a given action on a dataset.
 
         :param method: the method to perform
+        :param details: optional details about the variable

Review Comment:
   ```suggestion
           :param details: optional details about the dataset
   ```



##########
airflow/auth/managers/base_auth_manager.py:
##########
@@ -82,12 +86,14 @@ def is_authorized_configuration(
         self,
         *,
         method: ResourceMethod,
+        details: ConfigurationDetails | None = None,
         user: BaseUser | None = None,
     ) -> bool:
         """
         Return whether the user is authorized to perform a given action on configuration.
 
         :param method: the method to perform
+        :param details: optional details about the connection

Review Comment:
   ```suggestion
           :param details: optional details about the configuration
   ```



##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -171,34 +193,53 @@ def is_authorized_dag(
         entity (e.g. DAG runs).
         2. ``dag_access`` is provided which means the user wants to access a sub entity of the DAG
         (e.g. DAG runs).
-            a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity
-            b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity
+            a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity.
+            b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity.
+
+            However, if no specific DAG is targeted, just check the sub entity.
 
         :param method: The method to authorize.
-        :param dag_access_entity: The dag access entity.
-        :param dag_details: The dag details.
+        :param access_entity: The dag access entity.
+        :param details: The dag details.
         :param user: The user.
         """
-        if not dag_access_entity:
+        if not access_entity:
             # Scenario 1
-            return self._is_authorized_dag(method=method, dag_details=dag_details, user=user)
+            return self._is_authorized_dag(method=method, details=details, user=user)
         else:
             # Scenario 2
-            resource_type = self._get_fab_resource_type(dag_access_entity)
+            resource_type = self._get_fab_resource_type(access_entity)
             dag_method: ResourceMethod = "GET" if method == "GET" else "PUT"
 
-            return self._is_authorized_dag(
-                method=dag_method, dag_details=dag_details, user=user
-            ) and self._is_authorized(method=method, resource_type=resource_type, user=user)
+            if (details and details.id) and not self._is_authorized_dag(
+                method=dag_method, details=details, user=user
+            ):
+                return False
+
+            return self._is_authorized(method=method, resource_type=resource_type, user=user)
 
-    def is_authorized_dataset(self, *, method: ResourceMethod, user: BaseUser | None = None) -> bool:
+    def is_authorized_dataset(
+        self, *, method: ResourceMethod, details: DatasetDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
         return self._is_authorized(method=method, resource_type=RESOURCE_DATASET, user=user)
 
-    def is_authorized_variable(self, *, method: ResourceMethod, user: BaseUser | None = None) -> bool:
+    def is_authorized_pool(
+        self, *, method: ResourceMethod, details: PoolDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
+        return self._is_authorized(method=method, resource_type=RESOURCE_POOL, user=user)
+
+    def is_authorized_variable(
+        self, *, method: ResourceMethod, details: VariableDetails | None = None, user: BaseUser | None = None
+    ) -> bool:
         return self._is_authorized(method=method, resource_type=RESOURCE_VARIABLE, user=user)
 
     def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
-        return self._is_authorized(method="GET", resource_type=RESOURCE_WEBSITE, user=user)
+        return (
+            self._is_authorized(method="GET", resource_type=RESOURCE_PLUGIN, user=user)
+            or self._is_authorized(method="GET", resource_type=RESOURCE_PROVIDER, user=user)
+            or self._is_authorized(method="GET", resource_type=RESOURCE_TRIGGER, user=user)

Review Comment:
   Just for my own understanding, why are we checking these things when we're interested in whether the user is authorized for the website?



##########
airflow/auth/managers/base_auth_manager.py:
##########
@@ -145,12 +151,30 @@ def is_authorized_dataset(
         self,
         *,
         method: ResourceMethod,
+        details: DatasetDetails | None = None,
         user: BaseUser | None = None,
     ) -> bool:
         """
         Return whether the user is authorized to perform a given action on a dataset.
 
         :param method: the method to perform
+        :param details: optional details about the variable
+        :param user: the user to perform the action on. If not provided (or None), it uses the current user
+        """
+
+    @abstractmethod
+    def is_authorized_pool(
+        self,
+        *,
+        method: ResourceMethod,
+        details: PoolDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to perform a given action on a pool.
+
+        :param method: the method to perform
+        :param details: optional details about the variable

Review Comment:
   ```suggestion
           :param details: optional details about the pool
   ```



##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   Just curious, why no method on this one? I expected:
   ```python
   @security.requires_access_dag("GET")
   ```





[GitHub] [airflow] pingu1m commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "pingu1m (via GitHub)" <gi...@apache.org>.
pingu1m commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1337848638


##########
airflow/api_connexion/endpoints/xcom_endpoint.py:
##########
@@ -39,14 +39,7 @@
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.XCOM)

Review Comment:
   Why is only `DagAccessEntity.XCOM` added to the decorator? The original code has other entities: `DAG, DAG_RUN, TASK_INSTANCE`



##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -180,7 +180,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
     return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
 
 
-@security.requires_access([(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG)])
+@security.requires_access_dag("DELETE")

Review Comment:
   What's the difference between the current line and
   `@security.requires_access_dag("DELETE", DagAccessEntity.RUN)`?
   Also, should an enum/constant be used instead of the string "DELETE"?
   This applies to other methods too.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1358503642


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   ~~From the tests it does not seem to work:~~



##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   ~~It seems `dag.html` is also used to render individual DAG in a view where there are multiple DAGs. Therefore, there is no `kwargs['dag']` but `kwargs['dags']`. I reverted the changes unless there is a better solution~~





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357188838


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -527,13 +497,8 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     )
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   I removed `@security.requires_access_dag("GET", DagAccessEntity.RUN)`



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -646,13 +606,8 @@ def patch_task_instance(
     return task_instance_reference_schema.dump(ti)
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)

Review Comment:
   I removed `@security.requires_access_dag("GET", DagAccessEntity.RUN)`





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357189276


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):

Review Comment:
   Done





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355177606


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (
+                method == "PUT" and any(appbuilder.sm.get_editable_dag_ids())
+            )
+
+        return callback
+
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):

Review Comment:
   Sure! Happy to do that in another PR. Let me put it in my list :)



##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   I could do that. The only reason I did that was in case we needed it in the future for other things. But I guess I can do as you said for now



##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   But looking at the code, `dag.html` is used a lot in multiple views. So I would have to compute and pass the value `can_edit` in multiple views. I am not sure this is better either. WDYT?



##########
airflow/api_connexion/endpoints/task_endpoint.py:
##########
@@ -22,21 +22,16 @@
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
+from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.exceptions import TaskNotFound
-from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
 
 if TYPE_CHECKING:
     from airflow import DAG
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Can you elaborate? These are the exact same permissions we were checking before: we checked permissions at both the task instance and the DAG level, and `@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)` does exactly the same.
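
The equivalence claimed here can be sketched from the two scenarios described in the FAB manager's `is_authorized_dag` docstring quoted earlier in this thread. This is a simplified model, not the real implementation: permissions are represented as a hypothetical set of `(method, resource)` pairs, and the resource names are plain strings chosen for illustration.

```python
from typing import Optional, Set, Tuple

Grant = Tuple[str, str]  # (method, resource); hypothetical representation


def is_authorized_dag(
    grants: Set[Grant],
    *,
    method: str,
    access_entity: Optional[str] = None,
    dag_id: Optional[str] = None,
) -> bool:
    if access_entity is None:
        # Scenario 1: plain DAG-level check.
        return (method, "DAG") in grants
    # Scenario 2: a sub entity is targeted. When a specific DAG is given, the
    # DAG itself needs READ for GET and EDIT (PUT) for any other method.
    dag_method = "GET" if method == "GET" else "PUT"
    if dag_id and (dag_method, "DAG") not in grants:
        return False
    # The sub entity always needs the requested method.
    return (method, access_entity) in grants
```

Under this model, a single `("GET", "TASK_INSTANCE")` entity check with a DAG id requires both read on the DAG and read on the task instance resource, which matches the old two-tuple permission list for the task endpoints.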





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355558587


##########
airflow/api_connexion/endpoints/task_endpoint.py:
##########
@@ -22,21 +22,16 @@
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
+from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.exceptions import TaskNotFound
-from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
 
 if TYPE_CHECKING:
     from airflow import DAG
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   The task endpoints checked for DAG and task instance perms, while the task instance ones checked for them plus DAG run perms. Now they all check for the same perms.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355917769


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Looking at the comments and code below, I have some doubts and a different idea.
   
   I think the original permissions for the task endpoints were just wrong:
   
   ```python
   @security.requires_access(
       [
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
       ],
   )
   ```
   
   It really should be:
   
   ```python
   @security.requires_access(
       [
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
           (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK),
       ],
   )
   ```
   
   I know we do not have `RESOURCE_TASK`, but this is how it should have been done initially. Those endpoints need neither task instance nor dag_run access; they only deal with the `task` definition. I think we wrongly had a single `task_instance` resource where we should have had two different resources.
   
   IMHO we have the opportunity to fix it (and that will allow us to avoid adding the tuple handling from my proposal above).
   
   Why don't we simplify it all by having these entities:
   
   * `DagAccessEntity.DAG`
   * `DagAccessEntity.TASK`
   * `DagAccessEntity.RUN`
   * `DagAccessEntity.TASK_INSTANCE`
   
   We should only need to specify the TASK_INSTANCE entity. In the FAB implementation we could map:
   
   * `DagAccessEntity.TASK` -> `resource.TASK_INSTANCE`
   * `DagAccessEntity.TASK_INSTANCE` -> `resource.DAG_RUN` + `resource.TASK_INSTANCE`
   
   The fact that we sometimes specified "read" for the DAG_RUN resource and "write" for the TASK_INSTANCE resource was, I think, a mistake. I can't imagine a situation where we would want to prevent a user from writing to DAG_RUN while allowing them to update TASK_INSTANCE. So this would automatically fix the bad permissions we originally had for those endpoints.
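
To make the proposed mapping concrete, here is a minimal sketch of how a FAB-style implementation might expand a single entity into resource checks. The resource name strings, the `FAB_RESOURCES_BY_ENTITY` table and the `fab_permissions` helper are hypothetical names for illustration, and the `DAG` and `RUN` rows are assumptions since the comment only spells out `TASK` and `TASK_INSTANCE`.

```python
from __future__ import annotations

from enum import Enum


class DagAccessEntity(Enum):
    """Entities listed in the proposal above (TASK is the new one)."""

    DAG = "DAG"
    TASK = "TASK"
    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"


# Hypothetical resource names, used only for this illustration.
RESOURCE_DAG = "DAGs"
RESOURCE_DAG_RUN = "DAG Runs"
RESOURCE_TASK_INSTANCE = "Task Instances"

FAB_RESOURCES_BY_ENTITY = {
    # Assumed rows: the comment does not spell these two out.
    DagAccessEntity.DAG: (RESOURCE_DAG,),
    DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
    # Rows taken from the comment: there is no separate task resource,
    # and TASK_INSTANCE implies the DAG run resource as well.
    DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),
    DagAccessEntity.TASK_INSTANCE: (RESOURCE_DAG_RUN, RESOURCE_TASK_INSTANCE),
}


def fab_permissions(action: str, entity: DagAccessEntity) -> list[tuple[str, str]]:
    """Expand one (action, entity) request into (action, resource) pairs."""
    return [(action, resource) for resource in FAB_RESOURCES_BY_ENTITY[entity]]
```

With a table like this, an endpoint only names `DagAccessEntity.TASK_INSTANCE`, and the same action is applied to both underlying resources, which rules out the mixed "read DAG_RUN, write TASK_INSTANCE" combination.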





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355994191


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I think if we leave it like this, it might incur a performance hit for FAB users who have access to only some DAGs. In the current implementation, all DAG ids are read into a set only when the user's role has permission to read all DAGs.
   
   It also hampers any future multi-tenant implementation of an auth manager, because it leaves no way to optimize this via the auth manager. For all users and all tenants, all dag_ids have to be read into memory every time they want to see the DAGs page. This will become a problem very quickly in any multi-tenant environment.
   
   I **think** the solution should be different. Rather than implementing the whole loop here, `get_permitted_dag_ids` should be implemented in the `auth_manager`.
   
   Here we can do the basic check for access to all DAGs, but getting the list of allowed DAGs should then be delegated to the auth manager.
   
   ```
           if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
               "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
           ):
               return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
   
           return get_auth_manager().get_permitted_dag_ids(methods=methods, user=user)
   ```
   
   Then each auth manager could implement an optimized version of `get_permitted_dag_ids`. For FAB, it will be the original implementation with resources, but for others it could be different. Conceptually, for example:
   
   ```python
   tenant = get_tenant(user)
   return {dag.dag_id for dag in session.execute(select(DagModel.dag_id).filter_by(tenant=tenant))}
   ```
   
   It is no problem for an auth manager to have two ways of checking permissions:
   
   1) when you have a dag_id, you can check it individually
   2) when you need the list, you have a method that returns it based on the same criteria but implemented without the individual check
   
   IMHO this is the only way to implement a performant version of such a permission check.
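
The two code paths described above can be sketched roughly as follows. This is only an illustration: `AuthManagerSketch`, `TenantAuthManagerSketch` and the `load_all_dag_ids` callable (standing in for the database query) are hypothetical and are not the real Airflow auth manager interface.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Callable, Iterable


class AuthManagerSketch(ABC):
    """Hypothetical interface with an individual check and a list method."""

    @abstractmethod
    def is_authorized_dag(self, *, method: str, user: str, dag_id: str | None = None) -> bool:
        """Check one DAG, or access to all DAGs when ``dag_id`` is None."""

    def get_permitted_dag_ids(
        self, *, methods: Iterable[str], user: str, load_all_dag_ids: Callable[[], set[str]]
    ) -> set[str]:
        """Naive default: load every DAG id and run one check per DAG."""
        methods = list(methods)
        return {
            dag_id
            for dag_id in load_all_dag_ids()
            if any(self.is_authorized_dag(method=m, user=user, dag_id=dag_id) for m in methods)
        }


class TenantAuthManagerSketch(AuthManagerSketch):
    """Toy manager: each user belongs to one tenant, each DAG to one tenant."""

    def __init__(self, tenant_by_user: dict[str, str], dags_by_tenant: dict[str, set[str]]):
        self.tenant_by_user = tenant_by_user
        self.dags_by_tenant = dags_by_tenant

    def _dags_for(self, user: str) -> set[str]:
        return self.dags_by_tenant.get(self.tenant_by_user.get(user, ""), set())

    def is_authorized_dag(self, *, method, user, dag_id=None):
        if dag_id is None:
            return False  # nobody in this toy setup can access every DAG
        return dag_id in self._dags_for(user)

    def get_permitted_dag_ids(self, *, methods, user, load_all_dag_ids):
        # Optimized path: one tenant lookup, never loading all DAG ids.
        return set(self._dags_for(user))
```

Both methods answer from the same criteria, so the results agree, but the override never touches the full list of DAG ids.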





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1356003454


##########
airflow/www/security_manager.py:
##########
@@ -738,24 +633,9 @@ def create_perm_vm_for_all_dag(self) -> None:
     def check_authorization(
         self,
         perms: Sequence[tuple[str, str]] | None = None,
-        dag_id: str | None = None,
     ) -> bool:
         """Checks that the logged in user has the specified permissions."""
         if not perms:
             return True
 
-        for perm in perms:
-            if perm in (
-                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
-            ):
-                can_access_all_dags = self.has_access(*perm)
-                if not can_access_all_dags:
-                    action = perm[0]
-                    if not self.can_access_some_dags(action, dag_id):
-                        return False
-            elif not self.has_access(*perm):
-                return False
-
-        return True
+        return all(self.has_access(*perm) for perm in perms)

Review Comment:
   Should we also deprecate this / raise an exception here? See https://github.com/apache/airflow/pull/34317/files#r1355974367





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1346976365


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -111,13 +102,8 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
     return dagrun_schema.dump(dag_run)
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.DATASET)

Review Comment:
   How is this different from `requires_access_dataset`?





[GitHub] [airflow] o-nikolas commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340542089


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   Haha glad I could help rubber ducky :sweat_smile: :duck: 





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340310444


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   Glad you asked :) Basically this method returns all the DAGs you have access to. If you look at line 99, there is `readable_dags = get_airflow_app().appbuilder.sm.get_permitted_dag_ids(g.user)`, which returns the list of DAG ids the user has read access to. Setting `@security.requires_access_dag("GET")` would check whether the user has access to all DAGs, but that is not what we want to check here.
   
   So I figured we could remove the authorization check, since if the user has no access to any DAG, `get_permitted_dag_ids` will return an empty list.
   
   I don't think this is perfect, so if you think something is off here, please call it out :).
   
   While writing this comment I am wondering if `@security.requires_access_dag("GET")` should mean "does the user have access to at least one DAG" instead of "does the user have access to all DAGs". Thoughts? I don't think checking whether the user has access to all DAGs in Airflow makes sense.
   
   That way we could decorate this function with `@security.requires_access_dag("GET")` as you suggested.
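
The "at least one DAG" reading discussed above could look roughly like this. It is a minimal sketch: the function name and its parameters are hypothetical, and the permission lookups are passed in as plain values instead of being fetched from an auth manager.

```python
from __future__ import annotations


def is_authorized_dag_request(
    *,
    dag_id: str | None,
    can_access_all_dags: bool,
    permitted_dag_ids: set[str],
) -> bool:
    """Decide access for an endpoint decorated without or with a DAG id."""
    if dag_id is not None:
        # A specific DAG was requested: check that DAG only.
        return can_access_all_dags or dag_id in permitted_dag_ids
    # No DAG id (e.g. a listing endpoint): access to any DAG is enough.
    return can_access_all_dags or bool(permitted_dag_ids)
```

Under this reading a listing endpoint is rejected only for users who cannot see any DAG at all, and the endpoint body is still responsible for filtering the result to the permitted ids.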





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1354604953


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (
+                method == "PUT" and any(appbuilder.sm.get_editable_dag_ids())
+            )
+
+        return callback
+
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):

Review Comment:
   These can probably benefit from ParamSpec. Doesn’t have to be a part of this PR though.
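
For reference, a decorator typed with `ParamSpec` might look like the sketch below. `requires_access_sketch`, `PS`, `RT` and the local `PermissionDenied` class are hypothetical stand-ins, not the code in this PR, and the fallback import assumes `typing_extensions` is installed on Python versions older than 3.10.

```python
from __future__ import annotations

import sys
from functools import wraps
from typing import Callable, TypeVar

if sys.version_info >= (3, 10):
    from typing import ParamSpec
else:  # assumption: the typing_extensions backport is available
    from typing_extensions import ParamSpec

PS = ParamSpec("PS")
RT = TypeVar("RT")


class PermissionDenied(Exception):
    """Stand-in for the API exception of the same name."""


def requires_access_sketch(
    is_authorized: Callable[[], bool],
) -> Callable[[Callable[PS, RT]], Callable[PS, RT]]:
    """Build a decorator that keeps the wrapped function's exact signature."""

    def decorator(func: Callable[PS, RT]) -> Callable[PS, RT]:
        @wraps(func)
        def decorated(*args: PS.args, **kwargs: PS.kwargs) -> RT:
            if is_authorized():
                return func(*args, **kwargs)
            raise PermissionDenied()

        return decorated

    return decorator
```

Compared with the `T` plus `cast(T, decorated)` pattern, a type checker can verify the wrapper's parameters and return type directly, so no cast is needed.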





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355902071


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -461,13 +436,8 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
     )
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)

Review Comment:
   I think this change revealed a bug in the original permissions. Both RUN and TASK_INSTANCE should be PUT (the code actually modifies the DagRun state).





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355925044


##########
airflow/api_connexion/endpoints/xcom_endpoint.py:
##########
@@ -39,14 +39,7 @@
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_XCOM),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.XCOM)

Review Comment:
   Yes. I think those "Resource" specifications were really too fine-grained/unnecessary (see the comment above: https://github.com/apache/airflow/pull/34317/files#r1355917769). Also see the "philosophy" change we want to have: https://github.com/apache/airflow/pull/34317/files#r1355923325.
   
   The auth manager should not be asked whether the user can access all of the multiple resources (XCom, TI, DagRun, DAG); we just want to check whether the user can read this particular XCom value. For FAB it might be mapped to those 4 resources, but other auth managers might make different decisions, and the user might be able to read only XCom data (but not DAG_RUN or DAG; this is entirely plausible).
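
As a rough illustration of two auth managers answering the same "can this user read XCom?" question differently: the resource name strings and both helper functions below are hypothetical and only model the idea, not any real implementation.

```python
from __future__ import annotations

# Hypothetical resource names, for illustration only.
FAB_XCOM_READ_RESOURCES = ("DAGs", "DAG Runs", "Task Instances", "XComs")


def fab_style_can_read_xcom(granted: set[tuple[str, str]]) -> bool:
    """FAB-like policy: reading XCom requires read access to all four resources."""
    return all(("can_read", resource) in granted for resource in FAB_XCOM_READ_RESOURCES)


def xcom_only_can_read_xcom(granted: set[tuple[str, str]]) -> bool:
    """Alternative policy: read access to XCom alone is sufficient."""
    return ("can_read", "XComs") in granted
```

The endpoint asks one question in both cases; only the policy behind the answer differs.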
   





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1354608628


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   Are these only for `dag.html`? I wonder if there’s a better way, maybe calculate `can_edit` in views instead.
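
One way to read the suggestion above, as a rough sketch only: the view computes `can_edit` and hands the template a plain boolean, so the template needs neither the auth manager nor `DagDetails`. The function names and the callback signature below are hypothetical and not taken from the PR.

```python
from __future__ import annotations

from typing import Callable


def build_dag_template_context(
    dag_id: str,
    is_authorized_dag: Callable[..., bool],
) -> dict[str, object]:
    """Compute permission flags in the view so the template only reads booleans."""
    return {
        "dag_id": dag_id,
        # "PUT" is the method the PR maps to edit access.
        "can_edit": is_authorized_dag(method="PUT", dag_id=dag_id),
    }


def fake_is_authorized_dag(*, method: str, dag_id: str) -> bool:
    """Hypothetical stand-in for an auth manager call: only one DAG is editable."""
    return method == "PUT" and dag_id == "example_dag"
```

Every view that renders a template using `can_edit` would need to supply the key, otherwise the template fails on an undefined variable.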








Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1356999376


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   Nice, I like it. I no longer pass the auth manager to views in `prepare_jinja_globals`; I now pass it in `render_template`. For context, we still need to pass it to the view because we use it, for instance, to figure out whether the user is logged in.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357158265


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I see. I was also worried about the performance issue. My only concern with this solution is that we would then have to implement such a function for every resource we want to be tenant-aware (connections, variables, ...). All auth managers would have to implement methods like `get_permitted_connections`.
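
A sketch of one way the concern above could be softened, under assumptions not in the thread: a base class supplies a default per-item filter, so a concrete manager only overrides the batch method when it has a faster query. All class and method names here are hypothetical.

```python
from __future__ import annotations

from typing import Iterable


class BaseManagerSketch:
    """Hypothetical base class; not Airflow's BaseAuthManager."""

    def is_authorized_connection(self, conn_id: str) -> bool:
        raise NotImplementedError

    def get_permitted_connection_ids(self, conn_ids: Iterable[str]) -> set[str]:
        # Default: one authorization call per item. Correct but potentially
        # slow; subclasses may override this with a single batched lookup.
        return {c for c in conn_ids if self.is_authorized_connection(c)}


class PrefixManagerSketch(BaseManagerSketch):
    """Toy manager that grants access by connection id prefix."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def is_authorized_connection(self, conn_id: str) -> bool:
        return conn_id.startswith(self.prefix)
```

With a default like this, the per-resource method still exists on every manager, but only managers that care about performance need to write their own.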





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1360872279


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -65,22 +79,24 @@
         CLICommand,
     )
 
-_MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
+MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
     "POST": ACTION_CAN_CREATE,
     "GET": ACTION_CAN_READ,
     "PUT": ACTION_CAN_EDIT,
     "DELETE": ACTION_CAN_DELETE,
 }
 
-_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE = {
-    DagAccessEntity.AUDIT_LOG: RESOURCE_AUDIT_LOG,
-    DagAccessEntity.CODE: RESOURCE_DAG_CODE,
-    DagAccessEntity.DATASET: RESOURCE_DATASET,
-    DagAccessEntity.DEPENDENCIES: RESOURCE_DAG_DEPENDENCIES,
-    DagAccessEntity.RUN: RESOURCE_DAG_RUN,
-    DagAccessEntity.TASK_INSTANCE: RESOURCE_TASK_INSTANCE,
-    DagAccessEntity.TASK_LOGS: RESOURCE_TASK_LOG,
-    DagAccessEntity.XCOM: RESOURCE_XCOM,
+_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = {
+    DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,),
+    DagAccessEntity.CODE: (RESOURCE_DAG_CODE,),
+    DagAccessEntity.DEPENDENCIES: (RESOURCE_DAG_DEPENDENCIES,),
+    DagAccessEntity.IMPORT_ERRORS: (RESOURCE_IMPORT_ERROR,),
+    DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
+    DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),

Review Comment:
   Sounds like a good idea :)





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on PR #34317:
URL: https://github.com/apache/airflow/pull/34317#issuecomment-1745380219

   All comments are addressed. Reviews are welcome :)




Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1346974337


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -76,12 +77,7 @@
 RESOURCE_EVENT_PREFIX = "dag_run"
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
-    ],
-)
+@security.requires_access_dag("DELETE", DagAccessEntity.RUN)

Review Comment:
   Makes me wonder if it’s worthwhile to have a `requires_access_dag_run` shorthand








Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347821224


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -76,12 +77,7 @@
 RESOURCE_EVENT_PREFIX = "dag_run"
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
-    ],
-)
+@security.requires_access_dag("DELETE", DagAccessEntity.RUN)

Review Comment:
   I guess it really depends on style, but the reason I did that is that I like having a 1:1 mapping between the `@security.requires_access_*` decorators and the APIs in the auth manager.





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340310444


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -69,7 +69,7 @@ def get_dag_details(*, dag_id: str) -> APIResponse:
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
+@requires_authentication

Review Comment:
   Glad you asked :) Basically this method returns all the DAGs you have access to. If you look at line 99, there is `readable_dags = get_airflow_app().appbuilder.sm.get_permitted_dag_ids(g.user)`, which returns the list of DAG ids the user has read access to. Setting `@security.requires_access_dag("GET")` would check whether the user has access to all DAGs, but that is not what we want to check here.
   
   So I figured we could remove the authorization check, since if the user has no access to any DAG, `get_permitted_dag_ids` will return an empty list.
   
   I don't think this is perfect, so if you think something is off here, please call it out :).
   
   While writing this comment I am wondering whether `@security.requires_access_dag("GET")` should mean "does the user have access to at least one DAG" instead of "does the user have access to all DAGs". Thoughts? I don't think checking whether the user has access to all DAGs in Airflow makes sense.
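
The two readings of a DAG-level check with no DAG id can be sketched as below. This is an illustration under assumed inputs, not the PR's code: the function name and its parameters are hypothetical, and the permitted ids are passed in rather than fetched from a security manager.

```python
from __future__ import annotations


def is_authorized_dag_request(
    dag_id: str | None,
    *,
    can_access_all_dags: bool,
    permitted_dag_ids: set[str],
) -> bool:
    """Resolve a DAG-level check, falling back to "at least one DAG" semantics."""
    if can_access_all_dags:
        return True
    if dag_id is not None:
        # A specific DAG was requested: check that DAG only.
        return dag_id in permitted_dag_ids
    # No DAG id (for example a list endpoint): allow if any DAG is accessible.
    return bool(permitted_dag_ids)
```

With this fallback a list endpoint is reachable by a user with access to a single DAG, and the endpoint itself still has to filter its results to the permitted ids.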





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1340295153


##########
airflow/auth/managers/base_auth_manager.py:
##########
@@ -82,12 +86,14 @@ def is_authorized_configuration(
         self,
         *,
         method: ResourceMethod,
+        details: ConfigurationDetails | None = None,
         user: BaseUser | None = None,
     ) -> bool:
         """
         Return whether the user is authorized to perform a given action on configuration.
 
         :param method: the method to perform
+        :param details: optional details about the connection

Review Comment:
   Nice catch :)





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355579727


##########
airflow/api_connexion/endpoints/task_endpoint.py:
##########
@@ -22,21 +22,16 @@
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
+from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.exceptions import TaskNotFound
-from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
 
 if TYPE_CHECKING:
     from airflow import DAG
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Done :)





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355894015


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Finally got to review it after the break (and this and next week will have a lot of focus on those).
   
   I think this really calls for a small refactoring:
   
   ```python
   def requires_access_dag(
       method: ResourceMethod, access_entities: tuple[DagAccessEntity, ...] | DagAccessEntity | None = None
   ) -> Callable[[T], T]: ...
   ```
   
   and then:
   
   ```python
   @security.requires_access_dag("GET", (DagAccessEntity.RUN, DagAccessEntity.TASK_INSTANCE))
   ```
   
   and convert `access_entity` to `access_entities: tuple[DAGDetails]` (we can convert a single dag detail to a one-element tuple if we want to keep the decorators simple for most cases).
   
   Not sure whether to do this now or as a follow-up to this PR, but I am quite sure this will be much better from a performance point of view (one callback instead of two), and it will be pretty much impossible to optimize if we have two decorators.
   
   And since it will change the callback, it's likely better to do it now.
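
A rough sketch of the refactoring proposed above, with assumptions flagged: the enum and `PermissionDenied` are stand-ins, and the `is_authorized` keyword is a hypothetical injection point added so the example runs without Airflow. It shows a single decorator normalizing one entity or several into a tuple and making one authorization call.

```python
from __future__ import annotations

from enum import Enum
from functools import wraps
from typing import Callable


class DagAccessEntity(Enum):
    """Stand-in enum with the two members used in the comment."""

    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"


class PermissionDenied(Exception):
    """Stand-in for the API's permission error."""


def requires_access_dag(
    method: str,
    access_entities: tuple[DagAccessEntity, ...] | DagAccessEntity | None = None,
    *,
    is_authorized: Callable[[str, tuple[DagAccessEntity, ...], str | None], bool],
):
    # Normalize so the callback always receives a tuple.
    if access_entities is None:
        entities: tuple[DagAccessEntity, ...] = ()
    elif isinstance(access_entities, DagAccessEntity):
        entities = (access_entities,)
    else:
        entities = tuple(access_entities)

    def decorator(func):
        @wraps(func)
        def decorated(*args, **kwargs):
            # One call covers every entity, leaving room to optimize it.
            if is_authorized(method, entities, kwargs.get("dag_id")):
                return func(*args, **kwargs)
            raise PermissionDenied()

        return decorated

    return decorator
```

Stacking two decorators would instead trigger two independent checks per request, which is the cost the comment wants to avoid.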
   





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357126074


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):

Review Comment:
   I was thinking of removing this one once the user and roles endpoints have been migrated, but I guess it is the same case as the `has_access` decorator and we should keep it. I will:
   - Raise a deprecation warning when it is used
   - Copy it to the FAB auth manager and call this one from the one above
   
   > Should we raise exception here if auth_manager is not FAB in case we run a function decorated with it?
   
   I really want to avoid doing `if not isinstance(auth_manager, FabAuthManager)` as much as possible. I think deprecation warnings are enough.
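
A minimal sketch of the "raise a deprecation warning when used" plan, assuming a simplified decorator. The name `requires_access_legacy` is hypothetical, the built-in `DeprecationWarning` stands in for whatever warning class the project uses, and the permission check itself is left out.

```python
import warnings
from functools import wraps


def requires_access_legacy(permissions):
    """Hypothetical legacy decorator that warns on every call of the wrapped function."""

    def decorator(func):
        @wraps(func)
        def decorated(*args, **kwargs):
            warnings.warn(
                "This decorator is deprecated; use the requires_access_* decorators instead.",
                DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller of the endpoint
            )
            # The actual check against `permissions` is omitted in this sketch.
            return func(*args, **kwargs)

        return decorated

    return decorator
```

The warning fires regardless of which auth manager is configured, so no `isinstance` check on the manager is needed.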





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1361283764


##########
airflow/api_connexion/security.py:
##########
@@ -48,10 +61,194 @@ def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             check_authentication()
-            if appbuilder.sm.check_authorization(permissions, kwargs.get("dag_id")):
+            if appbuilder.sm.check_authorization(permissions):
                 return func(*args, **kwargs)
             raise PermissionDenied()
 
         return cast(T, decorated)
 
     return requires_access_decorator
+
+
+def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
+    """
+    Define the behavior whether the user is authorized to access the resource.
+
+    :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
+        the resource
+    :param func: the function to call if the user is authorized
+    :param args: the arguments of ``func``
+    :param kwargs: the keyword arguments ``func``
+
+    :meta private:
+    """
+    check_authentication()
+    if is_authorized_callback():
+        return func(*args, **kwargs)
+    raise PermissionDenied()
+
+
+def requires_authentication(func: T):
+    """Decorator for functions that require authentication."""
+
+    @wraps(func)
+    def decorated(*args, **kwargs):
+        check_authentication()
+        return func(*args, **kwargs)
+
+    return cast(T, decorated)
+
+
+def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            section: str | None = kwargs.get("section")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
+                    method=method, details=ConfigurationDetails(section=section)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
+    def requires_access_decorator(func: T):
+        @wraps(func)
+        def decorated(*args, **kwargs):
+            connection_id: str | None = kwargs.get("connection_id")
+            return _requires_access(
+                is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
+                    method=method, details=ConnectionDetails(conn_id=connection_id)
+                ),
+                func=func,
+                args=args,
+                kwargs=kwargs,
+            )
+
+        return cast(T, decorated)
+
+    return requires_access_decorator
+
+
+def requires_access_dag(
+    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+) -> Callable[[T], T]:
+    appbuilder = get_airflow_app().appbuilder
+
+    def _is_authorized_callback(dag_id: str):
+        def callback():
+            access = get_auth_manager().is_authorized_dag(
+                method=method,
+                access_entity=access_entity,
+                details=DagDetails(id=dag_id),
+            )
+
+            # ``access`` means here:
+            # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
+            # - if no DAG id is provided: is the user authorized to access all DAGs
+            if dag_id or access:
+                return access
+
+            # No DAG id is provided and the user is not authorized to access all DAGs
+            # If method is "GET", return whether the user has read access to any DAGs
+            # If method is "PUT", return whether the user has edit access to any DAGs
+            return (method == "GET" and any(appbuilder.sm.get_readable_dag_ids())) or (

Review Comment:
   Done. I agree. I like it that way and am open to optimization on the auth managers' side :+1:





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1358503642


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   From the tests it does not seem to work:
   
   ```
   FAILED tests/www/views/test_views_acl.py::test_success[rendered-templates-all-dag-user] - jinja2.exceptions.UndefinedError: 'can_edit' is undefined
   FAILED tests/www/views/test_views_acl.py::test_success[task] - AssertionError: Couldn't find 'Task Instance Details'
   FAILED tests/www/views/test_views_acl.py::test_success[xcom] - jinja2.exceptions.UndefinedError: 'can_edit' is undefined
   FAILED tests/www/views/test_views_acl.py::test_success[grid-data-for-readonly-role] - AssertionError: Couldn't find 'runme_1'
   ```





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1360871612


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)
+        return {

Review Comment:
   I see. I'll do that





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1324610764


##########
airflow/www/auth.py:
##########
@@ -17,56 +17,157 @@
 from __future__ import annotations
 
 from functools import wraps
-from typing import Callable, Sequence, TypeVar, cast
+from typing import TYPE_CHECKING, Callable, TypeVar, cast
 
-from flask import current_app, flash, g, redirect, render_template, request
+from flask import flash, g, redirect, render_template, request
 
+from airflow.auth.managers.models.resource_details import (
+    ConnectionDetails,
+    DagAccessEntity,
+    DagDetails,
+)
 from airflow.configuration import conf
 from airflow.utils.net import get_hostname
 from airflow.www.extensions.init_auth_manager import get_auth_manager
 
+if TYPE_CHECKING:
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+    from airflow.models import Connection

Review Comment:
   Noted. It would be worth writing a pre-commit script for such things :)





[GitHub] [airflow] uranusjr commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1324231238


##########
airflow/www/auth.py:
##########
@@ -17,56 +17,157 @@
 from __future__ import annotations
 
 from functools import wraps
-from typing import Callable, Sequence, TypeVar, cast
+from typing import TYPE_CHECKING, Callable, TypeVar, cast
 
-from flask import current_app, flash, g, redirect, render_template, request
+from flask import flash, g, redirect, render_template, request
 
+from airflow.auth.managers.models.resource_details import (
+    ConnectionDetails,
+    DagAccessEntity,
+    DagDetails,
+)
 from airflow.configuration import conf
 from airflow.utils.net import get_hostname
 from airflow.www.extensions.init_auth_manager import get_auth_manager
 
+if TYPE_CHECKING:
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+    from airflow.models import Connection

Review Comment:
   ```suggestion
       from airflow.models.connection import Connection
   ```





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347826079


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -111,13 +102,8 @@ def get_dag_run(*, dag_id: str, dag_run_id: str, session: Session = NEW_SESSION)
     return dagrun_schema.dump(dag_run)
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.DATASET)

Review Comment:
   Good point! There is a duplicate here. `DagAccessEntity.DATASET` does not make sense





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347823415


##########
airflow/api_connexion/endpoints/dag_run_endpoint.py:
##########
@@ -76,12 +77,7 @@
 RESOURCE_EVENT_PREFIX = "dag_run"
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
-    ],
-)
+@security.requires_access_dag("DELETE", DagAccessEntity.RUN)

Review Comment:
   Also, all entities related to DAGs (task instances, DAG runs, task logs) share common logic, so it makes sense to have one decorator for all of them.
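
To illustrate the "one decorator for all DAG entities" idea, here is a minimal, self-contained sketch. It is not the code from this PR: `ALLOWED`, `PermissionDenied`, the standalone `is_authorized_dag` function, and the reduced `DagAccessEntity` enum are hypothetical stand-ins, and the `ResourceMethod` values are an assumption about the `Literal` discussed in this PR.

```python
from enum import Enum
from functools import wraps
from typing import Callable, Literal, Optional

# Assumption: a Literal of HTTP-style methods, as discussed in this PR.
ResourceMethod = Literal["GET", "POST", "PUT", "DELETE"]


class DagAccessEntity(Enum):
    """Hypothetical subset of the DAG-related entities."""

    RUN = "RUN"
    TASK_INSTANCE = "TASK_INSTANCE"
    TASK_LOGS = "TASK_LOGS"


class PermissionDenied(Exception):
    """Raised by this sketch when the authorization check fails."""


# Hypothetical stand-in for an auth manager: the allowed (method, entity) pairs.
ALLOWED = {("GET", DagAccessEntity.RUN), ("GET", None)}


def is_authorized_dag(method: ResourceMethod, access_entity: Optional[DagAccessEntity] = None) -> bool:
    """Return True if the (method, entity) pair is allowed in this sketch."""
    return (method, access_entity) in ALLOWED


def requires_access_dag(method: ResourceMethod, access_entity: Optional[DagAccessEntity] = None) -> Callable:
    """Build a decorator that guards an endpoint with a single DAG-level check."""

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # The same check covers the DAG itself and every DAG-related entity.
            if not is_authorized_dag(method, access_entity):
                raise PermissionDenied(f"{method} on {access_entity} is not allowed")
            return func(*args, **kwargs)

        return wrapper

    return decorator


@requires_access_dag("GET", DagAccessEntity.RUN)
def get_dag_run(dag_id: str) -> str:
    return f"run of {dag_id}"


@requires_access_dag("DELETE", DagAccessEntity.RUN)
def delete_dag_run(dag_id: str) -> str:
    return f"deleted run of {dag_id}"
```

With these stand-ins, `get_dag_run` succeeds while `delete_dag_run` raises `PermissionDenied`, because only the `GET` pairs are in `ALLOWED`.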





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1346978348


##########
airflow/api_connexion/endpoints/plugin_endpoint.py:
##########
@@ -22,13 +22,12 @@
 from airflow.api_connexion.parameters import check_limit, format_parameters
 from airflow.api_connexion.schemas.plugin_schema import PluginCollection, plugin_collection_schema
 from airflow.plugins_manager import get_plugin_info
-from airflow.security import permissions
 
 if TYPE_CHECKING:
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_PLUGIN)])
+@security.requires_access_website()

Review Comment:
   Is this correct?





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347905048


##########
airflow/www/security_manager.py:
##########
@@ -268,125 +271,53 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use `get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), joinedload(Permission.resource))
+        dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
+
+        if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
+
+        return {
+            dag_id
+            for dag_id in dag_ids
+            if (
+                "GET" in methods
+                and get_auth_manager().is_authorized_dag(
+                    method="GET", details=DagDetails(id=dag_id), user=user
+                )
+            )
+            or (
+                "PUT" in methods
+                and get_auth_manager().is_authorized_dag(
+                    method="PUT", details=DagDetails(id=dag_id), user=user
                 )

Review Comment:
   Can we split this up? Maybe with a local function. This is very out of hand.
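
A rough sketch of how such a split with a local function could look, written so that it runs on its own. `FakeAuthManager` and the `DagDetails` dataclass are hypothetical stand-ins for the real auth manager and resource model, and the function takes the DAG IDs as an argument instead of querying the database.

```python
from dataclasses import dataclass
from typing import Container, Iterable, Optional, Set


@dataclass(frozen=True)
class DagDetails:
    """Hypothetical stand-in for the DagDetails resource model."""

    id: str


class FakeAuthManager:
    """Hypothetical auth manager backed by a fixed set of (method, dag_id) grants.

    A grant whose dag_id is None means access to every DAG.
    """

    def __init__(self, grants):
        self._grants = set(grants)

    def is_authorized_dag(self, *, method: str, details: Optional[DagDetails] = None) -> bool:
        dag_id = details.id if details else None
        return (method, dag_id) in self._grants or (method, None) in self._grants


def get_permitted_dag_ids(
    auth_manager: FakeAuthManager,
    all_dag_ids: Iterable[str],
    methods: Container[str] = ("PUT", "GET"),
) -> Set[str]:
    """Return the DAG IDs the user may access with at least one of the given methods."""
    dag_ids = set(all_dag_ids)

    # Global access for any requested method: no per-DAG checks needed.
    if any(m in methods and auth_manager.is_authorized_dag(method=m) for m in ("GET", "PUT")):
        return dag_ids

    def _is_permitted(method: str, dag_id: str) -> bool:
        # Local helper: one per-DAG check, reused for every method.
        return method in methods and auth_manager.is_authorized_dag(
            method=method, details=DagDetails(id=dag_id)
        )

    return {dag_id for dag_id in dag_ids if _is_permitted("GET", dag_id) or _is_permitted("PUT", dag_id)}
```

The set comprehension shrinks to a single readable condition, and the per-DAG check lives in one place.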



##########
airflow/www/views.py:
##########
@@ -3908,9 +3909,11 @@ class DagFilter(BaseFilter):
     """Filter using DagIDs."""
 
     def apply(self, query, func):
-        if get_airflow_app().appbuilder.sm.has_all_dags_access(g.user):
+        if get_auth_manager().is_authorized_dag(
+            method="GET", user=g.user
+        ) or get_auth_manager().is_authorized_dag(method="PUT", user=g.user):
             return query

Review Comment:
   ```suggestion
           if get_auth_manager().is_authorized_dag(method="GET", user=g.user):
               return query
           if get_auth_manager().is_authorized_dag(method="PUT", user=g.user):
               return query
   ```
   
   I’m not a fan of very long if statements, especially given how awkwardly Black tends to wrap them.



##########
airflow/www/security_manager.py:
##########
@@ -738,24 +639,13 @@ def create_perm_vm_for_all_dag(self) -> None:
     def check_authorization(
         self,
         perms: Sequence[tuple[str, str]] | None = None,
-        dag_id: str | None = None,
     ) -> bool:
         """Checks that the logged in user has the specified permissions."""
         if not perms:
             return True
 
         for perm in perms:
-            if perm in (
-                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
-                (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
-            ):
-                can_access_all_dags = self.has_access(*perm)
-                if not can_access_all_dags:
-                    action = perm[0]
-                    if not self.can_access_some_dags(action, dag_id):
-                        return False
-            elif not self.has_access(*perm):
+            if not self.has_access(*perm):
                 return False
 
         return True

Review Comment:
   ```python
       return all(self.has_access(*perm) for perm in perms)
   ```



##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,17 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals.update(
+            {
+                "auth_manager": get_auth_manager(),
+                "DagDetails": DagDetails,
+            }
+        )

Review Comment:
   ```suggestion
           extra_globals["auth_manager"] = get_auth_manager()
           extra_globals["DagDetails"] = DagDetails
   ```
   
   Feels easier to read and actually shorter.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1347965402


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,17 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals.update(
+            {
+                "auth_manager": get_auth_manager(),
+                "DagDetails": DagDetails,
+            }
+        )

Review Comment:
   Agree, this is better





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #34317:
URL: https://github.com/apache/airflow/pull/34317#issuecomment-1763096270

   Re-reviewed; only a few comments left. Also, https://github.com/apache/airflow/pull/34942 should possibly be merged first.




Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1358699380


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   Never mind, I confused myself. I figured it out :)





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1354594447


##########
airflow/api_connexion/endpoints/task_endpoint.py:
##########
@@ -22,21 +22,16 @@
 from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.schemas.task_schema import TaskCollection, task_collection_schema, task_schema
+from airflow.auth.managers.models.resource_details import DagAccessEntity
 from airflow.exceptions import TaskNotFound
-from airflow.security import permissions
 from airflow.utils.airflow_flask_app import get_airflow_app
 
 if TYPE_CHECKING:
     from airflow import DAG
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   Is this right? A task is slightly different from a task instance, since the latter also requires permission on the DAG run.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355941755


##########
airflow/www/extensions/init_jinja_globals.py:
##########
@@ -69,10 +70,13 @@ def prepare_jinja_globals():
             "git_version": git_version,
             "k8s_or_k8scelery_executor": IS_K8S_OR_K8SCELERY_EXECUTOR,
             "rest_api_enabled": False,
-            "auth_manager": get_auth_manager(),
             "config_test_connection": conf.get("core", "test_connection", fallback="Disabled"),
         }
 
+        # Extra global specific to auth manager
+        extra_globals["auth_manager"] = get_auth_manager()
+        extra_globals["DagDetails"] = DagDetails

Review Comment:
   It does indeed seem strange to pass the auth manager as a Jinja global (especially since it is only used for checking "can_edit").
   
   I think it could easily be fixed for all of those views by injecting "can_edit" in `baseviews.py` whenever `dag` is already present in the arguments (all the views that extend dag_html must have `dag` present).
   
   Something like:
   
   ```python
       def render_template(self, template, **kwargs):
           """
           Use this method on your own endpoints, will pass the extra_args
           to the templates.
   
           :param template: The template relative path
           :param kwargs: arguments to be passed to the template
           """
           kwargs["base_template"] = self.appbuilder.base_template
           kwargs["appbuilder"] = self.appbuilder
   
           # THIS
           if "dag" in kwargs:
               kwargs["can_edit"] = get_auth_manager().is_authorized_dag(method="PUT", details=DagDetails(id=kwargs["dag"].dag_id))
   
           return render_template(
               template, **dict(list(kwargs.items()) + list(self.extra_args.items()))
           )
   ```
   
   Or it could be added in all the views individually in extra_args.





Re: [PR] Use auth manager `is_authorized_` APIs to check user permissions in Rest API [airflow]

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357188155


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -61,13 +61,8 @@
 T = TypeVar("T")
 
 
-@security.requires_access(
-    [
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
-        (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
-    ],
-)
+@security.requires_access_dag("GET", DagAccessEntity.RUN)
+@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)

Review Comment:
   I added `DagAccessEntity.TASK`





[GitHub] [airflow] uranusjr commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1337905995


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -209,7 +209,9 @@ def is_authorized_dag(
             resource_type = self._get_fab_resource_type(access_entity)
 
             if method == "GET":
-                return self._is_authorized(method=method, resource_type=resource_type, user=user)
+                return self._is_authorized_dag(
+                    method="GET", details=details, user=user
+                ) and self._is_authorized(method=method, resource_type=resource_type, user=user)

Review Comment:
   ```suggestion
                   if not self._is_authorized_dag(method="GET", details=details, user=user):
                       return False
                   return self._is_authorized(method=method, resource_type=resource_type, user=user)
   ```
   
   Let’s break this, the statement is quite long.





[GitHub] [airflow] vincbeck commented on a diff in pull request #34317: Use auth manager `is_authorized_` APIs to check user permissions in Rest API

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1338731975


##########
airflow/api_connexion/endpoints/dag_endpoint.py:
##########
@@ -180,7 +180,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat
     return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))
 
 
-@security.requires_access([(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG)])
+@security.requires_access_dag("DELETE")

Review Comment:
   1. The difference is purely semantic. Today, a permission is represented as a pair of an action and a resource. You can find the list of actions and resources [here](https://github.com/apache/airflow/blob/main/airflow/security/permissions.py). In AIP-56, we decided to drop this representation in favor of a more formal, generic way to represent permissions. The current list of resources is also too long, and we want to simplify it. I recommend reading this [comment thread](https://github.com/apache/airflow/pull/33213#discussion_r1304881823), which might help you understand the decisions we made along the way.
   2. We decided to go with a `Literal` instead of an `enum` (see [here](https://github.com/apache/airflow/pull/33213#discussion_r1318202392)). So yes, it is a string, but it is constrained to a specific set of values.
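
To illustrate the second point, here is a rough sketch of a `Literal`-constrained string. The alias name `MethodSketch`, the helper functions, and the exact set of allowed values are assumptions for illustration only, not the definitions used in Airflow. A static type checker rejects values outside the `Literal`, and `typing.get_args` lets the same set be checked at runtime.

```python
from typing import Literal, get_args

# Hypothetical alias: the name and the allowed values are assumptions.
MethodSketch = Literal["GET", "POST", "PUT", "DELETE"]


def is_valid_method(value: str) -> bool:
    # get_args() returns the tuple of values listed in the Literal.
    return value in get_args(MethodSketch)


def describe_check(method: MethodSketch) -> str:
    # Type checkers flag describe_check("REMOVE") statically; this guard
    # covers callers that do not run a type checker.
    if not is_valid_method(method):
        raise ValueError(f"Unsupported method: {method!r}")
    return f"checking {method} access"
```

Callers still pass a plain string such as `"DELETE"`, with no enum import needed, while the set of accepted values remains explicit in the type.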


