You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/09 06:32:57 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #22102: Enhance `db upgrade` args

jedcunningham commented on a change in pull request #22102:
URL: https://github.com/apache/airflow/pull/22102#discussion_r822320278



##########
File path: airflow/cli/commands/db_command.py
##########
@@ -47,8 +53,36 @@ def resetdb(args):
 def upgradedb(args):
     """Upgrades the metadata database"""
     print("DB: " + repr(settings.engine.url))
-    db.upgradedb(version_range=args.range, revision_range=args.revision_range)
-    print("Upgrades done")
+    if args.revision and args.version:
+        raise SystemExit("Cannot supply both `revision` and `version`.")
+    if args.from_version and args.from_revision:
+        raise SystemExit("`--from-revision` may not be combined with `--from-version`")

Review comment:
       ```suggestion
           raise SystemExit("Cannot supply both `--from-revision` and `--from-version`")
   ```
    
   For consistency?

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -47,8 +53,36 @@ def resetdb(args):
 def upgradedb(args):
     """Upgrades the metadata database"""
     print("DB: " + repr(settings.engine.url))
-    db.upgradedb(version_range=args.range, revision_range=args.revision_range)
-    print("Upgrades done")
+    if args.revision and args.version:
+        raise SystemExit("Cannot supply both `revision` and `version`.")
+    if args.from_version and args.from_revision:
+        raise SystemExit("`--from-revision` may not be combined with `--from-version`")
+    if (args.from_revision or args.from_version) and not args.sql_only:
+        raise SystemExit("Args `--from-revision` and `--from-version` may only be used with `--sql-only`")
+    revision = None
+    from_revision = None
+    if args.from_revision:
+        from_revision = args.from_revision
+    elif args.from_version:
+        if parse_version(args.from_version) < parse_version('2.0.0'):
+            raise SystemExit("From version must be greater than 2.0.0")

Review comment:
       ```suggestion
               raise SystemExit("From version must be greater than or equal to 2.0.0")
   ```
   
   or 
   
   ```suggestion
               raise SystemExit("From version must be 2.0.0 or later")
   ```

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -47,8 +53,36 @@ def resetdb(args):
 def upgradedb(args):
     """Upgrades the metadata database"""
     print("DB: " + repr(settings.engine.url))
-    db.upgradedb(version_range=args.range, revision_range=args.revision_range)
-    print("Upgrades done")
+    if args.revision and args.version:
+        raise SystemExit("Cannot supply both `revision` and `version`.")
+    if args.from_version and args.from_revision:
+        raise SystemExit("`--from-revision` may not be combined with `--from-version`")
+    if (args.from_revision or args.from_version) and not args.sql_only:
+        raise SystemExit("Args `--from-revision` and `--from-version` may only be used with `--sql-only`")
+    revision = None
+    from_revision = None
+    if args.from_revision:
+        from_revision = args.from_revision
+    elif args.from_version:
+        if parse_version(args.from_version) < parse_version('2.0.0'):
+            raise SystemExit("From version must be greater than 2.0.0")
+        from_revision = REVISION_HEADS_MAP.get(args.from_version)
+        if not from_revision:
+            raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")
+    if args.version:
+        revision = REVISION_HEADS_MAP.get(args.version)
+        if not revision:
+            raise SystemExit(f"Upgrading to version {args.version} is not supported.")
+    elif args.revision:
+        revision = args.revision
+    if not args.sql_only:

Review comment:
       ```suggestion
   
       if not args.sql_only:
   ```

##########
File path: airflow/utils/db.py
##########
@@ -1043,103 +1061,106 @@ def _offline_migration(migration_func: Callable, config, revision):
         logging.disable(logging.NOTSET)
 
 
