You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/16 00:27:00 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #17030: Check for missing DagRun rows for "downstream" tables before migrating DB

uranusjr commented on a change in pull request #17030:
URL: https://github.com/apache/airflow/pull/17030#discussion_r670885633



##########
File path: airflow/utils/db.py
##########
@@ -668,47 +667,95 @@ def check_conn_type_null(session=None) -> str:
         pass
 
     if n_nulls:
-        return (
+        yield (
             'The conn_type column in the connection '
             'table must contain content.\n'
             'Make sure you don\'t have null '
             'in the conn_type column.\n'
             f'Null conn_type conn_id: {list(n_nulls)}'
         )
-    return ''
+
+
+def check_task_tables_without_matching_dagruns(session) -> Iterable[str]:
+    import sqlalchemy.schema
+    from sqlalchemy import and_, outerjoin
+
+    from airflow.models.renderedtifields import RenderedTaskInstanceFields
+    from airflow.models.sensorinstance import SensorInstance
+
+    metadata = sqlalchemy.schema.MetaData(session.bind)
+    models = [
+        RenderedTaskInstanceFields,
+        SensorInstance,
+        SlaMiss,
+        TaskInstance,
+        TaskFail,
+        TaskReschedule,
+        XCom,
+    ]
+    metadata.reflect(only=[model.__tablename__ for model in models])
+
+    for model in models:
+        if 'run_id' in metadata.tables[model.__tablename__].columns:
+            # Migration already applied, don't check again
+            continue
+
+        query = (
+            session.query(model.dag_id, model.task_id, model.execution_date)
+            .select_from(
+                outerjoin(
+                    model,
+                    DagRun,
+                    and_(model.dag_id == DagRun.dag_id, model.execution_date == DagRun.execution_date),
+                )
+            )
+            .filter(DagRun.run_id.is_(None))
+        )  # type: ignore
+
+        num = query.count()
+
+        if num > 0:
+            yield (
+                f'The {model.__tablename__} table has {num} row{"s" if num != 1 else ""} without a '
+                + 'corresponding dag_run row. You must manually correct this problem (possibly by deleting '
+                + 'the problem rows).'
+            )
 
 
 @provide_session
-def auto_migrations_available(session=None):
+def auto_migrations_available(session=None) -> Iterable[str]:
     """
     :session: session of the sqlalchemy
     :rtype: list[str]
     """
-    errors_ = []
-
-    for check_fn in (check_conn_id_duplicates, check_conn_type_null):
-        err = check_fn(session)
-        if err:
-            errors_.append(err)
-
-    return errors_
+    for check_fn in (
+        check_conn_id_duplicates,
+        check_conn_type_null,
+        check_task_tables_without_matching_dagruns,
+    ):
+        yield from check_fn(session)
 
 
 def upgradedb():
     """Upgrade the database."""
     # alembic adds significant import time, so we import it lazily
     from alembic import command
 
-    log.info("Creating tables")
     config = _get_alembic_config()
 
     config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
-    # check automatic migration is available
-    errs = auto_migrations_available()
-    if errs:
-        for err in errs:
-            log.error("Automatic migration is not available\n%s", err)
-        return
+
+    errors_seen = False
+    for err in auto_migrations_available():

Review comment:
       The function name `auto_migrations_available` doesn’t really read right, but I guess it is what it is…




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org