Posted to commits@airflow.apache.org by "Taragolis (via GitHub)" <gi...@apache.org> on 2023/08/25 09:07:29 UTC

[GitHub] [airflow] Taragolis opened a new pull request, #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Taragolis opened a new pull request, #33720:
URL: https://github.com/apache/airflow/pull/33720



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1310041422


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   Nice! I just need to figure out how to deal with tests now, I'm not a fan of mocking DB execution if we could avoid it.





[GitHub] [airflow] uranusjr commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1310023089


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   You don’t even need a private method, just use `with create_session()`, that’s what `@provide_session` does anyway.
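
As a rough illustration of the point above, the relationship between a session-providing decorator and an explicit `with create_session()` block can be sketched with toy stand-ins. `FakeSession` and the simplified `create_session` and `provide_session` below are hypothetical versions written for this note and are not Airflow's actual implementations.

```python
import contextlib
import functools


class FakeSession:
    """Toy stand-in for a DB session; records whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@contextlib.contextmanager
def create_session():
    """Simplified stand-in: open a session and always close it afterwards."""
    session = FakeSession()
    try:
        yield session
    finally:
        session.close()


def provide_session(func):
    """Simplified stand-in: supply a session only if the caller passed none."""

    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            return func(*args, session=session, **kwargs)
        with create_session() as new_session:
            return func(*args, session=new_session, **kwargs)

    return wrapper


@provide_session
def decorated(session=None):
    return session


def explicit():
    # Same effect without the decorator: open the session where it is needed.
    with create_session() as session:
        return session
```

In this sketch both `decorated()` and `explicit()` end up running inside the same context manager, which is why a separate private helper adds little.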








[GitHub] [airflow] Taragolis commented on pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1694738424

   Interesting errors here: they might point either to an error in the code introduced while changing the behaviour, or to something being wrong with the tests on Postgres. Locally I also get this error, with flaky behaviour: 5-10 sequential fail, 2-3 success, and again failure
   
   ```console
   FAILED tests/sensors/test_base.py::TestBaseSensor::test_ok_with_reschedule - AssertionError: assert datetime.datetime(2023, 8, 27, 16, 39, 45, 511220, tzinfo=Timezone('UTC')) == datetime.datetime(2023, 8, 27, 16, 39, 55, 511220, tzinfo=Timezone('UTC'))
    +  where datetime.datetime(2023, 8, 27, 16, 39, 45, 511220, tzinfo=Timezone('UTC')) = <airflow.models.taskreschedule.TaskReschedule object at 0x7fee0a9bdf70>.start_date
   FAILED tests/sensors/test_base.py::TestBaseSensor::test_ok_with_custom_reschedule_exception - AssertionError: assert datetime.datetime(2023, 8, 27, 16, 39, 46, 338455, tzinfo=Timezone('UTC')) == datetime.datetime(2023, 8, 27, 16, 40, 46, 338455, tzinfo=Timezone('UTC'))
    +  where datetime.datetime(2023, 8, 27, 16, 39, 46, 338455, tzinfo=Timezone('UTC')) = <airflow.models.taskreschedule.TaskReschedule object at 0x7fee0a900e80>.start_date
   FAILED tests/sensors/test_base.py::TestBaseSensor::test_reschedule_and_retry_timeout - Failed: DID NOT RAISE <class 'airflow.exceptions.AirflowSensorTimeout'>
   FAILED tests/sensors/test_base.py::TestBaseSensor::test_reschedule_and_retry_timeout_and_silent_fail - Failed: DID NOT RAISE <class 'airflow.exceptions.AirflowSensorTimeout'>
   ```




[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1311284425


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   Actually, maybe we could keep only `find_task_reschedule_for_ti` and use it again, WDYT?





[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1310007883


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   I implemented it this way because of the difference between the legacy Query API and the new statement-based API (I don't know the correct name): you don't have to provide a DB session to build the query statement.
   
   We could use `stmt_for_task_instance` directly in
   - airflow/models/taskinstance.py
   - airflow/ti_deps/deps/ready_to_reschedule.py
   
   Unfortunately we can't do that in `airflow/sensors/base.py`, because we do not open a session there. However, that is not a showstopper: if applicable, I could create a private method in `BaseSensorOperator` which would open a session, execute a query built from `stmt_for_task_instance`, and return `start_date`.





[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1305460117


##########
airflow/models/taskreschedule.py:
##########
@@ -94,18 +96,16 @@ def __init__(
         self.reschedule_date = reschedule_date
         self.duration = (self.end_date - self.start_date).total_seconds()
 
-    @staticmethod
-    @provide_session
+    @classmethod
     def query_for_task_instance(
+        cls,

Review Comment:
   I hope these class methods are not part of the public interface, so that we can change the signature to migrate away from the legacy SQLAlchemy Query API.





[GitHub] [airflow] uranusjr commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1311135333


##########
airflow/models/taskreschedule.py:
##########
@@ -97,6 +100,38 @@ def __init__(
         self.reschedule_date = reschedule_date
         self.duration = (self.end_date - self.start_date).total_seconds()
 
+    @classmethod
+    def stmt_for_task_instance(
+        cls,
+        ti: TaskInstance,
+        *,
+        try_number: int | None = None,
+        descending: bool = False,
+    ) -> Select:
+        """
+        Return Statement for task reschedules for a given the task instance.

Review Comment:
   ```suggestion
           Return statement for task reschedules for a given the task instance.
   ```
   
   or just
   
   ```suggestion
           Statement for task reschedules for a given the task instance.
   ```





[GitHub] [airflow] Taragolis commented on pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1697845252

   >  with flaky behaviour: 5-10 sequential fail, 2-3 success, and again failure
   
   🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 🤦 
   I forgot about the assignment: it should be `stmt = stmt.order_by()` instead of `stmt.order_by()`
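
The mistake above comes from SQLAlchemy statements being generative: `order_by()` returns a new `Select` and leaves the original untouched. A minimal sketch, assuming SQLAlchemy 1.4 or newer; the lightweight `reschedule` table is a hypothetical stand-in used only to render SQL text.

```python
from sqlalchemy import column, select, table

# Hypothetical lightweight table, used only to render SQL text.
reschedule = table("task_reschedule", column("id"), column("try_number"))

stmt = select(reschedule.c.id).where(reschedule.c.try_number == 1)

# Bug: order_by() returns a new Select, and the result is discarded here.
stmt.order_by(reschedule.c.id.desc())
unordered_sql = str(stmt)

# Fix: rebind the name to the returned statement.
stmt = stmt.order_by(reschedule.c.id.desc())
ordered_sql = str(stmt)
```

Without an `ORDER BY` clause the database may return rows in any order, which would be consistent with the intermittent failures reported earlier.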




Re: [PR] Optimise and migrate to SA2-compatible syntax for TaskReschedule [airflow]

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1764609223

   @Taragolis could you please resolve the conflict here




[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1306689074


##########
tests/ti_deps/deps/test_ready_to_reschedule_dep.py:
##########
@@ -64,118 +76,127 @@ def _get_mapped_task_reschedule(self, reschedule_date):
         )
         return reschedule
 
-    def test_should_pass_if_ignore_in_reschedule_period_is_set(self):
+    def test_should_pass_if_ignore_in_reschedule_period_is_set(self, mocked_find_last_for_task_instance):
+        mocked_find_last_for_task_instance.side_effect = NotExpectedCall
         ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         dep_context = DepContext(ignore_in_reschedule_period=True)
         assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
 
-    def test_should_pass_if_not_reschedule_mode(self):
+    def test_should_pass_if_not_reschedule_mode(self, mocked_find_last_for_task_instance):
+        mocked_find_last_for_task_instance.side_effect = NotExpectedCall
         ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         del ti.task.reschedule
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
-    def test_should_pass_if_not_in_none_state(self):
+    def test_should_pass_if_not_in_none_state(self, mocked_find_last_for_task_instance):
+        mocked_find_last_for_task_instance.side_effect = NotExpectedCall
         ti = self._get_task_instance(State.UP_FOR_RETRY)
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
-    @patch("airflow.models.taskreschedule.TaskReschedule.query_for_task_instance")
-    def test_should_pass_if_no_reschedule_record_exists(self, mock_query_for_task_instance):
-        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = []
+    def test_should_pass_if_no_reschedule_record_exists(self, mocked_find_last_for_task_instance):
+        mocked_find_last_for_task_instance.return_value = None
         ti = self._get_task_instance(State.NONE)
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
-    @patch("airflow.models.taskreschedule.TaskReschedule.query_for_task_instance")
-    def test_should_pass_after_reschedule_date_one(self, mock_query_for_task_instance):
-        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = (
-            self._get_task_reschedule(utcnow() - timedelta(minutes=1))
+    def test_should_pass_after_reschedule_date_one(self, mocked_find_last_for_task_instance):
+        mocked_find_last_for_task_instance.return_value = self._get_task_reschedule(
+            utcnow() - timedelta(minutes=1)
         )
         ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
-    @patch("airflow.models.taskreschedule.TaskReschedule.query_for_task_instance")
-    def test_should_pass_after_reschedule_date_multiple(self, mock_query_for_task_instance):
-        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = [
+    def test_should_pass_after_reschedule_date_multiple(self, mocked_find_last_for_task_instance):
+        mocked_find_last_for_task_instance.side_effect = [
             self._get_task_reschedule(utcnow() - timedelta(minutes=21)),
             self._get_task_reschedule(utcnow() - timedelta(minutes=11)),
             self._get_task_reschedule(utcnow() - timedelta(minutes=1)),
-        ][-1]
+        ]
         ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
+        # All TaskReschedules meet requirements
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+        assert ReadyToRescheduleDep().is_met(ti=ti)
         assert ReadyToRescheduleDep().is_met(ti=ti)

Review Comment:
   I'm not sure what kind of behaviour was expected here before, so I've changed the test to emulate sequential queries over time.
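
For readers unfamiliar with the two uses of `side_effect` in the diff above, here is a small sketch using only `unittest.mock`. The names `find_last` and `guarded` are hypothetical, and this `NotExpectedCall` is a stand-in defined for the example rather than the one from the test module.

```python
from unittest import mock


class NotExpectedCall(Exception):
    """Stand-in exception that makes a test fail if the mock is called."""


# A list as side_effect: each call returns the next item in order,
# which emulates sequential lookups over time.
find_last = mock.Mock(side_effect=["first", "second", "third"])
results = [find_last(), find_last(), find_last()]

# An exception class as side_effect: any call raises it.
guarded = mock.Mock(side_effect=NotExpectedCall)
try:
    guarded()
except NotExpectedCall:
    raised = True
else:
    raised = False
```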



##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(
+        cls,
+        task_instance: TaskInstance,
+        session: Session = NEW_SESSION,
+        try_number: int | None = None,
+    ) -> TaskReschedule | None:
+        """
+        Return last task reschedule for the task instance and try number.
+
+        :param session: the database session object
+        :param task_instance: the task instance to find task reschedules for
+        :param try_number: Look for TaskReschedule of the given try_number. Default is None which
+            looks for the same try_number of the given task_instance.
+
+        :meta private:
+        """
+        return session.scalar(cls.stmt_for_task_instance(task_instance, try_number=try_number, limit=1))
+
+    @classmethod
+    @provide_session
+    def find_date_for_task_instance(
+        cls,
+        task_instance: TaskInstance,
+        *,
+        try_number: int | None = None,
+        find_end_date: bool = False,
+        session: Session = NEW_SESSION,
+    ):
+        """
+        Return date for task reschedule for the task instance and try number.
+
+        :param task_instance: the task instance to find task reschedules for
+        :param try_number: Look for TaskReschedule of the given try_number. Default is None which
+            looks for the same try_number of the given task_instance.
+        :param find_end_date: Should return `end_date` or `start_date`?
+        :param session: the database session object
+
+        :meta private:
+        """
+        return session.scalar(
+            cls.stmt_for_task_instance(
+                task_instance, try_number=try_number, descending=find_end_date, limit=1
+            ).with_only_columns(cls.end_date if find_end_date else cls.start_date)
+        )
+
+    @staticmethod
+    @provide_session
+    def query_for_task_instance(
+        task_instance: TaskInstance,
+        descending: bool = False,
+        session: Session = NEW_SESSION,
+        try_number: int | None = None,
+    ) -> Query:
+        """
+        Return query for task reschedules for a given the task instance (deprecated).
+
+        :param session: the database session object
+        :param task_instance: the task instance to find task reschedules for
+        :param descending: If True then records are returned in descending order
+        :param try_number: Look for TaskReschedule of the given try_number. Default is None which
+            looks for the same try_number of the given task_instance.
+        """
+        warnings.warn(
+            "`query_for_task_instance` use SQLAlchemy's Legacy Query API.",
+            category=RemovedInAirflow3Warning,
+            stacklevel=2,
+        )
+
+        if try_number is None:
+            try_number = task_instance.try_number
+
+        TR = TaskReschedule
+        qry = session.query(TR).filter(
+            TR.dag_id == task_instance.dag_id,
+            TR.task_id == task_instance.task_id,
+            TR.run_id == task_instance.run_id,
+            TR.map_index == task_instance.map_index,
+            TR.try_number == try_number,
+        )
+        if descending:
+            return qry.order_by(desc(TR.id))
+        else:
+            return qry.order_by(asc(TR.id))

Review Comment:
   I've decided to keep `query_for_task_instance` as is: anyone who used it before expects this method to return a `sqlalchemy.orm.Query`, and there is no direct replacement for this object that implements the same methods.
   
   If we think this was never part of the public interface, we could remove it; this PR also removes all usages of this method.
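
The keep-but-deprecate approach described above can be sketched with the standard `warnings` module. `RemovedInFutureWarning` and `legacy_query` are hypothetical names chosen for this illustration; they are not Airflow's `RemovedInAirflow3Warning` or the real method.

```python
import warnings


class RemovedInFutureWarning(DeprecationWarning):
    """Hypothetical stand-in for a project-specific deprecation category."""


def legacy_query():
    warnings.warn(
        "`legacy_query` is deprecated; build a statement instead.",
        category=RemovedInFutureWarning,
        stacklevel=2,  # attribute the warning to the caller's line
    )
    return "legacy result"


# Record warnings so the behaviour can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = legacy_query()
```

The old entry point keeps returning what existing callers expect, while the warning tells them to migrate.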





[GitHub] [airflow] uranusjr commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1309968634


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   But I mean we don’t really need `find_task_reschedule_for_ti` and `find_task_reschedules_for_ti`, since those are just one-liners that pass most of the arguments (except `session`) to `stmt_for_task_instance` anyway. We can just call `stmt_for_task_instance` and execute the statement as appropriate directly where needed.





[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1309348422


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   I changed the logic of the methods a bit.
   
   New (all marked as private):
   `stmt_for_task_instance` -> Query builder
   `find_task_reschedule_for_ti` -> Returns a single task_reschedule record, first or last
   `find_task_reschedules_for_ti` -> Replacement for `find_for_task_instance`, not used in production code, only in tests
   
   Deprecated methods, kept for backward compatibility:
   `query_for_task_instance`
   `find_for_task_instance`
   
   In general only `find_task_reschedule_for_ti` and `stmt_for_task_instance` are used; all the others are redundant and in theory could be dropped. However, `find_task_reschedules_for_ti` helped me find my mistake during tests.





[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1311257524


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   I've just thought: would `find_task_reschedule_for_ti` / `find_task_reschedules_for_ti` be required for AIP-44?





[GitHub] [airflow] phanikumv commented on pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1707058440

   @Taragolis can you please rebase so that we can run all the tests on this PR




[GitHub] [airflow] uranusjr commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1306999984


##########
airflow/models/taskreschedule.py:
##########
@@ -142,9 +154,93 @@ def find_for_task_instance(
         :param try_number: Look for TaskReschedule of the given try_number. Default is None which
             looks for the same try_number of the given task_instance.
         """
-        return TaskReschedule.query_for_task_instance(
-            task_instance, session=session, try_number=try_number
-        ).all()
+        return session.scalars(cls.stmt_for_task_instance(task_instance, try_number=try_number)).all()
+
+    @classmethod
+    @provide_session
+    def find_last_for_task_instance(

Review Comment:
   I wonder if we really need this method at all. The logic is rather simple and does not really provide any encapsulation benefits. (Same for `find_for_task_instance` above but it’s already there so there’s probably an argument for keeping it.)






[GitHub] [airflow] phanikumv commented on pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1707053777

   We need to run the full tests on changes related to SQLAlchemy 2.0, hence I applied the label.




[GitHub] [airflow] Taragolis commented on pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1727884105

   > @[Taragolis](https://github.com/Taragolis) Taragolis [3 weeks ago](https://github.com/apache/airflow/pull/33720#discussion_r1311257524)
   I've just thought: would `find_task_reschedule_for_ti` / `find_task_reschedules_for_ti` be required for AIP-44?
   
   > @[Taragolis](https://github.com/Taragolis) Taragolis [3 weeks ago](https://github.com/apache/airflow/pull/33720#discussion_r1311284425)
   Actually, we could keep only `find_task_reschedule_for_ti` and use it again, WDYT?
   
   cc: @potiuk @phanikumv @uranusjr @mhenc @vincbeck 




Re: [PR] Optimise and migrate to SA2-compatible syntax for TaskReschedule [airflow]

Posted by "ephraimbuddy (via GitHub)" <gi...@apache.org>.
ephraimbuddy merged PR #33720:
URL: https://github.com/apache/airflow/pull/33720




[GitHub] [airflow] Taragolis commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1311216216


##########
tests/models/test_taskinstance.py:
##########
@@ -103,6 +103,15 @@ def test_pool():
         session.rollback()
 
 
+@pytest.fixture
+def task_reschedules_for_ti():
+    def wrapper(ti):
+        with create_session() as session:
+            return session.scalars(TaskReschedule.stmt_for_task_instance(ti=ti, descending=False)).all()
+
+    return wrapper

Review Comment:
   It would be better to avoid keeping the session open longer than required.





[GitHub] [airflow] uranusjr commented on a diff in pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #33720:
URL: https://github.com/apache/airflow/pull/33720#discussion_r1311136061


##########
tests/models/test_taskinstance.py:
##########
@@ -103,6 +103,15 @@ def test_pool():
         session.rollback()
 
 
+@pytest.fixture
+def task_reschedules_for_ti():
+    def wrapper(ti):
+        with create_session() as session:
+            return session.scalars(TaskReschedule.stmt_for_task_instance(ti=ti, descending=False)).all()
+
+    return wrapper

Review Comment:
   ```suggestion
   def task_reschedules_for_ti(session):
       def wrapper(ti):
           return session.scalars(TaskReschedule.stmt_for_task_instance(ti=ti, descending=False)).all()
   
       return wrapper
   ```
   
   (I think this should work. `session` here is a function-level fixture.)






[GitHub] [airflow] potiuk commented on pull request #33720: Optimise and migrate to SA2-compatible syntax for TaskReschedule

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #33720:
URL: https://github.com/apache/airflow/pull/33720#issuecomment-1707097752

   > @Taragolis can you please rebase so that we can run all the tests on this PR
   
   I rebased it.

