You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/05 14:18:19 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #19425: Fix moving of dangling TaskInstance rows for SQL Server

kaxil commented on a change in pull request #19425:
URL: https://github.com/apache/airflow/pull/19425#discussion_r743700766



##########
File path: airflow/utils/db.py
##########
@@ -709,18 +709,63 @@ def _format_dangling_error(source_table, target_table, invalid_count, reason):
     )
 
 
-def _move_dangling_run_data_to_new_table(session, source_table, target_table):
+def _move_dangling_run_data_to_new_table(session, source_table: "Table", target_table_name: str):
     where_clause = "where dag_id is null or run_id is null or execution_date is null"
-    session.execute(text(f"create table {target_table} as select * from {source_table} {where_clause}"))
-    session.execute(text(f"delete from {source_table} {where_clause}"))
+    _move_dangling_table(session, source_table, target_table_name, where_clause)
+
+
+def _move_dangling_table(session, source_table: "Table", target_table_name: str, where_clause: str):
+    dialect_name = session.get_bind().dialect.name
+
+    delete_where = " AND ".join(
+        f"{source_table.name}.{c.name} = d.{c.name}" for c in source_table.primary_key.columns
+    )
+    if dialect_name == "mssql":
+        session.execute(
+            text(f"select source.* into {target_table_name} from {source_table} as source {where_clause}")
+        )
+        session.execute(
+            text(
+                f"delete from {source_table} from {source_table} join {target_table_name} AS d ON "
+                + delete_where
+            )
+        )
+    else:
+        # Postgres, MySQL and SQLite all have the same CREATE TABLE a AS SELECT ... syntax
+        session.execute(
+            text(
+                f"create table {target_table_name} as select source.* from {source_table} as source "
+                + where_clause
+            )
+        )
+
+        # But different join-delete syntax.
+        if dialect_name == "mysql":
+            session.execute(
+                text(
+                    f"delete {source_table} from {source_table} join {target_table_name} as d on "
+                    + delete_where

Review comment:
   ```suggestion
                       + {delete_where}
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org