Posted to commits@airflow.apache.org by "phanikumv (via GitHub)" <gi...@apache.org> on 2023/07/27 10:04:06 UTC

[GitHub] [airflow] phanikumv opened a new pull request, #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

phanikumv opened a new pull request, #32883:
URL: https://github.com/apache/airflow/pull/32883

   This continues the effort to refactor queries to SQLAlchemy 2.0 style.
   
   related: https://github.com/apache/airflow/issues/28723
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299803639


##########
airflow/www/security.py:
##########
@@ -243,11 +243,9 @@ def __init__(self, appbuilder) -> None:
 
     def _get_root_dag_id(self, dag_id: str) -> str:
         if "." in dag_id:
-            dm = (
-                self.appbuilder.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
-                .filter(DagModel.dag_id == dag_id)
-                .first()
-            )
+            dm = self.appbuilder.get_session.execute(
+                select(DagModel.dag_id, DagModel.root_dag_id).where(DagModel.dag_id == dag_id)
+            ).one()

Review Comment:
   I think `limit(1)` probably doesn’t matter, since `dag_id` is the primary key of `DagModel`. If this query returns more than one result, we’re in big trouble.
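A minimal sketch of the point above, assuming SQLAlchemy 1.4 or 2.0 and an in-memory SQLite database; `ToyDag` is a hypothetical stand-in for `DagModel`. Because a primary-key filter can match at most one row, `limit(1)` adds nothing and `.one()` can never see duplicates. The choice between `.one()` and the old `.first()` only matters when the row is missing: `.first()` returns `None`, while `.one()` raises `NoResultFound` (importable from `sqlalchemy.exc` since 1.4).

```python
from sqlalchemy import Column, String, create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyDag(Base):
    """Hypothetical stand-in for DagModel: dag_id is the primary key."""

    __tablename__ = "toy_dag"
    dag_id = Column(String(250), primary_key=True)
    root_dag_id = Column(String(250), nullable=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(ToyDag(dag_id="parent.child", root_dag_id="parent"))
    session.commit()

    # A primary-key filter matches at most one row, so limit(1) would be redundant
    # and .one() can never hit MultipleResultsFound here.
    found = select(ToyDag.dag_id, ToyDag.root_dag_id).where(ToyDag.dag_id == "parent.child")
    row = session.execute(found).one()
    print(row.dag_id, row.root_dag_id)  # parent.child parent

    # The difference shows up only when the row is missing:
    missing = select(ToyDag.dag_id, ToyDag.root_dag_id).where(ToyDag.dag_id == "no.such_dag")
    first_missing = session.execute(missing).first()  # None
    try:
        session.execute(missing).one()
        one_raised = False
    except NoResultFound:
        one_raised = True
```

Multi-column selects come back as `Row` objects, so attribute access such as `row.root_dag_id` keeps working after the switch from `Query` to `select()`.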





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1284134678


##########
airflow/utils/scheduler_health.py:
##########
@@ -35,12 +37,12 @@ def do_GET(self):
         if self.path == "/health":
             try:
                 with create_session() as session:
-                    scheduler_job = (
-                        session.query(Job)
+                    scheduler_job = session.scalars(

Review Comment:
   fixed



##########
airflow/www/security.py:
##########
@@ -563,11 +561,11 @@ def add_homepage_access_to_custom_roles(self) -> None:
     def get_all_permissions(self) -> set[tuple[str, str]]:
         """Returns all permissions as a set of tuples with the action and resource names."""
         return set(
-            self.appbuilder.get_session.query(self.permission_model)
-            .join(self.permission_model.action)
-            .join(self.permission_model.resource)
-            .with_entities(self.action_model.name, self.resource_model.name)
-            .all()
+            self.appbuilder.get_session.execute(
+                select(self.action_model.name, self.resource_model.name)
+                .join(self.permission_model.action)
+                .join(self.permission_model.resource)
+            ).all()

Review Comment:
   fixed




[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1297006917


##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
-            )
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()

Review Comment:
   Why did the previous method not have this issue?




[GitHub] [airflow] Taragolis commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299963745


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   Just wondering, what kind of error happened?




[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299807954


##########
airflow/www/security.py:
##########
@@ -358,16 +356,15 @@ def get_accessible_dag_ids(
             if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
                 permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
             ):
-                return {dag.dag_id for dag in session.query(DagModel.dag_id)}
-            user_query = (
-                session.query(User)
+                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
+            user_query = session.scalar(
+                select(User)
                 .options(
                     joinedload(User.roles)
                     .subqueryload(Role.permissions)
                     .options(joinedload(Permission.action), joinedload(Permission.resource))
                 )
-                .filter(User.id == user.id)
-                .first()
+                .where(User.id == user.id)

Review Comment:
   Same as `DagModel`, it should not be possible for this query to return more than one row since `User.id` is the primary key.




[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1277985239


##########
airflow/utils/scheduler_health.py:
##########
@@ -36,11 +38,11 @@ def do_GET(self):
             try:
                 with create_session() as session:
                     scheduler_job = (
-                        session.query(Job)
+                        session.scalars(select(Job))
                         .filter_by(job_type=SchedulerJobRunner.job_type)
                         .filter_by(hostname=get_hostname())
                         .order_by(Job.latest_heartbeat.desc())
-                        .first()
+                        .limit(1)
                     )

Review Comment:
   Because you mocked the session object, and with a mock you can call literally anything without errors. This fails if you use an actual SQLAlchemy session object:
   
   ```pycon
   >>> from airflow.utils.db import create_session
   >>> from airflow.models.job import Job
   >>> from sqlalchemy import select
   >>> with create_session() as s:
   ...  s.scalars(select(Job)).filter_by(job_type="SchedulerJob")
   ...
   Traceback (most recent call last):
     File "<stdin>", line 2, in <module>
   AttributeError: 'ScalarResult' object has no attribute 'filter_by'
   ```
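A sketch of why the mocked test passed, assuming `unittest.mock` from the standard library and SQLAlchemy's `ScalarResult` class (importable from `sqlalchemy.engine.result` in 1.4 and 2.0). A bare `MagicMock` fabricates any attribute on demand, so the invalid `.filter_by()` call on a result succeeds; an autospec'd `ScalarResult` rejects it with the same `AttributeError` a real session produces. The session mocks and the placeholder statement string are hypothetical test scaffolding.

```python
from unittest.mock import MagicMock, create_autospec

from sqlalchemy.engine.result import ScalarResult

stmt = "placeholder statement"  # never executed; the mocks accept anything

# A bare MagicMock invents attributes on demand, so this invalid chain
# (ScalarResult has no filter_by method) runs without complaint.
loose_session = MagicMock()
loose_result = loose_session.scalars(stmt).filter_by(job_type="SchedulerJob")
print(type(loose_result).__name__)  # MagicMock

# Constraining the return value with autospec makes the mock reject
# attributes that the real ScalarResult class does not define.
strict_session = MagicMock()
strict_session.scalars.return_value = create_autospec(ScalarResult, instance=True)

try:
    strict_session.scalars(stmt).filter_by(job_type="SchedulerJob")
    strict_raised = False
except AttributeError:
    strict_raised = True

# Methods that do exist on ScalarResult remain callable on the spec'd mock.
strict_session.scalars(stmt).first()
```

Autospec only constrains the object it is built from, which is why the sketch attaches it to `scalars.return_value`; speccing the session alone would still let the chained result accept anything.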




[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1298222468


##########
airflow/models/skipmixin.py:
##########
@@ -121,14 +128,12 @@ def skip(
                 stacklevel=2,
             )
 
-            dag_run = (
-                session.query(DagRun)
-                .filter(
-                    DagRun.dag_id == task_list[0].dag_id,
-                    DagRun.execution_date == execution_date,
+            dag_run = session.scalar(
+                select(DagRun).where(
+                    DagRun.dag_id == task_list[0].dag_id, DagRun.execution_date == execution_date
                 )
-                .one()
             )
+

Review Comment:
   Or
   
   ```python
   session.scalars(...).one()
   ```




[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299813724


##########
airflow/operators/subdag.py:
##########
@@ -112,7 +113,7 @@ def _validate_pool(self, session):
             conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
             if conflicts:
                 # only query for pool conflicts if one may exist
-                pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
+                pool = session.scalar(select(Pool).where(Pool.slots == 1, Pool.pool == self.pool))

Review Comment:
   `Pool.pool` is also unique
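The rewritten line also folds two chained `.filter()` calls into one `.where()` with two criteria. A sketch, assuming SQLAlchemy 1.4/2.0, SQLite, and a hypothetical `ToyPool` model with a unique `pool` column: criteria passed together to `.where()` are joined with `AND`, just like chained calls, and with a unique column `session.scalar()` can only ever see zero or one row.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ToyPool(Base):
    """Hypothetical stand-in for Pool: the pool name is unique."""

    __tablename__ = "toy_pool"
    id = Column(Integer, primary_key=True)
    pool = Column(String(256), unique=True)
    slots = Column(Integer)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([ToyPool(pool="single", slots=1), ToyPool(pool="wide", slots=8)])
    session.commit()

    def lookup(name):
        # Two criteria in one where() are combined with AND ...
        combined = select(ToyPool.pool).where(ToyPool.slots == 1, ToyPool.pool == name)
        # ... which matches chaining where() calls one after another.
        chained = select(ToyPool.pool).where(ToyPool.slots == 1).where(ToyPool.pool == name)
        return session.scalar(combined), session.scalar(chained)

    single = lookup("single")  # ("single", "single")
    wide = lookup("wide")  # (None, None): "wide" exists but has 8 slots
```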




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1284134926


##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()

Review Comment:
   fixed




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1289693312


##########
airflow/dag_processing/processor.py:
##########
@@ -428,20 +428,20 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) ->
         if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
             cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
             return
-
         qry = (
-            session.query(TI.task_id, func.max(DR.execution_date).label("max_ti"))
+            select(TI.task_id, func.max(DR.execution_date).label("max_ti"))
             .join(TI.dag_run)
-            .filter(TI.dag_id == dag.dag_id)
-            .filter(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED))
-            .filter(TI.task_id.in_(dag.task_ids))
+            .where(TI.dag_id == dag.dag_id)
+            .where(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED))
+            .where(TI.task_id.in_(dag.task_ids))
             .group_by(TI.task_id)
-            .subquery("sq")

Review Comment:
   OK, will make the changes.




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1297221347


##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
-            )
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()

Review Comment:
   One is a `ScalarResult` and the other is an ORM `Query`. Maybe the `ScalarResult` is treated as a collection and the query is not 🤷‍♂️
   ``` 
   (Pdb) type(self.appbuilder.get_session.scalars(select(self.role_model).options(joinedload(self.role_model.permissions))))
   <class 'sqlalchemy.engine.result.ScalarResult'>
   (Pdb) type(self.appbuilder.get_session.query(self.role_model).options(joinedload(self.role_model.permissions)))
   <class 'sqlalchemy.orm.query.Query'>
   ```




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299996816


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   thank you for confirming !




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299157545


##########
airflow/models/skipmixin.py:
##########
@@ -121,14 +128,12 @@ def skip(
                 stacklevel=2,
             )
 
-            dag_run = (
-                session.query(DagRun)
-                .filter(
-                    DagRun.dag_id == task_list[0].dag_id,
-                    DagRun.execution_date == execution_date,
+            dag_run = session.scalar(
+                select(DagRun).where(
+                    DagRun.dag_id == task_list[0].dag_id, DagRun.execution_date == execution_date
                 )
-                .one()
             )
+

Review Comment:
   fixed it




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1296953332


##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
-            )
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()

Review Comment:
   If we do not use `unique()`, the tests fail with the following error:
   
   ```
   ERROR test_security.py::test_init_role_baseview[role0] - sqlalchemy.exc.InvalidRequestError: The unique() method must be invoked on this Result, as it contains results that include joined eager loads against collections
   ERROR test_security.py::test_bulk_sync_roles_baseview[role0] - sqlalchemy.exc.InvalidRequestError: The unique() method must be invoked on this Result, as it contains results that include joined eager loads against collections
   ERROR test_security.py::test_bulk_sync_roles_modelview[role0] - sqlalchemy.exc.InvalidRequestError: The unique() method must be invoked on this Result, as it contains results that include joined eager loads against collections
   ERROR test_security.py::test_update_and_verify_permission_role[role0] - sqlalchemy.exc.InvalidRequestError: The unique() method must be invoked on this Result, as it contains results that include joined eager loads against collections
   ```




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299931291


##########
airflow/www/security.py:
##########
@@ -358,16 +356,15 @@ def get_accessible_dag_ids(
             if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
                 permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
             ):
-                return {dag.dag_id for dag in session.query(DagModel.dag_id)}
-            user_query = (
-                session.query(User)
+                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
+            user_query = session.scalar(
+                select(User)
                 .options(
                     joinedload(User.roles)
                     .subqueryload(Role.permissions)
                     .options(joinedload(Permission.action), joinedload(Permission.resource))
                 )
-                .filter(User.id == user.id)
-                .first()
+                .where(User.id == user.id)

Review Comment:
   yeah, hence no change made



##########
airflow/www/security.py:
##########
@@ -524,10 +524,8 @@ def _merge_perm(self, action_name: str, resource_name: str) -> None:
         resource = self.get_resource(resource_name)
         perm = None
         if action and resource:
-            perm = (
-                self.appbuilder.get_session.query(self.permission_model)
-                .filter_by(action=action, resource=resource)
-                .first()
+            perm = self.appbuilder.get_session.scalar(
+                select(self.permission_model).filter_by(action=action, resource=resource)

Review Comment:
   made the change
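The new line keeps `filter_by()`, which 2.0-style `Select` objects also provide: keyword arguments are matched against attributes of the selected entity, and comparing a many-to-one relationship with an object becomes a foreign-key equality. A sketch assuming SQLAlchemy 1.4/2.0 and SQLite, with hypothetical `ToyAction`/`ToyResource`/`ToyPerm` models standing in for the FAB action, resource, and permission models.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class ToyAction(Base):
    __tablename__ = "toy_action"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True)


class ToyResource(Base):
    __tablename__ = "toy_resource"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), unique=True)


class ToyPerm(Base):
    __tablename__ = "toy_perm"
    id = Column(Integer, primary_key=True)
    action_id = Column(Integer, ForeignKey("toy_action.id"))
    resource_id = Column(Integer, ForeignKey("toy_resource.id"))
    action = relationship(ToyAction)
    resource = relationship(ToyResource)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    read, edit = ToyAction(name="can_read"), ToyAction(name="can_edit")
    dags = ToyResource(name="DAGs")
    session.add_all([ToyPerm(action=read, resource=dags), edit])
    session.flush()  # assign primary keys without expiring the objects

    # filter_by(action=obj) compares the relationship with an object, roughly
    # WHERE toy_perm.action_id = ? AND toy_perm.resource_id = ?
    found = session.scalar(select(ToyPerm).filter_by(action=read, resource=dags))
    # No permission links "can_edit" to "DAGs", so scalar() returns None.
    absent = session.scalar(select(ToyPerm).filter_by(action=edit, resource=dags))
    found_pair = (found.action.name, found.resource.name)
```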




[GitHub] [airflow] Taragolis commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299979575


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   I guess it failed because the suggested change also requires updating the mock object:
   
   https://github.com/apache/airflow/blob/eacdeb4ed967313ba49721944c95a3721b5d7afb/tests/datasets/test_manager.py#L57 
   
   ```python
   mock_session.execute.return_value.scalar_one_or_none.return_value = None
   ```
   
   With that mock change, `TestDatasetManager` passes locally:
   ![image](https://github.com/apache/airflow/assets/3998685/f7de0e7d-ba17-4359-a816-f39c09326078)
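A minimal `unittest.mock` sketch of why the mock has to follow the new call chain. `find_dataset` is a hypothetical stand-in for the lookup in `register_dataset_change`, not Airflow code. A `MagicMock` only returns the configured `None` when the code walks exactly the configured attribute chain; any other chain yields an auto-created `MagicMock`.

```python
from unittest import mock


def find_dataset(session, stmt):
    # Hypothetical stand-in for the 2.0-style lookup in register_dataset_change
    return session.execute(stmt).scalar_one_or_none()


# Mock configured for the chain the code actually uses
matching = mock.MagicMock()
matching.execute.return_value.scalar_one_or_none.return_value = None

# Mock configured for a different chain (session.scalar)
mismatched = mock.MagicMock()
mismatched.scalar.return_value = None

found = find_dataset(matching, "stmt")
unexpected = find_dataset(mismatched, "stmt")

# `found` is None, so an `if dataset_model is None` branch would run;
# `unexpected` is an auto-created MagicMock, so that branch is silently skipped.
print(found is None, isinstance(unexpected, mock.MagicMock))
```

This is why a test written against one call chain can pass or fail for the wrong reason after a query refactor.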
   





[GitHub] [airflow] Taragolis commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299837135


##########
airflow/auth/managers/fab/models/__init__.py:
##########
@@ -210,12 +211,13 @@ def perms(self):
             if current_app:
                 sm = current_app.appbuilder.sm
                 self._perms: set[tuple[str, str]] = set(
-                    sm.get_session.query(sm.action_model.name, sm.resource_model.name)
-                    .join(sm.permission_model.action)
-                    .join(sm.permission_model.resource)
-                    .join(sm.permission_model.role)
-                    .filter(sm.role_model.user.contains(self))
-                    .all()
+                    sm.get_session.execute(
+                        select(sm.action_model.name, sm.resource_model.name)
+                        .join(sm.permission_model.action)
+                        .join(sm.permission_model.resource)
+                        .join(sm.permission_model.role)
+                        .where(sm.role_model.user.contains(self))
+                    )

Review Comment:
   I guess it doesn't matter: without `.all()`, `execute()` returns an iterable `Result`, and the `set` constructor consumes it anyway.
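A minimal sketch of that point, assuming SQLAlchemy 1.4+ and an in-memory SQLite database. `Perm` is a hypothetical flattened table, not the FAB permission schema; the only claim shown is that `set()` accepts the `Result` directly and gives the same pairs as going through `.all()` first.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Perm(Base):
    # Hypothetical flattened permission table, not the FAB schema
    __tablename__ = "perm"
    id = Column(Integer, primary_key=True)
    action = Column(String(50))
    resource = Column(String(50))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            Perm(action="can_read", resource="DAGs"),
            Perm(action="can_edit", resource="DAGs"),
            Perm(action="can_read", resource="DAGs"),  # duplicate pair
        ]
    )
    session.commit()

    stmt = select(Perm.action, Perm.resource)
    # set() iterates the Result directly; no intermediate list is built
    without_all = set(session.execute(stmt))
    # .all() first materialises a list, which set() then copies
    with_all = set(session.execute(stmt).all())

pairs = {tuple(row) for row in without_all}
print(sorted(pairs))
```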





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1284134425


##########
airflow/auth/managers/fab/models.py:
##########
@@ -210,12 +211,13 @@ def perms(self):
             if current_app:
                 sm = current_app.appbuilder.sm
                 self._perms: set[tuple[str, str]] = set(
-                    sm.get_session.query(sm.action_model.name, sm.resource_model.name)
-                    .join(sm.permission_model.action)
-                    .join(sm.permission_model.resource)
-                    .join(sm.permission_model.role)
-                    .filter(sm.role_model.user.contains(self))
-                    .all()
+                    sm.get_session.execute(
+                        select(sm.action_model.name, sm.resource_model.name)
+                        .join(sm.permission_model.action)
+                        .join(sm.permission_model.resource)
+                        .join(sm.permission_model.role)
+                        .where(sm.role_model.user.contains(self))
+                    ).all()

Review Comment:
   fixed



##########
airflow/operators/subdag.py:
##########
@@ -112,7 +113,7 @@ def _validate_pool(self, session):
             conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
             if conflicts:
                 # only query for pool conflicts if one may exist
-                pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
+                pool = session.scalar(select(Pool).where(Pool.slots == 1).where(Pool.pool == self.pool))

Review Comment:
   fixed





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1298073537


##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
-            )
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()

Review Comment:
   Looks like the query API automatically deduplicates on primary key:
   
   https://docs.sqlalchemy.org/en/20/faq/sessions.html#my-query-does-not-return-the-same-number-of-objects-as-query-count-tells-me-why
   
   > The Query object, when asked to return a list of ORM-mapped objects, will deduplicate the objects based on primary key.
   
   For this particular purpose `unique()` is fine since duplicated rows would have the same hash, but alternatively `unique(lambda r: r.id)` is probably both more accurate and slightly faster.





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1277175387


##########
airflow/models/variable.py:
##########
@@ -195,8 +195,7 @@ def update(
 
         if Variable.get_variable_from_secrets(key=key) is None:
             raise KeyError(f"Variable {key} does not exist")
-
-        obj = session.query(Variable).filter(Variable.key == key).first()
+        obj = session.scalars(select(Variable).where(Variable.key == key)).first()

Review Comment:
   ```suggestion
           obj = session.scalar(select(Variable).where(Variable.key == key))
   ```
   
   Equivalent since `key` is unique.



##########
airflow/utils/scheduler_health.py:
##########
@@ -36,11 +38,11 @@ def do_GET(self):
             try:
                 with create_session() as session:
                     scheduler_job = (
-                        session.query(Job)
+                        session.scalars(select(Job))
                         .filter_by(job_type=SchedulerJobRunner.job_type)
                         .filter_by(hostname=get_hostname())
                         .order_by(Job.latest_heartbeat.desc())
-                        .first()
+                        .limit(1)
                     )

Review Comment:
   This change is wrong!



##########
airflow/www/security.py:
##########
@@ -578,22 +576,24 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
             )
+            .unique()
+            .all()

Review Comment:
   ```suggestion
   ```



##########
airflow/www/security.py:
##########
@@ -398,13 +395,16 @@ def get_accessible_dag_ids(
 
                 resource = permission.resource.name
                 if resource == permissions.RESOURCE_DAG:
-                    return {dag.dag_id for dag in session.query(DagModel.dag_id)}
+                    return {dag.dag_id for dag in session.execute(select(DagModel.dag_id)).all()}

Review Comment:
   ```suggestion
                       return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
   ```



##########
airflow/operators/subdag.py:
##########
@@ -112,7 +113,9 @@ def _validate_pool(self, session):
             conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
             if conflicts:
                 # only query for pool conflicts if one may exist
-                pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
+                pool = session.scalar(
+                    select(Pool).where(Pool.slots == 1).where(Pool.pool == self.pool).limit(1)
+                )

Review Comment:
   ```suggestion
                   pool = session.scalar(select(Pool).where(Pool.slots == 1).where(Pool.pool == self.pool))
   ```
   
   Since `pool` is unique, the `.limit(1)` is unnecessary.





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299809718


##########
airflow/www/security.py:
##########
@@ -524,10 +524,8 @@ def _merge_perm(self, action_name: str, resource_name: str) -> None:
         resource = self.get_resource(resource_name)
         perm = None
         if action and resource:
-            perm = (
-                self.appbuilder.get_session.query(self.permission_model)
-                .filter_by(action=action, resource=resource)
-                .first()
+            perm = self.appbuilder.get_session.scalar(
+                select(self.permission_model).filter_by(action=action, resource=resource)

Review Comment:
   But limiting this one (with `.limit(1)`) is probably a good idea.





[GitHub] [airflow] Taragolis commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299867029


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   Just to keep the same logic as the original query, although it may not be possible to get more than one record here.





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299873919


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   Okay, I will change it to `session.scalars(...).one()` to be closer to the original query.





[GitHub] [airflow] Taragolis commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299986513


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   Anyway, I checked locally: inserting a row with an already existing `DatasetModel.uri` fails with
   
   ```console
   sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: dataset.uri
   ```
   
   So I guess we don't need `scalar_one_or_none()` in the current codebase.
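A sketch of that local check, assuming SQLAlchemy 1.4+ and a hypothetical cut-down `DatasetModel` whose `uri` column is declared unique (which the error above implies for the real table). The second insert with the same `uri` is rejected at commit time, so a lookup by `uri` can return at most one row.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DatasetModel(Base):
    # Hypothetical cut-down model; assumes `uri` carries a UNIQUE constraint
    __tablename__ = "dataset"
    id = Column(Integer, primary_key=True)
    uri = Column(String(3000), unique=True, nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(DatasetModel(uri="s3://bucket/key"))
    session.commit()

    # Same uri again: the database should reject it on flush/commit
    session.add(DatasetModel(uri="s3://bucket/key"))
    try:
        session.commit()
        duplicate_rejected = False
    except IntegrityError:
        session.rollback()
        duplicate_rejected = True

print(duplicate_rejected)
```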





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1277219533


##########
airflow/utils/scheduler_health.py:
##########
@@ -36,11 +38,11 @@ def do_GET(self):
             try:
                 with create_session() as session:
                     scheduler_job = (
-                        session.query(Job)
+                        session.scalars(select(Job))
                         .filter_by(job_type=SchedulerJobRunner.job_type)
                         .filter_by(hostname=get_hostname())
                         .order_by(Job.latest_heartbeat.desc())
-                        .first()
+                        .limit(1)
                     )

Review Comment:
   You’re calling `filter_by` etc. against the result of `session.scalars` instead of `select`. The parenthesis on line 41 is probably misplaced?





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299852858


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   What is the benefit of using `scalar_one_or_none()` here?
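One way to see the difference, sketched with SQLAlchemy 1.4+ and a hypothetical `Item` table whose `uri` column is deliberately not unique. Both calls return `None` for zero rows, but with two matching rows `session.scalar()` silently returns the first one, while `scalar_one_or_none()` raises `MultipleResultsFound`, mirroring the legacy `one_or_none()`.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    # Hypothetical table with a deliberately non-unique `uri` column
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    uri = Column(String(100))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Item(uri="dup"), Item(uri="dup")])
    session.commit()

    none_stmt = select(Item).where(Item.uri == "absent")
    dup_stmt = select(Item).where(Item.uri == "dup")

    # Zero rows: both styles return None
    zero_ok = (
        session.scalar(none_stmt) is None
        and session.execute(none_stmt).scalar_one_or_none() is None
    )

    # Two rows: scalar() returns the first, scalar_one_or_none() raises
    first_item = session.scalar(dup_stmt)
    try:
        session.execute(dup_stmt).scalar_one_or_none()
        raised = False
    except MultipleResultsFound:
        raised = True

print(zero_ok, first_item is not None, raised)
```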





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299931869


##########
airflow/www/security.py:
##########
@@ -243,11 +243,9 @@ def __init__(self, appbuilder) -> None:
 
     def _get_root_dag_id(self, dag_id: str) -> str:
         if "." in dag_id:
-            dm = (
-                self.appbuilder.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
-                .filter(DagModel.dag_id == dag_id)
-                .first()
-            )
+            dm = self.appbuilder.get_session.execute(
+                select(DagModel.dag_id, DagModel.root_dag_id).where(DagModel.dag_id == dag_id)
+            ).one()

Review Comment:
   Yes, hence I didn't make any change.





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299813254


##########
airflow/models/variable.py:
##########
@@ -200,8 +200,7 @@ def update(
 
         if Variable.get_variable_from_secrets(key=key) is None:
             raise KeyError(f"Variable {key} does not exist")
-
-        obj = session.query(Variable).filter(Variable.key == key).first()
+        obj = session.scalar(select(Variable).where(Variable.key == key))

Review Comment:
   `Variable.key` is unique so it shouldn’t matter





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1297221347


##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
-            )
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()

Review Comment:
   One is a `ScalarResult` and the other is an ORM `Query`. Maybe the `ScalarResult` is treated as a collection and the query is not:
   ```
   (Pdb) type(self.appbuilder.get_session.scalars(select(self.role_model).options(joinedload(self.role_model.permissions))))
   <class 'sqlalchemy.engine.result.ScalarResult'>
   (Pdb) type(self.appbuilder.get_session.query(self.role_model).options(joinedload(self.role_model.permissions)))
   <class 'sqlalchemy.orm.query.Query'>
   ```





[GitHub] [airflow] hussein-awala commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1298165388


##########
airflow/models/skipmixin.py:
##########
@@ -121,14 +128,12 @@ def skip(
                 stacklevel=2,
             )
 
-            dag_run = (
-                session.query(DagRun)
-                .filter(
-                    DagRun.dag_id == task_list[0].dag_id,
-                    DagRun.execution_date == execution_date,
+            dag_run = session.scalar(
+                select(DagRun).where(
+                    DagRun.dag_id == task_list[0].dag_id, DagRun.execution_date == execution_date
                 )
-                .one()
             )
+

Review Comment:
   The `.one()` method in v1 fails if there is no result, which acts as a data validation check. So we cannot replace it with `session.scalar`, which returns `None` if there is no result.
   
   According to [the migration doc](https://docs.sqlalchemy.org/en/20/changelog/migration_20.html), the following v1 query:
   ```python
   session.query(User).\
     filter_by(name="some user").\
     one()
   ```
   could be written as the following in v2 style:
   ```python
   session.execute(
     select(User).
     filter_by(name="some user")
   ).scalar_one()
   ```





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1289595412


##########
airflow/dag_processing/processor.py:
##########
@@ -428,20 +428,20 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) ->
         if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
             cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
             return
-
         qry = (
-            session.query(TI.task_id, func.max(DR.execution_date).label("max_ti"))
+            select(TI.task_id, func.max(DR.execution_date).label("max_ti"))
             .join(TI.dag_run)
-            .filter(TI.dag_id == dag.dag_id)
-            .filter(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED))
-            .filter(TI.task_id.in_(dag.task_ids))
+            .where(TI.dag_id == dag.dag_id)
+            .where(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED))
+            .where(TI.task_id.in_(dag.task_ids))
             .group_by(TI.task_id)
-            .subquery("sq")

Review Comment:
   IIRC, not using `subquery` here is deprecated, so the call should be kept.
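A sketch of keeping the explicit `.subquery("sq")`, assuming SQLAlchemy 1.4+ and a hypothetical flattened `TI` table that stores `execution_date` directly (the real code joins `TI.dag_run`). The named subquery exposes `sq.c.task_id` and `sq.c.max_ti` for a join back to the table; passing the bare `Select` where a FROM clause is expected is, as far as I know, deprecated in 1.4 and no longer accepted in 2.0.

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TI(Base):
    # Hypothetical flattened table: task id, state and the run's date in one place
    __tablename__ = "ti"
    id = Column(Integer, primary_key=True)
    task_id = Column(String(50))
    state = Column(String(20))
    execution_date = Column(DateTime)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            TI(task_id="t1", state="success", execution_date=datetime(2023, 1, 1)),
            TI(task_id="t1", state="success", execution_date=datetime(2023, 1, 2)),
            TI(task_id="t2", state="skipped", execution_date=datetime(2023, 1, 3)),
        ]
    )
    session.commit()

    # Explicit .subquery("sq") turns the grouped SELECT into a named FROM clause
    sq = (
        select(TI.task_id, func.max(TI.execution_date).label("max_ti"))
        .where(TI.state.in_(["success", "skipped"]))
        .group_by(TI.task_id)
        .subquery("sq")
    )

    # Join back on the subquery's exported columns to get each task's latest row
    latest = session.execute(
        select(TI.task_id, TI.execution_date)
        .join(sq, (TI.task_id == sq.c.task_id) & (TI.execution_date == sq.c.max_ti))
        .order_by(TI.task_id)
    ).all()

print([tuple(r) for r in latest])
```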





[GitHub] [airflow] uranusjr commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1280012837


##########
airflow/operators/subdag.py:
##########
@@ -112,7 +113,7 @@ def _validate_pool(self, session):
             conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
             if conflicts:
                 # only query for pool conflicts if one may exist
-                pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
+                pool = session.scalar(select(Pool).where(Pool.slots == 1).where(Pool.pool == self.pool))

Review Comment:
   ```suggestion
                   pool = session.scalar(select(Pool).where(Pool.slots == 1, Pool.pool == self.pool))
   ```
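A quick check that the suggested single `where()` call builds the same SQL as the chained form, assuming SQLAlchemy 1.4+ and using lightweight `table()`/`column()` constructs as a hypothetical stand-in for the `Pool` model. Criteria passed together to one `where()` are joined with `AND`, just like successive `where()` calls.

```python
from sqlalchemy import column, select, table

# Hypothetical lightweight stand-in for the Pool model's table
slot_pool = table("slot_pool", column("pool"), column("slots"))

chained = select(slot_pool).where(slot_pool.c.slots == 1).where(slot_pool.c.pool == "default_pool")
combined = select(slot_pool).where(slot_pool.c.slots == 1, slot_pool.c.pool == "default_pool")

# Both render a single WHERE clause with the two criteria joined by AND
print(str(combined))
```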



##########
airflow/auth/managers/fab/models.py:
##########
@@ -210,12 +211,13 @@ def perms(self):
             if current_app:
                 sm = current_app.appbuilder.sm
                 self._perms: set[tuple[str, str]] = set(
-                    sm.get_session.query(sm.action_model.name, sm.resource_model.name)
-                    .join(sm.permission_model.action)
-                    .join(sm.permission_model.resource)
-                    .join(sm.permission_model.role)
-                    .filter(sm.role_model.user.contains(self))
-                    .all()
+                    sm.get_session.execute(
+                        select(sm.action_model.name, sm.resource_model.name)
+                        .join(sm.permission_model.action)
+                        .join(sm.permission_model.resource)
+                        .join(sm.permission_model.role)
+                        .where(sm.role_model.user.contains(self))
+                    ).all()

Review Comment:
   ```suggestion
                       )
   ```
   
   Not needed since this is immediately iterated into a set.
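The reasoning here is that a SQLAlchemy `Result` is itself iterable, so `set()` can consume it directly without first building a list with `.all()`. A minimal sketch, assuming SQLAlchemy 1.4+. `Perm` is a hypothetical (action, resource) table, and the rows are converted to plain tuples only to keep the comparison obvious.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Perm(Base):
    """Hypothetical (action, resource) pair table for illustration only."""

    __tablename__ = "perm"
    id = Column(Integer, primary_key=True)
    action = Column(String)
    resource = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            Perm(action="can_read", resource="DAGs"),
            Perm(action="can_edit", resource="DAGs"),
            Perm(action="can_read", resource="DAGs"),  # duplicate pair on purpose
        ]
    )
    session.commit()

    stmt = select(Perm.action, Perm.resource)
    # Iterating the Result directly ...
    direct = {tuple(row) for row in session.execute(stmt)}
    # ... yields the same set as materialising a list with .all() first.
    via_all = {tuple(row) for row in session.execute(stmt).all()}
```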



##########
airflow/www/security.py:
##########
@@ -563,11 +561,11 @@ def add_homepage_access_to_custom_roles(self) -> None:
     def get_all_permissions(self) -> set[tuple[str, str]]:
         """Returns all permissions as a set of tuples with the action and resource names."""
         return set(
-            self.appbuilder.get_session.query(self.permission_model)
-            .join(self.permission_model.action)
-            .join(self.permission_model.resource)
-            .with_entities(self.action_model.name, self.resource_model.name)
-            .all()
+            self.appbuilder.get_session.execute(
+                select(self.action_model.name, self.resource_model.name)
+                .join(self.permission_model.action)
+                .join(self.permission_model.resource)
+            ).all()

Review Comment:
   ```suggestion
               )
   ```
   
   This is also immediately turned into a set.



##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()
             )
         }
 
     def _get_all_roles_with_permissions(self) -> dict[str, Role]:
         """Returns a dict with a key of role name and value of role with early loaded permissions."""
         return {
             r.name: r
-            for r in self.appbuilder.get_session.query(self.role_model).options(
-                joinedload(self.role_model.permissions)
-            )
+            for r in self.appbuilder.get_session.scalars(
+                select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()

Review Comment:
   Is this correct? There wasn’t a unique call previously (although it probably doesn’t matter since this is also immediately turned into a set so duplicates are eliminated anyway)



##########
airflow/www/security.py:
##########
@@ -578,22 +576,22 @@ def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]:
         return {
             (action_name, resource_name): viewmodel
             for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.query(self.permission_model)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-                .filter(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                .with_entities(self.action_model.name, self.resource_model.name, self.permission_model)
-                .all()
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                ).all()

Review Comment:
   ```suggestion
                   )
   ```
   
   Same here.



##########
airflow/utils/scheduler_health.py:
##########
@@ -35,12 +37,12 @@ def do_GET(self):
         if self.path == "/health":
             try:
                 with create_session() as session:
-                    scheduler_job = (
-                        session.query(Job)
+                    scheduler_job = session.scalars(

Review Comment:
   ```suggestion
                       scheduler_job = session.scalar(
   ```
   
   The original query uses `first()`; changing it to `scalars` is wrong (I believe).





[GitHub] [airflow] phanikumv closed pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv closed pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)
URL: https://github.com/apache/airflow/pull/32883




[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1277476894


##########
airflow/utils/scheduler_health.py:
##########
@@ -36,11 +38,11 @@ def do_GET(self):
             try:
                 with create_session() as session:
                     scheduler_job = (
-                        session.query(Job)
+                        session.scalars(select(Job))
                         .filter_by(job_type=SchedulerJobRunner.job_type)
                         .filter_by(hostname=get_hostname())
                         .order_by(Job.latest_heartbeat.desc())
-                        .first()
+                        .limit(1)
                     )

Review Comment:
   I didn't get any syntax errors, or any errors when executing the tests. 🤷‍♂️ 
   
   ```
   abc=session.query(Job).filter_by(job_type=SchedulerJobRunner.job_type).filter_by(hostname=get_hostname()).order_by(Job.latest_heartbeat.desc()).first()
   
   (Pdb) abc
   <MagicMock name='create_session().__enter__().query().filter_by().filter_by().order_by().first()' id='281473176118176'>
   
   def1 = session.scalars(select(Job)).filter_by(job_type=SchedulerJobRunner.job_type).filter_by(hostname=get_hostname()).order_by(Job.latest_heartbeat.desc()).limit(1)
   
   (Pdb) def1
   <MagicMock name='create_session().__enter__().scalars().filter_by().filter_by().order_by().limit()' id='281473183539744'>
   ```
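The pdb output above suggests why the tests pass anyway. `create_session()` is replaced by a `MagicMock`, and a `MagicMock` fabricates any attribute chained onto it. To my understanding, the real `ScalarResult` returned by `session.scalars()` only has result-fetching methods (`all()`, `first()`, `one()`, ...), not query builders such as `filter_by()` or `limit()`; those belong on the `select()` statement. This is a stdlib-only sketch. `FakeScalarResult` is a hypothetical stand-in class, not SQLAlchemy's.

```python
from unittest.mock import MagicMock


class FakeScalarResult:
    """Hypothetical stand-in exposing result-style methods, no query builders."""

    def all(self):
        return []

    def first(self):
        return None


# A bare MagicMock accepts any chain of calls, so this "works" under test ...
session = MagicMock()
chained = session.scalars("stmt").filter_by(job_type="SchedulerJob").order_by("x").limit(1)
print(type(chained).__name__)  # MagicMock

# ... whereas a mock constrained with spec= rejects methods the spec class lacks.
session.scalars.return_value = MagicMock(spec=FakeScalarResult)
try:
    session.scalars("stmt").filter_by(job_type="SchedulerJob")
    caught = False
except AttributeError:
    caught = True
```

Using `spec=` (or `unittest.mock.create_autospec`) against the real class is one way to make such tests fail on calls the real object would reject.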





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: [WIP]:Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1277202969


##########
airflow/utils/scheduler_health.py:
##########
@@ -36,11 +38,11 @@ def do_GET(self):
             try:
                 with create_session() as session:
                     scheduler_job = (
-                        session.query(Job)
+                        session.scalars(select(Job))
                         .filter_by(job_type=SchedulerJobRunner.job_type)
                         .filter_by(hostname=get_hostname())
                         .order_by(Job.latest_heartbeat.desc())
-                        .first()
+                        .limit(1)
                     )

Review Comment:
   Sorry, why is this wrong, @uranusjr?





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299929085


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   Tests fail if I make the change according to your suggestion, so I'm keeping my change.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "ephraimbuddy (via GitHub)" <gi...@apache.org>.
ephraimbuddy commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299772232


##########
airflow/utils/sqlalchemy.py:
##########
@@ -436,7 +436,7 @@ def lock_rows(query: Query, session: Session) -> Generator[None, None, None]:
 
     :meta private:
     """
-    locked_rows = with_row_locks(query, session).all()
+    locked_rows = with_row_locks(query, session)

Review Comment:
   Is the removal necessary?



##########
airflow/models/variable.py:
##########
@@ -200,8 +200,7 @@ def update(
 
         if Variable.get_variable_from_secrets(key=key) is None:
             raise KeyError(f"Variable {key} does not exist")
-
-        obj = session.query(Variable).filter(Variable.key == key).first()
+        obj = session.scalar(select(Variable).where(Variable.key == key))

Review Comment:
   Should we use `limit(1)` since it was picking the first item?
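The trade-off behind this question, as far as I know: `Query.first()` added `LIMIT 1` to the emitted SQL. `session.scalar(stmt)` executes the statement as written and keeps only the first row, so both return the same object, but only an explicit `.limit(1)` stops the database from producing extra rows. A minimal sketch, assuming SQLAlchemy 1.4+. `KeyValue` is a hypothetical table standing in for a `Variable`-like model.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class KeyValue(Base):
    """Hypothetical key/value table standing in for a Variable-like model."""

    __tablename__ = "key_value"
    id = Column(Integer, primary_key=True)
    key = Column(String)
    val = Column(String)


unlimited = select(KeyValue).where(KeyValue.key == "k").order_by(KeyValue.id)
limited = unlimited.limit(1)

# Only the limited statement carries a LIMIT clause in the emitted SQL.
has_limit = ("LIMIT" in str(unlimited), "LIMIT" in str(limited))

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([KeyValue(key="k", val="first"), KeyValue(key="k", val="second")])
    session.commit()
    # Both calls return a single object (the first row); only the SQL differs.
    val_unlimited = session.scalar(unlimited).val
    val_limited = session.scalar(limited).val
```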



##########
airflow/www/security.py:
##########
@@ -243,11 +243,9 @@ def __init__(self, appbuilder) -> None:
 
     def _get_root_dag_id(self, dag_id: str) -> str:
         if "." in dag_id:
-            dm = (
-                self.appbuilder.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
-                .filter(DagModel.dag_id == dag_id)
-                .first()
-            )
+            dm = self.appbuilder.get_session.execute(
+                select(DagModel.dag_id, DagModel.root_dag_id).where(DagModel.dag_id == dag_id)
+            ).one()

Review Comment:
   Since `first` was used originally, we should use `limit(1)`, because `one()` will throw an error if no row (or more than one row) matches.
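The behavioral difference behind this comment is that `Result.first()` returns `None` when nothing matches, while `Result.one()` raises `NoResultFound` (and `MultipleResultsFound` for several rows). A minimal sketch, assuming SQLAlchemy 1.4+. `Dag` is a hypothetical table, not Airflow's `DagModel`, and it is left empty on purpose.

```python
from sqlalchemy import Column, String, create_engine, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Dag(Base):
    """Hypothetical minimal DAG table; not Airflow's DagModel."""

    __tablename__ = "dag"
    dag_id = Column(String, primary_key=True)
    root_dag_id = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    stmt = select(Dag.dag_id, Dag.root_dag_id).where(Dag.dag_id == "parent.child")

    # Result.first(): the first Row, or None when nothing matches.
    first_row = session.execute(stmt).first()
    # Same lookup with an explicit LIMIT 1, mirroring the old Query.first().
    limited_first = session.execute(stmt.limit(1)).first()

    # Result.one(): raises NoResultFound on zero rows.
    try:
        session.execute(stmt).one()
        raised = False
    except NoResultFound:
        raised = True
```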



##########
airflow/www/security.py:
##########
@@ -358,16 +356,15 @@ def get_accessible_dag_ids(
             if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or (
                 permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user)
             ):
-                return {dag.dag_id for dag in session.query(DagModel.dag_id)}
-            user_query = (
-                session.query(User)
+                return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
+            user_query = session.scalar(
+                select(User)
                 .options(
                     joinedload(User.roles)
                     .subqueryload(Role.permissions)
                     .options(joinedload(Permission.action), joinedload(Permission.resource))
                 )
-                .filter(User.id == user.id)
-                .first()
+                .where(User.id == user.id)

Review Comment:
   ```suggestion
                   .where(User.id == user.id).limit(1)
   ```



##########
airflow/auth/managers/fab/models/__init__.py:
##########
@@ -210,12 +211,13 @@ def perms(self):
             if current_app:
                 sm = current_app.appbuilder.sm
                 self._perms: set[tuple[str, str]] = set(
-                    sm.get_session.query(sm.action_model.name, sm.resource_model.name)
-                    .join(sm.permission_model.action)
-                    .join(sm.permission_model.resource)
-                    .join(sm.permission_model.role)
-                    .filter(sm.role_model.user.contains(self))
-                    .all()
+                    sm.get_session.execute(
+                        select(sm.action_model.name, sm.resource_model.name)
+                        .join(sm.permission_model.action)
+                        .join(sm.permission_model.resource)
+                        .join(sm.permission_model.role)
+                        .where(sm.role_model.user.contains(self))
+                    )

Review Comment:
   It seems we are missing `all()`, but if that doesn't have an effect, then it's fine.



##########
airflow/operators/subdag.py:
##########
@@ -112,7 +113,7 @@ def _validate_pool(self, session):
             conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
             if conflicts:
                 # only query for pool conflicts if one may exist
-                pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
+                pool = session.scalar(select(Pool).where(Pool.slots == 1, Pool.pool == self.pool))

Review Comment:
   Also here, should we apply a limit to get the first item?



##########
airflow/www/security.py:
##########
@@ -524,10 +524,8 @@ def _merge_perm(self, action_name: str, resource_name: str) -> None:
         resource = self.get_resource(resource_name)
         perm = None
         if action and resource:
-            perm = (
-                self.appbuilder.get_session.query(self.permission_model)
-                .filter_by(action=action, resource=resource)
-                .first()
+            perm = self.appbuilder.get_session.scalar(
+                select(self.permission_model).filter_by(action=action, resource=resource)

Review Comment:
   Apply `limit(1)`.





[GitHub] [airflow] Taragolis commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299817038


##########
airflow/datasets/manager.py:
##########
@@ -52,7 +52,7 @@ def register_dataset_change(
         For local datasets, look them up, record the dataset event, queue dagruns, and broadcast
         the dataset event
         """
-        dataset_model = session.query(DatasetModel).filter(DatasetModel.uri == dataset.uri).one_or_none()
+        dataset_model = session.scalar(select(DatasetModel).where(DatasetModel.uri == dataset.uri))

Review Comment:
   ```suggestion
           dataset_model = session.execute(
               select(DatasetModel).where(DatasetModel.uri == dataset.uri)
           ).scalar_one_or_none()
   ```
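The merged `session.scalar(...)` and this suggestion differ only when more than one row matches. `scalar()` silently returns the first match. `scalar_one_or_none()` keeps the original `.one_or_none()` contract: `None` for zero rows, the object for one row, and `MultipleResultsFound` for several. Whether that matters here depends on whether `uri` is unique, which this diff does not show. A minimal sketch, assuming SQLAlchemy 1.4+. `Dataset` is a hypothetical table whose `uri` is deliberately not unique, so the difference can surface.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Dataset(Base):
    """Hypothetical table; uri is deliberately NOT unique so duplicates can exist."""

    __tablename__ = "dataset"
    id = Column(Integer, primary_key=True)
    uri = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Dataset(uri="s3://bucket/a"), Dataset(uri="s3://bucket/a")])
    session.commit()
    stmt = select(Dataset).where(Dataset.uri == "s3://bucket/a")

    # scalar(): silently returns the first matching object.
    lenient = session.scalar(stmt)

    # scalar_one_or_none(): raises when several rows match ...
    try:
        session.execute(stmt).scalar_one_or_none()
        strict_raised = False
    except MultipleResultsFound:
        strict_raised = True

    # ... and returns None when none match.
    none_case = session.execute(select(Dataset).where(Dataset.uri == "missing")).scalar_one_or_none()
```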





[GitHub] [airflow] phanikumv commented on a diff in pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #32883:
URL: https://github.com/apache/airflow/pull/32883#discussion_r1299857217


##########
airflow/utils/sqlalchemy.py:
##########
@@ -436,7 +436,7 @@ def lock_rows(query: Query, session: Session) -> Generator[None, None, None]:
 
     :meta private:
     """
-    locked_rows = with_row_locks(query, session).all()
+    locked_rows = with_row_locks(query, session)

Review Comment:
   Yes, it is necessary; I removed it because a few other tests were failing without this change.





[GitHub] [airflow] phanikumv merged pull request #32883: Refactor Sqlalchemy queries to 2.0 style (Part 7)

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv merged PR #32883:
URL: https://github.com/apache/airflow/pull/32883

