You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/24 19:57:07 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #19808: Refactor dangling row check to use SQLA queries

uranusjr commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756385159



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)
+    elif dialect_name in ("mysql", "mssql"):
+        # This is not foolproof! But it works for the limited queries (with no params) that we use here
+        stmt = source_query.selectable
+
+        def _from_name(from_) -> str:
+            if isinstance(from_, Join):
+                return str(from_.compile(bind=bind))
+            return str(from_)

Review comment:
       SQLAlchemy really has a lot of rough edges in unexpected places.

##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join
+
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+
+    # First: Create moved rows from new table
+    if dialect_name == "mssql":
+        cte = source_query.cte("source")
+        moved_data_tbl = table(target_table_name, *(column(c.name) for c in cte.columns))
+        ins = moved_data_tbl.insert().from_select(list(cte.columns), select([cte]))
+
+        stmt = ins.compile(bind=session.get_bind())
+        cte_sql = stmt.ctes[cte]
+
+        session.execute(f"WITH {cte_sql} SELECT source.* INTO {target_table_name} FROM source")
+    else:
+        # Postgres, MySQL and SQLite all support the same "create as select"
+        session.execute(
+            f"CREATE TABLE {target_table_name} AS {source_query.selectable.compile(bind=session.get_bind())}"
+        )
+
+    # Second: Now delete rows we've moved
+    try:
+        clause = source_query.whereclause
+    except AttributeError:
+        clause = source_query._whereclause
+
+    if dialect_name == "sqlite":
+        subq = source_query.selectable.with_only_columns([text(f'{source_table}.ROWID')])
+        delete = source_table.delete().where(column('ROWID').in_(subq))
+        print(delete)

Review comment:
       Stray debugging code?

##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join

Review comment:
       Why not import these at module level (at the top of the file) instead of inside the function?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org