You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/24 12:48:48 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in commands if necessary

potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r756031105



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,85 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    for ticker in range(timeout):
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            return
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+    raise TimeoutError(
+        f"There are still unapplied migrations after {timeout} seconds. "
+        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+    )
+
+
+def get_source_heads() -> Set:
+    """
+    Function to get the current migration head in the source code.
+
+    :return: a set of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads() -> Set:
+    """
+    Function to get the current migration head in the database.
+
+    :return: a set of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    db_command = None
+    command_name = None
+    verb = None
+    if len(db_heads) < 1:
+        db_command = initdb
+        command_name = "init"
+        verb = "initialization"
+    elif source_heads != db_heads:
+        db_command = upgradedb
+        command_name = "upgrade"
+        verb = "upgrade"
+
+    if sys.stdout.isatty() and verb:
+        print()
+        question = f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
+        try:
+            ans = helpers.prompt_with_timeout(question, timeout=4)
+            if ans:
+                try:
+                    db_command()
+                    print(f"DB {verb} done")
+                except Exception as error:
+                    print(error)
+                    print(
+                        "You still have unapplied migrations. "
+                        "You may need to reset the database by running `airflow db reset`",
+                        file=sys.stderr,
+                    )
+                    sys.exit(1)
+        except AirflowException:
+            pass
+    elif source_heads != db_heads:
+        print(
+            f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`", file=sys.stderr

Review comment:
       Following the discussion in https://github.com/apache/airflow/issues/19784 and the scenario where the Helm migration image might differ from the Airflow image, I think it would be good to add a hint about using the same version:
   
   ```suggestion
               f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`. Make sure the command is run using airflow version {version}.", file=sys.stderr
   ```
   
   (`version` should be imported above)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org