You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/09 06:56:08 UTC

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #22102: Enhance `db upgrade` args

ephraimbuddy commented on a change in pull request #22102:
URL: https://github.com/apache/airflow/pull/22102#discussion_r822337281



##########
File path: airflow/utils/db.py
##########
@@ -1043,103 +1061,106 @@ def _offline_migration(migration_func: Callable, config, revision):
         logging.disable(logging.NOTSET)
 
 
-def _validate_version_range(script_, version_range):
-    if ':' not in version_range:
-        raise AirflowException(
-            'Please provide Airflow version range with the format "old_version:new_version"'
-        )
-    lower, upper = version_range.split(':')
-
-    if not REVISION_HEADS_MAP.get(lower) or not REVISION_HEADS_MAP.get(upper):
-        raise AirflowException('Please provide valid Airflow versions above 2.0.0.')
-    if REVISION_HEADS_MAP.get(lower) == REVISION_HEADS_MAP.get(upper):
-        if sys.stdout.isatty():
-            size = os.get_terminal_size().columns
-        else:
-            size = 0
-        print(f"Hey this is your migration script from {lower}, to {upper}, but guess what?".center(size))
-        print(
-            "There is no migration needed as the database has not changed between those versions. "
-            "You are done.".center(size)
-        )
-        print("""/\\_/\\""".center(size))
-        print("""(='_' )""".center(size))
-        print("""(,(") (")""".center(size))
-        print("""^^^""".center(size))
-        return
-    dbname = settings.engine.dialect.name
-    if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    elif dbname == 'mssql' and (lower != '2.2.0' or int(lower.split('.')[1]) < 2):
-        raise AirflowException(
-            'MSSQL is not supported for offline migration in Airflow versions less than 2.2.0.'
-        )
-    _lower, _upper = REVISION_HEADS_MAP[lower], REVISION_HEADS_MAP[upper]
-    revision = f"{_lower}:{_upper}"
+def print_happy_cat(message):
+    if sys.stdout.isatty():
+        size = os.get_terminal_size().columns
+    else:
+        size = 0
+    print(message.center(size))
+    print("""/\\_/\\""".center(size))
+    print("""(='_' )""".center(size))
+    print("""(,(") (")""".center(size))
+    print("""^^^""".center(size))
+    return
+
+
+def _revision_greater(config, this_rev, base_rev):
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    script = _get_script_object(config)
     try:
-        # Check if there is history between the revisions
-        list(script_.revision_map.iterate_revisions(_upper, _lower))
+        list(script.revision_map.iterate_revisions(upper=this_rev, lower=base_rev))
+        return True
     except Exception:
-        raise AirflowException(
-            f"Error while checking history for revision range {revision}. "
-            f"Check that the supplied airflow version is in the format 'old_version:new_version'."
-        )
-    return revision
+        return False
 
 
-def _validate_revision(script_, revision_range):
-    if ':' not in revision_range:
-        raise AirflowException(
-            'Please provide Airflow revision range with the format "old_revision:new_revision"'
-        )
+def _revisions_above_min_for_offline(config, revisions):
+    """
+    Checks that all supplied revision ids are above the minimum revision for the dialect.
+
+    :param config: Alembic config
+    :param revisions: list of Alembic revision ids
+    :return: None
+    :rtype: None
+    """
     dbname = settings.engine.dialect.name
     if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    start_version = '2.0.0'
-    rev_2_0_0_head = 'e959f08ac86c'
-    _lowerband, _upperband = revision_range.split(':')
-    if dbname == 'mssql':
-        rev_2_0_0_head = '7b2661a43ba3'
-        start_version = '2.2.0'
-    for i in [_lowerband, _upperband]:
-        try:
-            # Check if there is history between the revisions and the start revision
-            # This ensures that the revisions are above 2.0.0 head or 2.2.0 head if mssql
-            list(script_.revision_map.iterate_revisions(upper=i, lower=rev_2_0_0_head))
-        except Exception:
-            raise AirflowException(
-                f"Error while checking history for revision range {rev_2_0_0_head}:{i}. "
-                f"Check that {i} is a valid revision. "
-                f"Supported revision for offline migration is from {rev_2_0_0_head} "
-                f"which is airflow {start_version} head"
+        raise AirflowException('Offline migration not supported for SQLite.')
+    min_version, min_revision = ('2.2.0', '7b2661a43ba3') if dbname == 'mssql' else ('2.0.0', 'e959f08ac86c')
+
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    for rev in revisions:
+        if not rev:
+            raise ValueError('unexpected')
+        if not _revision_greater(config, rev, min_revision):
+            raise ValueError(
+                f"Error while checking history for revision range {min_revision}:{rev}. "
+                f"Check that {rev} is a valid revision. "
+                f"For dialect {dbname!r}, supported revision for offline migration is from {min_revision} "
+                f"which corresponds to Airflow {min_version}."
             )
 
 
 @provide_session
 def upgradedb(
-    version_range: Optional[str] = None, revision_range: Optional[str] = None, session: Session = NEW_SESSION
+    to_revision: Optional[str] = None,
+    from_revision: Optional[str] = None,
+    sql: bool = False,
+    session: Session = NEW_SESSION,
 ):
-    """Upgrade the database."""
+    """
+
+    :param to_revision: Optional Alembic revision ID to upgrade *to*.
+        If omitted, upgrades to latest revision.
+    :param from_revision: Optional Alembic revision ID to upgrade *from*.
+        Not compatible with ``sql_only=False``.
+    :param sql: if True, migration statements will be printed but not executed.
+    :param session: sqlalchemy session with connection to Airflow metadata database
+    :return: None
+    :rtype: None
+    """
+    if from_revision and not sql:
+        raise Exception("`from_revision` only supported with `sql_only=True`.")
+
     # alembic adds significant import time, so we import it lazily
     if not settings.SQL_ALCHEMY_CONN:
-        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is critical assertion.")
+        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a critical assertion.")
     from alembic import command
-    from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
-    script_ = ScriptDirectory.from_config(config)
 
-    config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
-    if version_range:
-        revision = _validate_version_range(script_, version_range)
-        if not revision:
+    if sql:
+        if not from_revision:
+            from_revision = _get_current_revision(session)
+
+        if not from_revision and to_revision:
+            raise Exception('unexpected')
+
+        if to_revision == from_revision:
+            print_happy_cat("No migrations to apply; nothing to do.")
             return
-        log.info("Running offline migrations for version range %s", version_range)
-        return _offline_migration(command.upgrade, config, revision)
-    elif revision_range:
-        _validate_revision(script_, revision_range)
-        log.info("Running offline migrations for revision range %s", revision_range)
-        return _offline_migration(command.upgrade, config, revision_range)
+
+        if not _revision_greater(config, to_revision, from_revision):
+            raise ValueError(
+                f'Requested *to* revision {to_revision} is older than *from* revision {from_revision}. '
+                'Please check your requested versions / revisions.'
+            )
+        _revisions_above_min_for_offline(config=config, revisions=[from_revision, to_revision])
+
+        _offline_migration(command.upgrade, config, f"{from_revision}:{to_revision}")
+        return

Review comment:
       The return is necessary to prevent execution from falling through to the real upgrade command. Maybe we should return the result of `_offline_migration(...)` directly instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org