You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/24 20:46:17 UTC

[GitHub] [airflow] ashb commented on a change in pull request #19808: Refactor dangling row check to use SQLA queries

ashb commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756410750



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)

Review comment:
       Whoop, yeah




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org