-def _validate_version_range(script_, version_range):
-    if ':' not in version_range:
-        raise AirflowException(
-            'Please provide Airflow version range with the format "old_version:new_version"'
-        )
-    lower, upper = version_range.split(':')
-
-    if not REVISION_HEADS_MAP.get(lower) or not REVISION_HEADS_MAP.get(upper):
-        raise AirflowException('Please provide valid Airflow versions above 2.0.0.')
-    if REVISION_HEADS_MAP.get(lower) == REVISION_HEADS_MAP.get(upper):
-        if sys.stdout.isatty():
-            size = os.get_terminal_size().columns
-        else:
-            size = 0
-        print(f"Hey this is your migration script from {lower}, to {upper}, but guess what?".center(size))
-        print(
-            "There is no migration needed as the database has not changed between those versions. "
-            "You are done.".center(size)
-        )
-        print("""/\\_/\\""".center(size))
-        print("""(='_' )""".center(size))
-        print("""(,(") (")""".center(size))
-        print("""^^^""".center(size))
-        return
-    dbname = settings.engine.dialect.name
-    if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    elif dbname == 'mssql' and (lower != '2.2.0' or int(lower.split('.')[1]) < 2):
-        raise AirflowException(
-            'MSSQL is not supported for offline migration in Airflow versions less than 2.2.0.'
-        )
-    _lower, _upper = REVISION_HEADS_MAP[lower], REVISION_HEADS_MAP[upper]
-    revision = f"{_lower}:{_upper}"
+def print_happy_cat(message):
+    if sys.stdout.isatty():
+        size = os.get_terminal_size().columns
+    else:
+        size = 0
+    print(message.center(size))
+    print("""/\\_/\\""".center(size))
+    print("""(='_' )""".center(size))
+    print("""(,(") (")""".center(size))
+    print("""^^^""".center(size))
+    return
+
+
+def _revision_greater(config, this_rev, base_rev):
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    script = _get_script_object(config)
     try:
-        # Check if there is history between the revisions
-        list(script_.revision_map.iterate_revisions(_upper, _lower))
+        list(script.revision_map.iterate_revisions(upper=this_rev, lower=base_rev))
+        return True
     except Exception:
-        raise AirflowException(
-            f"Error while checking history for revision range {revision}. "
-            f"Check that the supplied airflow version is in the format 'old_version:new_version'."
-        )
-    return revision
+        return False
 
 
-def _validate_revision(script_, revision_range):
-    if ':' not in revision_range:
-        raise AirflowException(
-            'Please provide Airflow revision range with the format "old_revision:new_revision"'
-        )
+def _revisions_above_min_for_offline(config, revisions):
+    """
+    Checks that all supplied revision ids are above the minimum revision for the dialect.
+
+    :param config: Alembic config
+    :param revisions: list of Alembic revision ids
+    :return: None
+    :rtype: None
+    """
     dbname = settings.engine.dialect.name
     if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    start_version = '2.0.0'
