Posted to commits@airflow.apache.org by "dondaum (via GitHub)" <gi...@apache.org> on 2024/04/24 21:26:42 UTC

[PR] fix: sqa deprecations for airflow task cmd [airflow]

dondaum opened a new pull request, #39244:
URL: https://github.com/apache/airflow/pull/39244

   related: #28723
   
   Fix SQLAlchemy 2.0 deprecation warnings in the Airflow core task command.
   
   SQLAlchemy 2.0 changes how objects are cascaded into a Session along a backref. Up to and including SQLAlchemy 1.4, given a bidirectional relationship between TaskInstance and DagRun, if a DagRun object is already in a Session, associating a new TaskInstance with it puts the TaskInstance into the Session as well. This "cascade backrefs" behavior is deprecated for [removal](https://docs.sqlalchemy.org/en/20/changelog/migration_14.html#cascade-backrefs-behavior-deprecated-for-removal-in-2-0) in SQLAlchemy 2.0.
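A minimal sketch of the behavior change, assuming SQLAlchemy 1.4 or 2.x and an in-memory SQLite database. `Parent` and `Child` are hypothetical stand-ins for `DagRun` and `TaskInstance`, not Airflow's actual models. Associating a new child with a parent that is already in the session pulls the child in on 1.4 (with a deprecation warning), while on 2.0 the child stays outside the session:

```Python
import sqlalchemy
from sqlalchemy import Column, ForeignKey, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Parent(Base):  # hypothetical stand-in for DagRun
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", back_populates="parent", cascade="save-update, merge")


class Child(Base):  # hypothetical stand-in for TaskInstance
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))
    parent = relationship("Parent", back_populates="children")


def child_cascaded_via_backref() -> bool:
    """Return True if associating a new child with a persistent parent adds the child to the session."""
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(Parent(id=1))
        session.commit()
        parent = session.execute(select(Parent)).scalars().one()  # persistent: in the session
        child = Child(id=1, parent=parent)  # the association reaches Parent.children via the backref
        return child in session


SQLA_MAJOR = int(sqlalchemy.__version__.split(".")[0])
print(f"SQLAlchemy {sqlalchemy.__version__}: child in session = {child_cascaded_via_backref()}")
```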
   
   To preserve the current behavior and fix the warning, we need to ensure that both objects are either outside the session or inside it at the moment they are associated with each other. See [here](https://github.com/sqlalchemy/sqlalchemy/discussions/7693) for more information.
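A sketch of that pattern, assuming SQLAlchemy 1.4 or 2.x with an in-memory SQLite database; `Parent` and `Child` are hypothetical stand-ins for `DagRun` and `TaskInstance`. The child is added to the session before it is associated with the persistent parent, so the association never depends on a backref cascade and should behave the same on both versions (on 1.4 it also avoids the deprecation warning, since the child is already in the session when the backref event fires):

```Python
from sqlalchemy import Column, ForeignKey, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Parent(Base):  # hypothetical stand-in for DagRun
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", back_populates="parent", cascade="save-update, merge")


class Child(Base):  # hypothetical stand-in for TaskInstance
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))
    parent = relationship("Parent", back_populates="children")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Parent(id=1))
    session.commit()

with Session(engine) as session:
    parent = session.get(Parent, 1)  # persistent: already in the session
    child = Child(id=1)              # transient: not associated with anything yet
    session.add(child)               # bring the child into the session first...
    child.parent = parent            # ...so both objects are in the session when associated
    session.commit()

with Session(engine) as session:
    persisted_rows = [(c.id, c.parent_id) for c in session.execute(select(Child)).scalars()]

print(persisted_rows)
```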
   
   ### Reported in core
   - [x] airflow/cli/commands/task_command.py:202
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis merged PR #39244:
URL: https://github.com/apache/airflow/pull/39244




Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #39244:
URL: https://github.com/apache/airflow/pull/39244#discussion_r1580524472


##########
airflow/cli/commands/task_command.py:
##########
@@ -199,6 +199,8 @@ def _get_ti_db_access(
             )
         # TODO: Validate map_index is in range?
         ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index)
+        if create_if_necessary == "db":
+            session.add(ti)

Review Comment:
   Yeah, that check looks much more sensible to me.





Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "dondaum (via GitHub)" <gi...@apache.org>.
dondaum commented on code in PR #39244:
URL: https://github.com/apache/airflow/pull/39244#discussion_r1580726639


##########
airflow/cli/commands/task_command.py:
##########
@@ -199,6 +199,8 @@ def _get_ti_db_access(
             )
         # TODO: Validate map_index is in range?
         ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index)
+        if create_if_necessary == "db":
+            session.add(ti)

Review Comment:
   Changed.





Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "dondaum (via GitHub)" <gi...@apache.org>.
dondaum commented on PR #39244:
URL: https://github.com/apache/airflow/pull/39244#issuecomment-2077440979

   > is there any way test the same?
   
   Yes, it would be great to have a test for it.
   
   A good test would compare the current behavior under SQLAlchemy 1.4 with the behavior under 2.0.
   
   Locally, one could install SQLAlchemy 2.0 and run the tests against it. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and runs all tests?
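One possible shape for such a test, sketched under assumptions: my understanding is that `Session(..., future=True)` already switches off the backref cascade on SQLAlchemy 1.4 and is accepted (and always on) in 2.0, so a single test could exercise 2.0 semantics on either version. The `Parent`/`Child` models, `make_engine` and the test name are hypothetical, not existing Airflow fixtures:

```Python
from sqlalchemy import Column, ForeignKey, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Parent(Base):  # hypothetical stand-in for DagRun
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", back_populates="parent", cascade="save-update, merge")


class Child(Base):  # hypothetical stand-in for TaskInstance
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))
    parent = relationship("Parent", back_populates="children")


def make_engine():
    """In-memory database holding one persisted parent."""
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(Parent(id=1))
        session.commit()
    return engine


def test_backref_cascade_not_relied_upon() -> None:
    engine = make_engine()

    # 2.0 semantics (future=True on 1.4): associating alone does not add the child.
    with Session(engine, future=True) as session:
        parent = session.get(Parent, 1)
        child = Child(id=1, parent=parent)
        assert child not in session
    # Leaving the block closes the session without committing, so nothing is persisted.

    # Fix: add the child first, then associate and commit.
    with Session(engine, future=True) as session:
        parent = session.get(Parent, 1)
        child = Child(id=1)
        session.add(child)
        child.parent = parent
        session.commit()

    with Session(engine) as session:
        rows = session.execute(select(Child.id, Child.parent_id)).all()
        assert [tuple(r) for r in rows] == [(1, 1)]


test_backref_cascade_not_relied_upon()
print("ok")
```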




Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "dondaum (via GitHub)" <gi...@apache.org>.
dondaum commented on code in PR #39244:
URL: https://github.com/apache/airflow/pull/39244#discussion_r1579606851


##########
airflow/cli/commands/task_command.py:
##########
@@ -199,6 +199,8 @@ def _get_ti_db_access(
             )
         # TODO: Validate map_index is in range?
         ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index)
+        if create_if_necessary == "db":
+            session.add(ti)

Review Comment:
   Thanks for the review.
   
   I am not sure I understood your comment correctly.
   
   I assume you are asking whether we can implement this generically, without depending on `create_if_necessary`, because a future change there could reopen the issue?
   
   If so, yes: we could check more generally whether the DagRun object is in the session:
   
   ```Python
   
   if dag_run in session:
       session.add(ti)
   ```
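A sketch of that generic check, wrapped in a hypothetical `attach_child` helper and using hypothetical `Parent`/`Child` models in place of `DagRun`/`TaskInstance` (SQLAlchemy 1.4 or 2.x assumed). A parent loaded from the database leads to the child being tracked too; a parent built only in memory, roughly the `create_if_necessary == "memory"` case, keeps both objects out of the session:

```Python
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class Parent(Base):  # hypothetical stand-in for DagRun
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", back_populates="parent", cascade="save-update, merge")


class Child(Base):  # hypothetical stand-in for TaskInstance
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))
    parent = relationship("Parent", back_populates="children")


def attach_child(session: Session, parent: Parent, child: Child) -> None:
    """Hypothetical helper: associate child with parent without relying on a backref cascade."""
    if parent in session:
        # Parent is tracked (e.g. loaded from the DB), so track the child before associating.
        session.add(child)
    # Associate only now; a transient parent (e.g. built in memory) keeps both objects untracked.
    child.parent = parent


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add(Parent(id=1))
    session.commit()

    db_child = Child(id=1)
    attach_child(session, session.get(Parent, 1), db_child)

    memory_parent, memory_child = Parent(id=2), Child(id=2)
    attach_child(session, memory_parent, memory_child)

    db_child_tracked = db_child in session
    memory_objects_tracked = memory_parent in session or memory_child in session
    session.commit()

print(db_child_tracked, memory_objects_tracked)
```

Checking `dag_run in session` rather than the value of `create_if_necessary` keeps the condition tied to what the backref cascade actually depends on, which is whether the parent is tracked.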





Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #39244:
URL: https://github.com/apache/airflow/pull/39244#discussion_r1578810106


##########
airflow/cli/commands/task_command.py:
##########
@@ -199,6 +199,8 @@ def _get_ti_db_access(
             )
         # TODO: Validate map_index is in range?
         ti = TaskInstance(task, run_id=dag_run.run_id, map_index=map_index)
+        if create_if_necessary == "db":
+            session.add(ti)

Review Comment:
   Wouldn't this just postpone the problem until we need to handle `create_if_necessary == "memory"`?





Re: [PR] fix: sqa deprecations for airflow task cmd [airflow]

Posted by "dondaum (via GitHub)" <gi...@apache.org>.
dondaum commented on PR #39244:
URL: https://github.com/apache/airflow/pull/39244#issuecomment-2078992353

   > > is there any way test the same?
   > 
   > Yeah it would be great to have a test for it.
   > 
   > A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.
   > 
   > I guess locally one could install SQLAlchemy 2.0 and run it against. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and run all tests ?
   
   I tried to make all tests run with SQLAlchemy 2.0, but adjusting and fixing everything (i.e. making Airflow core compatible with SQLAlchemy 2.0) is a lot of effort.
   
   Instead, I created a small test setup and verified that the fix does preserve the current behavior.
   
   **SQLAlchemy 1.4**
   ```Python
   
   from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
   from sqlalchemy.orm import Session, declarative_base, relationship
   
   
   Base = declarative_base()
   
   
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
   
       task_id = Column(String(50), primary_key=True, nullable=False)
       dag_id = Column(String(50), primary_key=True, nullable=False)
       run_id = Column(String(50), primary_key=True, nullable=False)
       map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
   
       __table_args__ = (
           ForeignKeyConstraint(
               [dag_id, run_id],
               ["dag_run.dag_id", "dag_run.run_id"],
               name="task_instance_dag_run_fkey",
               ondelete="CASCADE",
           ),
       )
   
       dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
   
   class DagRun(Base):
       __tablename__ = "dag_run"
   
       id = Column(Integer, primary_key=True)
       dag_id = Column(String(50), nullable=False)
       run_id = Column(String(50), nullable=False)
       
       task_instances = relationship(
           TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
       )
   
       __table_args__ = (
           UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
       )
   
   
   
   engine = create_engine("sqlite://", echo=False, future=True)
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)
   
   
   with Session(engine) as session:
       dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
       session.add(dag_run)
       session.commit()
   
   
   # Simulate current behavior
   with Session(engine) as session:
       dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
       print(dag_run)
       print("Dag run in session:", dag_run in session)
   
   
       ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
       print("TaskInstance in session:", ti in session)
   
       session.commit()
   
   
   # Check if task instance is in db
   with Session(engine) as session:
       all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
       print(all_tis)
   
   ```
   
   **SQLAlchemy 1.4 output**
   ```shell
   vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py 
   <__main__.DagRun object at 0x7f68a696c0d0>
   Dag run in session: True
   /workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
     ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
   TaskInstance in session: True
   [<__main__.TaskInstance object at 0x7f68a6992c90>]
   ```
   
   **SQLAlchemy 2.0**
   ```Python
   
   from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
   from sqlalchemy.orm import Session, declarative_base, relationship
   
   
   Base = declarative_base()
   
   
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
   
       task_id = Column(String(50), primary_key=True, nullable=False)
       dag_id = Column(String(50), primary_key=True, nullable=False)
       run_id = Column(String(50), primary_key=True, nullable=False)
       map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
   
       __table_args__ = (
           ForeignKeyConstraint(
               [dag_id, run_id],
               ["dag_run.dag_id", "dag_run.run_id"],
               name="task_instance_dag_run_fkey",
               ondelete="CASCADE",
           ),
       )
   
       dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
   
   class DagRun(Base):
       __tablename__ = "dag_run"
   
       id = Column(Integer, primary_key=True)
       dag_id = Column(String(50), nullable=False)
       run_id = Column(String(50), nullable=False)
       
       task_instances = relationship(
           TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
       )
   
       __table_args__ = (
           UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
       )
   
   
   
   engine = create_engine("sqlite://", echo=False, future=True)
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)
   
   
   with Session(engine) as session:
       dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
       session.add(dag_run)
       session.commit()
   
   
   # Simulate current behavior
   with Session(engine) as session:
       dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
       print(dag_run)
       print("Dag run in session:", dag_run in session)
   
   
       ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
       print("TaskInstance in session:", ti in session)
   
       session.commit()
   
   
   # Check if task instance is in db
   with Session(engine) as session:
       all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
       print(all_tis)
   
   ```
   
   **SQLAlchemy 2.0 output**
   ```shell
   vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py 
   <__main__.DagRun object at 0x7f24bad1f850>
   Dag run in session: True
   TaskInstance in session: False
   []
   ```
   
   **SQLAlchemy 2.0 with fix**
   ```Python
   from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
   from sqlalchemy.orm import Session, declarative_base, relationship
   
   
   Base = declarative_base()
   
   
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
   
       task_id = Column(String(50), primary_key=True, nullable=False)
       dag_id = Column(String(50), primary_key=True, nullable=False)
       run_id = Column(String(50), primary_key=True, nullable=False)
       map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
   
       __table_args__ = (
           ForeignKeyConstraint(
               [dag_id, run_id],
               ["dag_run.dag_id", "dag_run.run_id"],
               name="task_instance_dag_run_fkey",
               ondelete="CASCADE",
           ),
       )
   
       dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
   
   class DagRun(Base):
       __tablename__ = "dag_run"
   
       id = Column(Integer, primary_key=True)
       dag_id = Column(String(50), nullable=False)
       run_id = Column(String(50), nullable=False)
       
       task_instances = relationship(
           TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
       )
   
       __table_args__ = (
           UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
       )
   
   
   
   engine = create_engine("sqlite://", echo=False, future=True)
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)
   
   
   with Session(engine) as session:
       dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
       session.add(dag_run)
       session.commit()
   
   
   # Simulate current behavior
   with Session(engine) as session:
       dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
       print(dag_run)
       print("Dag run in session:", dag_run in session)
   
   
       ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
       print("TaskInstance in session:", ti in session)
   
       session.add(ti)  # <-- fix
   
       session.commit()
   
   
   # Check if task instance is in db
   with Session(engine) as session:
       all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
       print(all_tis)
   ```
   
   **SQLAlchemy 2.0 with fix output**
   ```shell
   vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py
   <__main__.DagRun object at 0x7efc93103e10>
   Dag run in session: True
   TaskInstance in session: False
   [<__main__.TaskInstance object at 0x7efc937081d0>]
   ```

