Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/18 10:48:51 UTC

[GitHub] [airflow] ashb opened a new pull request #19808: Refactor dangling row check to use SQLA queries

ashb opened a new pull request #19808:
URL: https://github.com/apache/airflow/pull/19808


   This is a preparatory refactor so that the move-dangling-rows
   pre-upgrade check makes better use of SQLA queries -- this is needed
   because a future PR will add a check for dangling XCom rows, and
   that will need to conditionally join against DagRun to get
   execution_date (depending on whether it is run pre- or post-2.2).
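   
   To give a rough sense of the query shape this prepares for, here is a minimal
   sketch (the helper name and the pre/post-2.2 switch are illustrative assumptions,
   not the exact code in this PR): the dangling-row check is an outer join against
   dag_run, keeping the rows that found no match, with the join condition depending
   on whether the source table still has its own execution_date column.
   
   ```python
   # Illustrative sketch only; names and the pre/post-2.2 switch are assumptions
   # about the future XCom check, not code from this PR.
   from sqlalchemy import Table, and_, outerjoin, select
   
   
   def dangling_rows_select(source_table: Table, dagrun_table: Table, pre_2_2: bool):
       if pre_2_2:
           # The source table still carries its own execution_date column.
           join_cond = and_(
               source_table.c.dag_id == dagrun_table.c.dag_id,
               source_table.c.execution_date == dagrun_table.c.execution_date,
           )
       else:
           # Post-2.2 the source table has run_id instead, and execution_date
           # has to come from dag_run itself.
           join_cond = and_(
               source_table.c.dag_id == dagrun_table.c.dag_id,
               source_table.c.run_id == dagrun_table.c.run_id,
           )
       # Rows with no matching DagRun are the dangling ones to move aside.
       return (
           select([source_table])
           .select_from(outerjoin(source_table, dagrun_table, join_cond))
           .where(dagrun_table.c.dag_id.is_(None))
       )
   ```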
   
   This has been tested with Postgres 9.6, SQLite, MSSQL 2017 and MySQL 5.7
   
   codespell didn't like `froms`, as it thinks it is a typo of "forms" --
   and in most other cases it would be, just not here. Codespell doesn't
   currently have a method of ignoring a _single_ line without ignoring
   the word everywhere (which we don't want to do), so I have to ignore
   the exact _line_. Sad panda.
   
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).





[GitHub] [airflow] uranusjr commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1020121832


   @ashb This shadowed #19999





[GitHub] [airflow] ashb commented on a change in pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756411250



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)
+    elif dialect_name in ("mysql", "mssql"):
+        # This is not foolproof! But it works for the limited queries (with no params) that we use here
+        stmt = source_query.selectable
+
+        def _from_name(from_) -> str:
+            if isinstance(from_, Join):
+                return str(from_.compile(bind=bind))
+            return str(from_)

Review comment:
       Yeah, this one wasn't even the worst -- the `.whereclause ... except ._whereclause` annoyed me more.







[GitHub] [airflow] ashb commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-989866029


   @uranusjr Any further thoughts on this? Should we go with this approach or stick with the string-based one we have now?





[GitHub] [airflow] github-actions[bot] commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1011949593


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on a change in pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756410750



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)

Review comment:
       Whoop, yeah







[GitHub] [airflow] uranusjr commented on a change in pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756385159



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)
+    elif dialect_name in ("mysql", "mssql"):
+        # This is not foolproof! But it works for the limited queries (with no params) that we use here
+        stmt = source_query.selectable
+
+        def _from_name(from_) -> str:
+            if isinstance(from_, Join):
+                return str(from_.compile(bind=bind))
+            return str(from_)

Review comment:
       SQLAlchemy really has a lot of rough edges in unexpected places.

##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)

Review comment:
       Stray debugging code?

##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join

Review comment:
       Why not import globally?







[GitHub] [airflow] ashb edited a comment on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1020126576


   Damn. Good catch, TP.





[GitHub] [airflow] ashb commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1011160510


   I've rebased this, and I'll need to re-run these checks to confirm they still work, as it's been a while.





[GitHub] [airflow] ashb commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1020126576


   Damn.





[GitHub] [airflow] ashb commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-978210560


   > Honestly I feel this is a massive degradation in readability, but I guess that’s more or less how ORMs work in general to balance between code duplication and hairy string manipulation.
   
   Yeah, I kind of agree, but to add the XCom migration check (which I've already got written, just not in this PR) I had to make a change like this:
   
   ```diff
   -        source_to_dag_run_join_cond = and_(
   -            source_table.c.dag_id == dagrun_table.c.dag_id,
   -            source_table.c.execution_date == dagrun_table.c.execution_date,
   -        )
   -        invalid_rows_query = (
   -            session.query(source_table.c.dag_id, source_table.c.task_id, source_table.c.execution_date)
   -            .select_from(outerjoin(source_table, dagrun_table, source_to_dag_run_join_cond))
   -            .filter(dagrun_table.c.dag_id.is_(None))
   +        if "task_id" in fk_target.columns:
   +            join_condition &= to_migrate.c.task_id == fk_target.c.task_id
   +            query = session.query(
   +                to_migrate.c.dag_id, to_migrate.c.task_id, to_migrate.c.execution_date, to_migrate.c.task_id
   +            )
   +        else:
   +            query = session.query(to_migrate.c.dag_id, to_migrate.c.task_id, to_migrate.c.execution_date)
   +
   +        if "execution_date" in fk_target.columns:
   +            join_target = fk_target
   +            join_condition &= to_migrate.c.execution_date == fk_target.c.execution_date
   +        else:
   +            # Target Table doesn't have execution_date column (anymore?) i.e. TaskInstance after 2.2.0
   +            join_target = fk_target.join(
   +                dagrun_table,
   +                and_(
   +                    dagrun_table.c.dag_id == fk_target.c.dag_id, dagrun_table.c.run_id == fk_target.c.run_id
   +                ),
   +                isouter=True,
   +            )
   +            join_condition &= join_target.c.dag_run_execution_date == to_migrate.c.execution_date
   ```
   
   Which makes it _even less readable_, but the other option is to have a lot of if/else cases for XCom pre-2.2, XCom post-2.2, TI, etc.
   
   





[GitHub] [airflow] ashb commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1020060074


   Tested this, all still working.





[GitHub] [airflow] ashb merged pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #19808:
URL: https://github.com/apache/airflow/pull/19808


   





[GitHub] [airflow] uranusjr commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-1006301706


   As long as someone figures out the internals (you do), this is probably better than string-formatting, so +1 to moving this forward.





[GitHub] [airflow] uranusjr commented on pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#issuecomment-978177249


   Honestly I feel this is a massive degradation in readability, but I guess that’s more or less how ORMs work in general to balance between code duplication and hairy string manipulation.
   
   > Codespell doesn't currently have a method of ignoring a _single_ line without ignoring the word everywhere (which we don't want to do) so I have to ignore the exact _line_.
   
   Including the leading whitespace? That’s… uh.
   
   





[GitHub] [airflow] ashb commented on a change in pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756411056



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join

Review comment:
       Probably not strictly required, but this is in the hot import path for `airflow` so I want to minimize the global imports in this file
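       
       As a minimal sketch of the pattern (hypothetical names, not Airflow's code):
       deferring the import into the helper means the cost is only paid when the
       pre-upgrade check actually runs, not when the module is imported on the
       `airflow` CLI's hot path.
       
       ```python
       def _build_moved_rows_table(target_table_name, column_names):
           # Deferred import: keeps module import time low for the common case
           # where this helper never runs.
           from sqlalchemy import column, table
       
           return table(target_table_name, *(column(name) for name in column_names))
       ```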







[GitHub] [airflow] ashb closed pull request #19808: Refactor dangling row check to use SQLA queries

Posted by GitBox <gi...@apache.org>.
ashb closed pull request #19808:
URL: https://github.com/apache/airflow/pull/19808


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org