-    rev_2_0_0_head = 'e959f08ac86c'
-    _lowerband, _upperband = revision_range.split(':')
-    if dbname == 'mssql':
-        rev_2_0_0_head = '7b2661a43ba3'
-        start_version = '2.2.0'
-    for i in [_lowerband, _upperband]:
-        try:
-            # Check if there is history between the revisions and the start revision
-            # This ensures that the revisions are above 2.0.0 head or 2.2.0 head if mssql
-            list(script_.revision_map.iterate_revisions(upper=i, lower=rev_2_0_0_head))
-        except Exception:
-            raise AirflowException(
-                f"Error while checking history for revision range {rev_2_0_0_head}:{i}. "
-                f"Check that {i} is a valid revision. "
-                f"Supported revision for offline migration is from {rev_2_0_0_head} "
-                f"which is airflow {start_version} head"
+        raise AirflowException('Offline migration not supported for SQLite.')
+    min_version, min_revision = ('2.2.0', '7b2661a43ba3') if dbname == 'mssql' else ('2.0.0', 'e959f08ac86c')
+
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    for rev in revisions:
+        if not rev:
+            raise ValueError('unexpected')
+        if not _revision_greater(config, rev, min_revision):
+            raise ValueError(
+                f"Error while checking history for revision range {min_revision}:{rev}. "
+                f"Check that {rev} is a valid revision. "
+                f"For dialect {dbname!r}, supported revision for offline migration is from {min_revision} "
+                f"which corresponds to Airflow {min_version}."
             )
 
 
 @provide_session
 def upgradedb(
-    version_range: Optional[str] = None, revision_range: Optional[str] = None, session: Session = NEW_SESSION
+    to_revision: Optional[str] = None,
+    from_revision: Optional[str] = None,
+    sql: bool = False,
+    session: Session = NEW_SESSION,
 ):
-    """Upgrade the database."""
+    """
+
+    :param to_revision: Optional Alembic revision ID to upgrade *to*.
+        If omitted, upgrades to latest revision.
+    :param from_revision: Optional Alembic revision ID to upgrade *from*.
+        Not compatible with ``sql_only=False``.
+    :param sql: if True, migration statements will be printed but not executed.
+    :param session: sqlalchemy session with connection to Airflow metadata database
+    :return: None
+    :rtype: None
+    """
+    if from_revision and not sql:
+        raise Exception("`from_revision` only supported with `sql_only=True`.")
+
     # alembic adds significant import time, so we import it lazily
     if not settings.SQL_ALCHEMY_CONN:
-        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is critical assertion.")
+        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a critical assertion.")
     from alembic import command
-    from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
-    script_ = ScriptDirectory.from_config(config)
 
-    config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
-    if version_range:
-        revision = _validate_version_range(script_, version_range)
-        if not revision:
+    if sql:
+        if not from_revision:
+            from_revision = _get_current_revision(session)
+
+        if not from_revision and to_revision:
+            raise Exception('unexpected')

Review comment:
       Should probably expand on this error message a bit more; `'unexpected'` alone does not tell the user what went wrong.

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -47,8 +53,36 @@ def resetdb(args):
 def upgradedb(args):
     """Upgrades the metadata database"""
     print("DB: " + repr(settings.engine.url))
-    db.upgradedb(version_range=args.range, revision_range=args.revision_range)
-    print("Upgrades done")
+    if args.revision and args.version:
+        raise SystemExit("Cannot supply both `revision` and `version`.")

Review comment:
       ```suggestion
           raise SystemExit("Cannot supply both `--revision` and `--version`.")
   ```
   
   To be consistent with the others?

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -47,8 +53,36 @@ def resetdb(args):
 def upgradedb(args):
     """Upgrades the metadata database"""
     print("DB: " + repr(settings.engine.url))
-    db.upgradedb(version_range=args.range, revision_range=args.revision_range)
-    print("Upgrades done")
+    if args.revision and args.version:
+        raise SystemExit("Cannot supply both `revision` and `version`.")
+    if args.from_version and args.from_revision:
+        raise SystemExit("`--from-revision` may not be combined with `--from-version`")
+    if (args.from_revision or args.from_version) and not args.sql_only:
+        raise SystemExit("Args `--from-revision` and `--from-version` may only be used with `--sql-only`")
+    revision = None
+    from_revision = None
+    if args.from_revision:
+        from_revision = args.from_revision
+    elif args.from_version:
+        if parse_version(args.from_version) < parse_version('2.0.0'):
+            raise SystemExit("From version must be greater than 2.0.0")
+        from_revision = REVISION_HEADS_MAP.get(args.from_version)
+        if not from_revision:
+            raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")
+    if args.version:

Review comment:
       ```suggestion
   
       if args.version:
   ```
   Nit: better readability with a couple extra newlines?

##########
File path: airflow/cli/commands/connection_command.py
##########
@@ -161,6 +161,7 @@ def connections_add(args):
     # Check that the conn_id and conn_uri args were passed to the command:
     missing_args = []
     invalid_args = []
+    print(args.__class__)

Review comment:
       ```suggestion
   ```
   
   Leftovers?

##########
File path: airflow/cli/commands/db_command.py
##########
@@ -17,15 +17,21 @@
 """Database sub-commands"""
 import os
 import textwrap
+import typing
 from tempfile import NamedTemporaryFile
 
+from packaging.version import parse as parse_version
+
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.utils import cli as cli_utils, db
 from airflow.utils.db import REVISION_HEADS_MAP
 from airflow.utils.db_cleanup import config_dict, run_cleanup
 from airflow.utils.process_utils import execute_interactive
 
+if typing.TYPE_CHECKING:
+    pass
+

Review comment:
       ```suggestion
   ```
   
   Leftovers?

##########
File path: airflow/utils/db.py
##########
@@ -1043,103 +1061,106 @@ def _offline_migration(migration_func: Callable, config, revision):
         logging.disable(logging.NOTSET)
 
 
-def _validate_version_range(script_, version_range):
-    if ':' not in version_range:
-        raise AirflowException(
-            'Please provide Airflow version range with the format "old_version:new_version"'
-        )
-    lower, upper = version_range.split(':')
-
-    if not REVISION_HEADS_MAP.get(lower) or not REVISION_HEADS_MAP.get(upper):
-        raise AirflowException('Please provide valid Airflow versions above 2.0.0.')
-    if REVISION_HEADS_MAP.get(lower) == REVISION_HEADS_MAP.get(upper):
-        if sys.stdout.isatty():
-            size = os.get_terminal_size().columns
-        else:
-            size = 0
-        print(f"Hey this is your migration script from {lower}, to {upper}, but guess what?".center(size))
-        print(
-            "There is no migration needed as the database has not changed between those versions. "
-            "You are done.".center(size)
-        )
-        print("""/\\_/\\""".center(size))
-        print("""(='_' )""".center(size))
-        print("""(,(") (")""".center(size))
-        print("""^^^""".center(size))
-        return
-    dbname = settings.engine.dialect.name
-    if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    elif dbname == 'mssql' and (lower != '2.2.0' or int(lower.split('.')[1]) < 2):
-        raise AirflowException(
-            'MSSQL is not supported for offline migration in Airflow versions less than 2.2.0.'
-        )
-    _lower, _upper = REVISION_HEADS_MAP[lower], REVISION_HEADS_MAP[upper]
-    revision = f"{_lower}:{_upper}"
+def print_happy_cat(message):
+    if sys.stdout.isatty():
+        size = os.get_terminal_size().columns
+    else:
+        size = 0
+    print(message.center(size))
+    print("""/\\_/\\""".center(size))
+    print("""(='_' )""".center(size))
+    print("""(,(") (")""".center(size))
+    print("""^^^""".center(size))
+    return
+
+
+def _revision_greater(config, this_rev, base_rev):
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    script = _get_script_object(config)
     try:
-        # Check if there is history between the revisions
-        list(script_.revision_map.iterate_revisions(_upper, _lower))
+        list(script.revision_map.iterate_revisions(upper=this_rev, lower=base_rev))
+        return True
     except Exception:
-        raise AirflowException(
-            f"Error while checking history for revision range {revision}. "
-            f"Check that the supplied airflow version is in the format 'old_version:new_version'."
-        )
-    return revision
+        return False
 
 
-def _validate_revision(script_, revision_range):
-    if ':' not in revision_range:
-        raise AirflowException(
-            'Please provide Airflow revision range with the format "old_revision:new_revision"'
-        )
+def _revisions_above_min_for_offline(config, revisions):
+    """
+    Checks that all supplied revision ids are above the minimum revision for the dialect.
+
+    :param config: Alembic config
+    :param revisions: list of Alembic revision ids
+    :return: None
+    :rtype: None
+    """
     dbname = settings.engine.dialect.name
     if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    start_version = '2.0.0'
-    rev_2_0_0_head = 'e959f08ac86c'
-    _lowerband, _upperband = revision_range.split(':')
-    if dbname == 'mssql':
-        rev_2_0_0_head = '7b2661a43ba3'
-        start_version = '2.2.0'
-    for i in [_lowerband, _upperband]:
-        try:
-            # Check if there is history between the revisions and the start revision
-            # This ensures that the revisions are above 2.0.0 head or 2.2.0 head if mssql
-            list(script_.revision_map.iterate_revisions(upper=i, lower=rev_2_0_0_head))
-        except Exception:
-            raise AirflowException(
-                f"Error while checking history for revision range {rev_2_0_0_head}:{i}. "
-                f"Check that {i} is a valid revision. "
-                f"Supported revision for offline migration is from {rev_2_0_0_head} "
-                f"which is airflow {start_version} head"
+        raise AirflowException('Offline migration not supported for SQLite.')
+    min_version, min_revision = ('2.2.0', '7b2661a43ba3') if dbname == 'mssql' else ('2.0.0', 'e959f08ac86c')
+
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    for rev in revisions:
+        if not rev:
+            raise ValueError('unexpected')
+        if not _revision_greater(config, rev, min_revision):
+            raise ValueError(
+                f"Error while checking history for revision range {min_revision}:{rev}. "
+                f"Check that {rev} is a valid revision. "
+                f"For dialect {dbname!r}, supported revision for offline migration is from {min_revision} "
+                f"which corresponds to Airflow {min_version}."
             )
 
 
 @provide_session
 def upgradedb(
-    version_range: Optional[str] = None, revision_range: Optional[str] = None, session: Session = NEW_SESSION
+    to_revision: Optional[str] = None,
+    from_revision: Optional[str] = None,
+    sql: bool = False,
+    session: Session = NEW_SESSION,
 ):
-    """Upgrade the database."""
+    """
+
+    :param to_revision: Optional Alembic revision ID to upgrade *to*.
+        If omitted, upgrades to latest revision.
+    :param from_revision: Optional Alembic revision ID to upgrade *from*.
+        Not compatible with ``sql_only=False``.
+    :param sql: if True, migration statements will be printed but not executed.
+    :param session: sqlalchemy session with connection to Airflow metadata database
+    :return: None
+    :rtype: None
+    """
+    if from_revision and not sql:
+        raise Exception("`from_revision` only supported with `sql_only=True`.")

Review comment:
       ```suggestion
           raise AirflowException("`from_revision` only supported with `sql_only=True`.")
   ```

##########
File path: airflow/utils/db.py
##########
@@ -1043,103 +1061,106 @@ def _offline_migration(migration_func: Callable, config, revision):
         logging.disable(logging.NOTSET)
 
 
-def _validate_version_range(script_, version_range):
-    if ':' not in version_range:
-        raise AirflowException(
-            'Please provide Airflow version range with the format "old_version:new_version"'
-        )
-    lower, upper = version_range.split(':')
-
-    if not REVISION_HEADS_MAP.get(lower) or not REVISION_HEADS_MAP.get(upper):
-        raise AirflowException('Please provide valid Airflow versions above 2.0.0.')
-    if REVISION_HEADS_MAP.get(lower) == REVISION_HEADS_MAP.get(upper):
-        if sys.stdout.isatty():
-            size = os.get_terminal_size().columns
-        else:
-            size = 0
-        print(f"Hey this is your migration script from {lower}, to {upper}, but guess what?".center(size))
-        print(
-            "There is no migration needed as the database has not changed between those versions. "
-            "You are done.".center(size)
-        )
-        print("""/\\_/\\""".center(size))
-        print("""(='_' )""".center(size))
-        print("""(,(") (")""".center(size))
-        print("""^^^""".center(size))
-        return
-    dbname = settings.engine.dialect.name
-    if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    elif dbname == 'mssql' and (lower != '2.2.0' or int(lower.split('.')[1]) < 2):
-        raise AirflowException(
-            'MSSQL is not supported for offline migration in Airflow versions less than 2.2.0.'
-        )
-    _lower, _upper = REVISION_HEADS_MAP[lower], REVISION_HEADS_MAP[upper]
-    revision = f"{_lower}:{_upper}"
+def print_happy_cat(message):
+    if sys.stdout.isatty():
+        size = os.get_terminal_size().columns
+    else:
+        size = 0
+    print(message.center(size))
+    print("""/\\_/\\""".center(size))
+    print("""(='_' )""".center(size))
+    print("""(,(") (")""".center(size))
+    print("""^^^""".center(size))
+    return
+
+
+def _revision_greater(config, this_rev, base_rev):
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    script = _get_script_object(config)
     try:
-        # Check if there is history between the revisions
-        list(script_.revision_map.iterate_revisions(_upper, _lower))
+        list(script.revision_map.iterate_revisions(upper=this_rev, lower=base_rev))
+        return True
     except Exception:
-        raise AirflowException(
-            f"Error while checking history for revision range {revision}. "
-            f"Check that the supplied airflow version is in the format 'old_version:new_version'."
-        )
-    return revision
+        return False
 
 
-def _validate_revision(script_, revision_range):
-    if ':' not in revision_range:
-        raise AirflowException(
-            'Please provide Airflow revision range with the format "old_revision:new_revision"'
-        )
+def _revisions_above_min_for_offline(config, revisions):
+    """
+    Checks that all supplied revision ids are above the minimum revision for the dialect.
+
+    :param config: Alembic config
+    :param revisions: list of Alembic revision ids
+    :return: None
+    :rtype: None
+    """
     dbname = settings.engine.dialect.name
     if dbname == 'sqlite':
-        raise AirflowException('SQLite is not supported for offline migration.')
-    start_version = '2.0.0'
-    rev_2_0_0_head = 'e959f08ac86c'
-    _lowerband, _upperband = revision_range.split(':')
-    if dbname == 'mssql':
-        rev_2_0_0_head = '7b2661a43ba3'
-        start_version = '2.2.0'
-    for i in [_lowerband, _upperband]:
-        try:
-            # Check if there is history between the revisions and the start revision
-            # This ensures that the revisions are above 2.0.0 head or 2.2.0 head if mssql
-            list(script_.revision_map.iterate_revisions(upper=i, lower=rev_2_0_0_head))
-        except Exception:
-            raise AirflowException(
-                f"Error while checking history for revision range {rev_2_0_0_head}:{i}. "
-                f"Check that {i} is a valid revision. "
-                f"Supported revision for offline migration is from {rev_2_0_0_head} "
-                f"which is airflow {start_version} head"
+        raise AirflowException('Offline migration not supported for SQLite.')
+    min_version, min_revision = ('2.2.0', '7b2661a43ba3') if dbname == 'mssql' else ('2.0.0', 'e959f08ac86c')
+
+    # Check if there is history between the revisions and the start revision
+    # This ensures that the revisions are above `min_revision`
+    for rev in revisions:
+        if not rev:
+            raise ValueError('unexpected')
+        if not _revision_greater(config, rev, min_revision):
+            raise ValueError(
+                f"Error while checking history for revision range {min_revision}:{rev}. "
+                f"Check that {rev} is a valid revision. "
+                f"For dialect {dbname!r}, supported revision for offline migration is from {min_revision} "
+                f"which corresponds to Airflow {min_version}."
             )
 
 
 @provide_session
 def upgradedb(
-    version_range: Optional[str] = None, revision_range: Optional[str] = None, session: Session = NEW_SESSION
+    to_revision: Optional[str] = None,
+    from_revision: Optional[str] = None,
+    sql: bool = False,
+    session: Session = NEW_SESSION,
 ):
-    """Upgrade the database."""
+    """
+
+    :param to_revision: Optional Alembic revision ID to upgrade *to*.
+        If omitted, upgrades to latest revision.
+    :param from_revision: Optional Alembic revision ID to upgrade *from*.
+        Not compatible with ``sql_only=False``.
+    :param sql: if True, migration statements will be printed but not executed.
+    :param session: sqlalchemy session with connection to Airflow metadata database
+    :return: None
+    :rtype: None
+    """
+    if from_revision and not sql:
+        raise Exception("`from_revision` only supported with `sql_only=True`.")
+
     # alembic adds significant import time, so we import it lazily
     if not settings.SQL_ALCHEMY_CONN:
-        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is critical assertion.")
+        raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a critical assertion.")
     from alembic import command
-    from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
-    script_ = ScriptDirectory.from_config(config)
 
-    config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
-    if version_range:
-        revision = _validate_version_range(script_, version_range)
-        if not revision:
+    if sql:
+        if not from_revision:
+            from_revision = _get_current_revision(session)
+
+        if not from_revision and to_revision:
+            raise Exception('unexpected')
+
+        if to_revision == from_revision:
+            print_happy_cat("No migrations to apply; nothing to do.")
             return
-        log.info("Running offline migrations for version range %s", version_range)
-        return _offline_migration(command.upgrade, config, revision)
-    elif revision_range:
-        _validate_revision(script_, revision_range)
-        log.info("Running offline migrations for revision range %s", revision_range)
-        return _offline_migration(command.upgrade, config, revision_range)
+
+        if not _revision_greater(config, to_revision, from_revision):
+            raise ValueError(
+                f'Requested *to* revision {to_revision} is older than *from* revision {from_revision}. '
+                'Please check your requested versions / revisions.'
+            )
+        _revisions_above_min_for_offline(config=config, revisions=[from_revision, to_revision])
+
+        _offline_migration(command.upgrade, config, f"{from_revision}:{to_revision}")
+        return

Review comment:
       ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org