Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/22 19:30:29 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #18439: Check and run migration in webserver startup if necessary

ephraimbuddy opened a new pull request #18439:
URL: https://github.com/apache/airflow/pull/18439


   This PR proposes adding database initialization and upgrade to the webserver startup.
   
   When the webserver is started, we check the migrations and decide whether
   to initialize the database or upgrade it.
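The decision described above can be sketched as a small pure function. This is a minimal illustration, not the PR's code: the name `decide_db_action` and its return values are hypothetical. It only assumes, as the diffs later in this thread do, that migration heads are compared as sets and that an empty set of DB heads means the database was never initialized. The revision IDs in the usage lines are made up.

```python
from typing import Optional, Set


def decide_db_action(source_heads: Set[str], db_heads: Set[str]) -> Optional[str]:
    """Hypothetical helper: return the `airflow db` subcommand to run, or None.

    - No heads recorded in the DB   -> the DB was never initialized: "init"
    - Heads differ from source code -> migrations are pending: "upgrade"
    - Heads match                   -> nothing to do: None
    """
    if not db_heads:
        return "init"
    if source_heads != db_heads:
        return "upgrade"
    return None


print(decide_db_action({"rev_b"}, set()))      # init
print(decide_db_action({"rev_b"}, {"rev_a"}))  # upgrade
print(decide_db_action({"rev_b"}, {"rev_b"}))  # None
```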
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil edited a comment on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-986979416


   > Hey @kaxil - have your worries been addressed?
   
   Hey, yeah, Ephraim and I are working closely on this one. Some of the recent PRs (#20018 & #20069) were meant to unblock this PR :) 
   
   We will rebase on top of main once both of those PRs are merged.





[GitHub] [airflow] potiuk edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927114807


   This is about the 100th time (yeah, I am exaggerating a bit) I have seen this kind of problem: https://github.com/apache/airflow/issues/18513
   
   We **MUST** add it - and fast.





[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-979752863


   I'm currently hitting this issue https://github.com/apache/airflow/issues/15340





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r721334385



##########
File path: airflow/utils/cli.py
##########
@@ -89,6 +90,8 @@ def wrapper(*args, **kwargs):
         metrics = _build_metrics(f.__name__, args[0])
         cli_action_loggers.on_pre_execution(**metrics)
         try:
+            # Check and run migrations if necessary
+            check_and_run_migrations()

Review comment:
       I think the ones that should definitely NOT have the check are the `db` command and all of its subcommands. The `db` command is the one used to manage the database, so having de-synchronized migrations there is really "acceptable". You can then reset/upgrade, or open a shell to the running DB, without worrying about the synchronization state (because you use those commands to fix or inspect a problem, for example when your migration fails).
   
   I would also skip it for the "installation" inspection/helper commands (`version`, `providers`, `plugins`, `cheat-sheet`). There is a slight risk here: some of those commands could previously work without the DB, and, for example, we run them to check whether the "image" works fine without actually having a DB initialized. Those never access or modify the DB.
   
   For all the rest, I think it makes perfect sense to perform the check, as they simply need the DB at the **right** version to run. 
   
   We could introduce a new decorator (like `action_logging`), but it is better to explicitly exclude the "non-DB" commands than to add decorators to everything else. So maybe it makes more sense to rename `action_logging` to a more generic decorator (`action_cli`), add a `check_db=True` parameter, and set it to False for the `db`, `version`, `providers`, `plugins` and `cheat-sheet` commands? I think we should explicitly exclude the commands that do not need the DB migration check rather than include the check by adding a new decorator.  
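The `check_db` idea can be sketched as a decorator that works both bare and with arguments. The name `action_cli`, the `check_db` flag and the stubbed `check_and_run_migrations` below are placeholders for illustration (the stub does nothing; a real check would compare migration heads). The point is that DB-management and installation-inspection commands opt out explicitly, while every other command gets the check by default.

```python
import functools
from typing import Callable, List, Optional

# Names of commands for which the (stubbed) migration check ran; demo only.
CHECKED_COMMANDS: List[str] = []


def check_and_run_migrations() -> None:
    """Stub standing in for the real migration check; it does nothing here."""


def action_cli(func: Optional[Callable] = None, check_db: bool = True):
    """Hypothetical generic CLI decorator with an opt-out DB migration check."""

    def decorator(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            if check_db:
                CHECKED_COMMANDS.append(f.__name__)
                check_and_run_migrations()
            return f(*args, **kwargs)

        return wrapper

    # Used bare (@action_cli): func is the decorated function itself.
    if func is not None:
        return decorator(func)
    # Used with arguments (@action_cli(check_db=False)): return the decorator.
    return decorator


@action_cli
def scheduler(args):
    return "scheduler started"


@action_cli(check_db=False)
def db_upgrade(args):
    return "db upgraded"


@action_cli(check_db=False)
def version(args):
    return "version printed"
```

Calling `scheduler(args)` runs the check first, while `db_upgrade(args)` and `version(args)` skip it. Defaulting `check_db` to `True` makes forgetting the flag fail safe: a new command gets the check unless someone deliberately opts it out.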
   
   







[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r721300335



##########
File path: airflow/utils/cli.py
##########
@@ -89,6 +90,8 @@ def wrapper(*args, **kwargs):
         metrics = _build_metrics(f.__name__, args[0])
         cli_action_loggers.on_pre_execution(**metrics)
         try:
+            # Check and run migrations if necessary
+            check_and_run_migrations()

Review comment:
       We should avoid this check when running "db upgrade" :) @uranusjr mentioned that at the beginning of the conversation, but now that it is part of the "action_logging" decorator, it's more important than ever :)







[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927221077


   > One question: Would there be race conditions if multiple `airflow` commands are launched concurrently? For example, `./breeze start-airflow` launches a tmux session with five commands together, and both the scheduler and webserver would want to do the migration at the same time.
   
   We now have exclusive locks for migrations in most backends. (For MSSQL the lock needs to be re-added, as it was removed; it should be possible to bring it back now that I have discovered where the lock contention came from. I will do it shortly after this one is merged.)
   
   But I think it's a good point: with this change, the lock should be put around the whole check rather than inside the `upgradedb` command.
   
   ```
       with create_global_lock(session=session, pg_lock_id=2, lock_name="upgrade"):
           log.info("Creating tables")
           command.upgrade(config, 'heads')
       add_default_pool_if_not_exists()
   ```
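Putting the lock around the whole check means the "are migrations pending?" read and the upgrade happen atomically, so only one of several concurrently started commands actually migrates. The sketch below is a single-process illustration under stated assumptions: a `threading.Lock` stands in for the database-level lock (`create_global_lock` in the snippet above), and `FakeDB`, `SOURCE_HEADS` and `locked_check_and_upgrade` are hypothetical. A real deployment needs the lock held in the database itself, since the scheduler and webserver are separate processes.

```python
import threading
from contextlib import contextmanager
from typing import Iterator, List, Set

SOURCE_HEADS: Set[str] = {"rev_b"}

# Stand-in for a database-level lock; only works within a single process.
_migration_lock = threading.Lock()


@contextmanager
def global_migration_lock() -> Iterator[None]:
    with _migration_lock:
        yield


class FakeDB:
    """Hypothetical in-memory 'database' that records applied migration heads."""

    def __init__(self) -> None:
        self.heads: Set[str] = set()
        self.upgrades_run_by: List[str] = []


def locked_check_and_upgrade(db: FakeDB, worker: str) -> None:
    # Check *and* upgrade under the same lock: if only the upgrade were locked,
    # several workers could all observe stale heads and each decide to upgrade.
    with global_migration_lock():
        if db.heads != SOURCE_HEADS:
            db.upgrades_run_by.append(worker)
            db.heads = set(SOURCE_HEADS)


db = FakeDB()
workers = [
    threading.Thread(target=locked_check_and_upgrade, args=(db, f"worker-{i}"))
    for i in range(5)
]
for t in workers:
    t.start()
for t in workers:
    t.join()
print(len(db.upgrades_run_by))  # 1
```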





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r724011850



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,88 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)

Review comment:
       `never use while loops` :) ?
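A `for` loop over a fixed range bounds the number of polls by construction. The sketch below is hypothetical: the name `wait_for_migrations` and the injectable `get_source_heads`, `get_db_heads` and `sleep` parameters are assumptions made so it runs without a database. It also initializes both head sets before the loop, so that `timeout=0` still performs one check and raises a clean `TimeoutError` instead of hitting unbound variables in the error message.

```python
import time
from typing import Callable, Set


def wait_for_migrations(
    get_source_heads: Callable[[], Set[str]],
    get_db_heads: Callable[[], Set[str]],
    timeout: int,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll about once per second, for at most `timeout` seconds, until heads match."""
    source_heads: Set[str] = set()
    db_heads: Set[str] = set()
    # timeout + 1 checks: one immediately, then one after each 1-second sleep.
    for waited in range(timeout + 1):
        source_heads = get_source_heads()
        db_heads = get_db_heads()
        if source_heads == db_heads:
            return
        if waited < timeout:
            sleep(1)
    raise TimeoutError(
        f"There are still unapplied migrations after {timeout} seconds. "
        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
    )
```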







[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r756031105



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,85 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    for ticker in range(timeout):
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            return
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+    raise TimeoutError(
+        f"There are still unapplied migrations after {timeout} seconds. "
+        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+    )
+
+
+def get_source_heads() -> Set:
+    """
+    Function to get the current migration head in the source code.
+
+    :return: a set of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads() -> Set:
+    """
+    Function to get the current migration head in the database.
+
+    :return: a set of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    db_command = None
+    command_name = None
+    verb = None
+    if len(db_heads) < 1:
+        db_command = initdb
+        command_name = "init"
+        verb = "initialization"
+    elif source_heads != db_heads:
+        db_command = upgradedb
+        command_name = "upgrade"
+        verb = "upgrade"
+
+    if sys.stdout.isatty() and verb:
+        print()
+        question = f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
+        try:
+            ans = helpers.prompt_with_timeout(question, timeout=4)
+            if ans:
+                try:
+                    db_command()
+                    print(f"DB {verb} done")
+                except Exception as error:
+                    print(error)
+                    print(
+                        "You still have unapplied migrations. "
+                        "You may need to reset the database by running `airflow db reset`",
+                        file=sys.stderr,
+                    )
+                    sys.exit(1)
+        except AirflowException:
+            pass
+    elif source_heads != db_heads:
+        print(
+            f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`", file=sys.stderr

Review comment:
       Following the discussion in https://github.com/apache/airflow/issues/19784 and the scenario where the Helm migration image might differ from the Airflow image, I think it would be good to add a hint about using the same version:
   
   ```suggestion
               f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`. Make sure the command is run using airflow version {version}.", file=sys.stderr
   ```
   
   (`version` should be imported above) 
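The suggested message can be built with the running version interpolated, so operators immediately see which Airflow version the migration command must match. A minimal sketch: the function name `report_pending_db_command` is hypothetical, and `version` is passed in as a parameter here; in Airflow it would come from an import such as `from airflow.version import version` (assumed).

```python
import sys
from typing import TextIO


def report_pending_db_command(
    command_name: str, verb: str, version: str, stream: TextIO = sys.stderr
) -> str:
    """Hypothetical helper: print (and return) the error with a version hint."""
    message = (
        f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`. "
        f"Make sure the command is run using airflow version {version}."
    )
    print(message, file=stream)
    return message
```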







[GitHub] [airflow] kaxil merged pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #18439:
URL: https://github.com/apache/airflow/pull/18439


   





[GitHub] [airflow] github-actions[bot] commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-988956834


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927219091


   One question: Would there be race conditions if multiple `airflow` commands are launched concurrently? For example, `./breeze start-airflow` launches a tmux session with five commands together, and both the scheduler and webserver would want to do the migration at the same time.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r721312887



##########
File path: airflow/utils/cli.py
##########
@@ -89,6 +90,8 @@ def wrapper(*args, **kwargs):
         metrics = _build_metrics(f.__name__, args[0])
         cli_action_loggers.on_pre_execution(**metrics)
         try:
+            # Check and run migrations if necessary
+            check_and_run_migrations()

Review comment:
       OK. Is it OK to have it only in the webserver, triggerer, scheduler, and celery components?







[GitHub] [airflow] kaxil commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925738714


   >There is one potential problem - the migration lock is not implemented for MsSQL.
   
   It has not been addressed in this PR, which makes MSSQL support wonky.







[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-977849151


   @kaxil? I think it would be great to add it (possibly even in 2.2.3, if the cherry-pick is clean).





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716075733



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +627,74 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations(source_heads, db_heads):
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 0:
+        if sys.stdout.isatty():
+            # initialize database
+            print("DB: " + repr(settings.engine.url))
+            initdb()
+            print("DB Initialization done")
+        else:
+            rich_print("[red][bold]ERROR:[/bold] You need to initialize the database")

Review comment:
       Both, I think. There is a chance that running Airflow on a non-migrated DB might cause some data corruption, so I think it is perfectly fine to exit immediately if the DB is in the wrong state (even with `sys.exit()` rather than raising exceptions). 
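The fail-fast behaviour can be sketched as a guard that exits with a non-zero status instead of letting the command run against a mismatched schema. An emptiness test such as `not db_heads` is what detects an uninitialized DB: a condition like `len(db_heads) < 0` can never be true, since a length is never negative. The name `exit_if_db_not_ready` is hypothetical.

```python
import sys
from typing import Set


def exit_if_db_not_ready(source_heads: Set[str], db_heads: Set[str]) -> None:
    """Hypothetical guard: stop the process if the DB schema does not match the code."""
    if not db_heads:  # no migrations recorded at all: the DB was never initialized
        print("ERROR: You need to initialize the database. Please run `airflow db init`", file=sys.stderr)
        sys.exit(1)
    if source_heads != db_heads:  # pending migrations
        print("ERROR: You need to upgrade the database. Please run `airflow db upgrade`", file=sys.stderr)
        sys.exit(1)
```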







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716051299



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +627,74 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations(source_heads, db_heads):
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 0:
+        if sys.stdout.isatty():
+            # initialize database
+            print("DB: " + repr(settings.engine.url))
+            initdb()
+            print("DB Initialization done")
+        else:
+            rich_print("[red][bold]ERROR:[/bold] You need to initialize the database")

Review comment:
       I think printing an error is better than erroring out?
   
   I have yet to add a prompt asking the user whether to run the migration; I will add it later.







[GitHub] [airflow] uranusjr commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r765079267



##########
File path: airflow/cli/commands/celery_command.py
##########
@@ -37,7 +37,7 @@
 WORKER_PROCESS_NAME = "worker"
 
 
-@cli_utils.action_logging
+@cli_utils.action_cli()

Review comment:
       FWIW I personally like to do this because I hate the _first argument is the function, unless you pass a flag and the decorator becomes a function call_ complexity, and prefer to always just implement the callable-returning-decorator case.
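A minimal sketch of the always-call-it style, for illustration only. `action_cli` below is a hypothetical stand-in, not Airflow's real decorator in `cli_utils`, and the `check_db` option is an assumption. Because the factory is always called, `@action_cli()` and `@action_cli(check_db=False)` go through the same code path. There is no need to check whether the first argument is the decorated function.

```python
import functools
from typing import Any, Callable, List, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

# Records which commands triggered a (simulated) DB check, for illustration only.
DB_CHECKS: List[str] = []


def action_cli(check_db: bool = True) -> Callable[[F], F]:
    """Hypothetical decorator factory: always called, even with no options."""

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if check_db:
                # A real implementation would compare migration heads here.
                DB_CHECKS.append(func.__name__)
            return func(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator


@action_cli()  # empty parens: one code path whether or not options are passed
def worker(args: str) -> str:
    return f"worker started with {args}"


@action_cli(check_db=False)  # opting out uses exactly the same mechanism
def version(args: str) -> str:
    return "version printed"
```

The cost is the empty parentheses at every call site. The benefit is that the decorator never has to guess how it was invoked.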







[GitHub] [airflow] ashb edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925826422


   I feel uneasy about automatically running migrations for non-automated approaches.
   
   What I propose we do is:
   
   - When connected to a TTY: prompt the user whether they want to run any outstanding migrations
   
     > There are outstanding db migrations. Do you wish to run them now N/y?
   
   - When not connected to a TTY (e.g. a daemon or a detached container): just error.
   
   That feels much safer than automatically running a potentially long migration when the user isn't expecting it, doubly so in the case of MySQL, where DDL etc. is non-transactional, so the process can't even be safely interrupted!
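The proposed policy can be sketched as a small decision function. The sketch assumes the caller passes in whether a migration is pending, whether stdout is a TTY, and a callable that asks the user. `decide_migration_action` and its parameters are hypothetical names, not Airflow code.

```python
from typing import Callable


def decide_migration_action(pending: bool, is_tty: bool, ask: Callable[[str], str]) -> str:
    """Return "none", "run", "skip" or "error" for the TTY policy proposed above."""
    if not pending:
        return "none"  # DB already matches the code
    if not is_tty:
        return "error"  # daemon or detached container: never migrate implicitly
    answer = ask("There are outstanding db migrations. Do you wish to run them now N/y? ")
    # "N/y" means No is the default: only an explicit yes runs the migrations.
    return "run" if answer.strip().lower() in ("y", "yes") else "skip"
```

In a real command, `is_tty` would typically come from `sys.stdout.isatty()`, and `ask` could be the built-in `input`. Injecting them keeps the policy testable without a terminal.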





[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925262312


   Hi @potiuk, instead of failing the command (https://github.com/apache/airflow/issues/18323), I'm proposing to check and run the migration in the webserver, but I'm not sure if it's the right approach.
   
   I know that the Helm chart runs a separate container for migrations, and I would update it if this approach is accepted.





[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927796920


   > > One question: Would there be race conditions if multiple `airflow` commands are launched concurrently? i.e. `./breeze start-airflow` launches a tmux session and five commands together, and both the scheduler and webserver would want to do migration at the same time.
   > 
   > We now have exclusive locks for migration in most backends (MSSQL needs to be re-added as it has been removed, but it should be possible to bring it back now that I have discovered where the lock contention came from; I will do it shortly after this one is merged).
   > 
   > But I think it's a good point - the lock should be put around the whole check with this change rather than inside the upgradedb command.
   > 
   > ```
   >     with create_global_lock(session=session, pg_lock_id=2, lock_name="upgrade"):
   >         log.info("Creating tables")
   >         command.upgrade(config, 'heads')
   >     add_default_pool_if_not_exists()
   > ```
   
   Hi @potiuk, I'm a bit confused about what to do here. Could you explain more?
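One reading of the suggestion is that both the *check* and the *upgrade* should happen while the lock is held. Then several components starting at once cannot all decide that an upgrade is needed. The sketch below uses a `threading.Lock` as a stand-in for the database-level lock that `create_global_lock` provides. `FakeDb` and `check_and_upgrade` are hypothetical. A process-local lock does not protect separate processes, which is why the real code needs a DB lock.

```python
import threading
from typing import List

# Stand-in for a database-level lock; only protects threads in this one process.
_migration_lock = threading.Lock()


class FakeDb:
    """Hypothetical in-memory DB that records its revision and upgrades applied."""

    def __init__(self, revision: str) -> None:
        self.revision = revision
        self.upgrades: List[str] = []

    def upgrade(self, target: str) -> None:
        self.upgrades.append(target)
        self.revision = target


def check_and_upgrade(db: FakeDb, source_head: str) -> None:
    # Check and upgrade under the same lock: the second caller sees the
    # already-upgraded revision and does nothing.
    with _migration_lock:
        if db.revision != source_head:
            db.upgrade(source_head)


db = FakeDb(revision="old")
threads = [threading.Thread(target=check_and_upgrade, args=(db, "new")) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

If the lock only wrapped the upgrade step, two callers could both pass the check before either acquired the lock, and the upgrade would then run twice.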





[GitHub] [airflow] ashb commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r764880713



##########
File path: airflow/cli/commands/celery_command.py
##########
@@ -37,7 +37,7 @@
 WORKER_PROCESS_NAME = "worker"
 
 
-@cli_utils.action_logging
+@cli_utils.action_cli()

Review comment:
       Is there a reason why we've got the empty parens on all these?







[GitHub] [airflow] ephraimbuddy edited a comment on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-987643394


   Because we use init containers to run the migration, this change isn't hit when using the Airflow chart; it only gets hit with a chart that has no migration container.
   
   If we don't raise TimeoutError in the init containers, then this change is hit. However, it just pauses for a few minutes to show the error message, and then the component continues running.
   
   ~I think the migration container should not exit on error; we should have a way to restart it when it fails.~
   





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r756031105



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,85 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    for ticker in range(timeout):
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            return
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+    raise TimeoutError(
+        f"There are still unapplied migrations after {timeout} seconds. "
+        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+    )
+
+
+def get_source_heads() -> Set:
+    """
+    Function to get the current migration head in the source code.
+
+    :return: a set of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads() -> Set:
+    """
+    Function to get the current migration head in the database.
+
+    :return: a set of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    db_command = None
+    command_name = None
+    verb = None
+    if len(db_heads) < 1:
+        db_command = initdb
+        command_name = "init"
+        verb = "initialization"
+    elif source_heads != db_heads:
+        db_command = upgradedb
+        command_name = "upgrade"
+        verb = "upgrade"
+
+    if sys.stdout.isatty() and verb:
+        print()
+        question = f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
+        try:
+            ans = helpers.prompt_with_timeout(question, timeout=4)
+            if ans:
+                try:
+                    db_command()
+                    print(f"DB {verb} done")
+                except Exception as error:
+                    print(error)
+                    print(
+                        "You still have unapplied migrations. "
+                        "You may need to reset the database by running `airflow db reset`",
+                        file=sys.stderr,
+                    )
+                    sys.exit(1)
+        except AirflowException:
+            pass
+    elif source_heads != db_heads:
+        print(
+            f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`", file=sys.stderr

Review comment:
       Following the discussion in https://github.com/apache/airflow/issues/19784, and the scenario where the Helm migration image might differ from the Airflow image, I think it would be good to add a hint about using the same Airflow version:
   
   ```suggestion
               f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`. Make sure the command is run using airflow version {version}.", file=sys.stderr
   ```
   
   (version should be imported above) 
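A small sketch of the suggested hint. The version is passed in as a parameter instead of being imported, so the example is self-contained; in Airflow it would come from the package's own version, as the comment says. The function names are hypothetical. The sketch expects the verb form ("initialize", "upgrade") so that the sentence reads correctly for both commands.

```python
import sys


def migration_error_message(verb: str, command_name: str, version: str) -> str:
    """Build the error text with the suggested same-version hint."""
    return (
        f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`. "
        f"Make sure the command is run using Airflow version {version}."
    )


def exit_with_migration_error(verb: str, command_name: str, version: str) -> None:
    # Print to stderr and stop with a non-zero exit status.
    print(migration_error_message(verb, command_name, version), file=sys.stderr)
    sys.exit(1)
```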







[GitHub] [airflow] kaxil commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-977897019


   Can you rebase on the `main` branch, please?





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-940456387


   > @kaxil Can you take another look at this one?
   
   Yeah, looks ready to merge.





[GitHub] [airflow] uranusjr commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r723779104



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,88 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)

Review comment:
       Maybe?
   
   ```suggestion
       for ticker in range(timeout):
           source_heads = get_source_heads()
           db_heads = get_db_heads()
           if source_heads == db_heads:
               return
           time.sleep(1)
           log.info('Waiting for migrations... %s second(s)', ticker)
       raise TimeoutError(
           f"There are still unapplied migrations after {timeout} seconds. "
           f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
       )
   ```
   
   (This emits one extra *Waiting for migrations* line; we can eliminate that with an additional `if` check but I don't feel it's worthwhile.)
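A variant of the suggested loop, with the head getters and `sleep` injected so it can be exercised without a database. These parameters are assumptions, not the signature in `airflow/utils/db.py`. The loop checks `timeout + 1` times and skips the sleep and log after the final check, which removes the extra *Waiting for migrations* line. The loop body also always runs at least once. As a result, the final error message never refers to unassigned variables when `timeout` is 0.

```python
import logging
import time
from typing import Callable, Set

log = logging.getLogger(__name__)


def wait_for_migrations(
    timeout: int,
    get_source_heads: Callable[[], Set[str]],
    get_db_heads: Callable[[], Set[str]],
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll until DB heads match source heads, sleeping 1s between checks."""
    for ticker in range(timeout + 1):
        source_heads = get_source_heads()
        db_heads = get_db_heads()
        if source_heads == db_heads:
            return
        if ticker < timeout:  # no sleep/log after the final check
            sleep(1)
            log.info("Waiting for migrations... %s second(s)", ticker + 1)
    raise TimeoutError(
        f"There are still unapplied migrations after {timeout} seconds. "
        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
    )
```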

##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,88 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)

Review comment:
       But the return value is actually a set? :p







[GitHub] [airflow] uranusjr commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925330509


   If we do this, a knob to override the behaviour would be even more necessary. Also, the logic needs to handle cases where the current DB state is *in the future*, e.g. I migrate to the latest version, `git checkout 2.1.0`, and run `./breeze start-airflow`.
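One way to make the "DB in the future" case explicit, sketched under a simplifying assumption: the code's migration history is a single linear list of revision IDs. Real Alembic histories can branch and merge. When an older checkout looks at a DB migrated by newer code, the DB's revision is simply not in its history. The sketch therefore reports that DB as unknown, and a caller should refuse to auto-migrate it. `classify_db_state` and `DbState` are hypothetical names.

```python
from enum import Enum
from typing import AbstractSet, Sequence


class DbState(Enum):
    UNINITIALIZED = "uninitialized"
    UP_TO_DATE = "up_to_date"
    BEHIND = "behind"    # known older revision: an upgrade is possible
    UNKNOWN = "unknown"  # not in this code's history, e.g. migrated by newer code


def classify_db_state(history: Sequence[str], db_heads: AbstractSet[str]) -> DbState:
    """Classify the DB against a linear revision history, oldest first.

    Simplifying assumption: no branches, so the code's single head is history[-1].
    """
    if not db_heads:
        return DbState.UNINITIALIZED
    if set(db_heads) == {history[-1]}:
        return DbState.UP_TO_DATE
    if len(db_heads) == 1 and next(iter(db_heads)) in history:
        return DbState.BEHIND
    return DbState.UNKNOWN
```

Only `UNINITIALIZED` and `BEHIND` would be candidates for a prompted migration. `UNKNOWN` should stop with an error.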





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925330747


   I like this; however, I'd rather do it in the scheduler.
   
   I think the webserver is not the only/exclusive component that should get it. The webserver likely won't break anything even if it runs against an old DB version (but the scheduler can). And since we have locks for upgrades, we could run the upgrade for ALL components that need the DB and get rid of the special "wait for migration" init container.





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925805769


   > Please wait on this change. This is not needed for 2.2.0 and is a significant change on how migrations are handled. Let's think it through.
   
   Absolutely!
   
   > I would also love to hear more thoughts and opinions
   
   Me too :)





[GitHub] [airflow] ashb commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716502072



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,102 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 1:
+        if sys.stdout.isatty():
+            print()
+            question = (
+                "Please confirm database initialization " "(or wait 4 seconds to skip it). Are you sure?"
+            )
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # initialize database
+                    print("DB: " + repr(settings.engine.url))
+                    initdb()
+                    print("DB Initialization done")
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to initialize the database. "
+                "Run `airflow db init` to initialize the database"
+            )
+            sys.exit(1)
+    elif source_heads != db_heads:
+        if sys.stdout.isatty():
+            print()
+            question = "Please confirm database upgrade" " (or wait 4 seconds to skip it). Are you sure?"
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # upgrade database
+                    try:
+                        print("DB: " + repr(settings.engine.url))
+                        upgradedb()
+                        print("DB Upgrade done")
+                    except Exception as e:
+                        print(e)
+                        rich_print(
+                            "[red][bold]ERROR:[/bold] You still have unapplied migrations. "
+                            "You may need to reset the database by running `airflow db reset`"
+                        )
+                        sys.exit(1)
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to upgrade the database. "
+                "Run `airflow db upgrade` to upgrade"
+            )
+            sys.exit(1)

Review comment:
       We can reduce the duplication in these blocks a lot by doing something like this
   
   ```suggestion
   def check_and_run_migrations():
       """Check and run migrations if necessary. Only use in a tty"""
       from rich import print as rich_print
   
       source_heads = get_source_heads()
       db_heads = get_db_heads()
       if len(db_heads) < 1:
           db_command = initdb
           command_name = "init"
           verb = "initialize"
       else:
           db_command = upgradedb
           command_name = "upgrade"
           verb = "upgrade"
           
       if sys.stdout.isatty():
           print()
           question = (
               f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
           )
           ans = helpers.prompt_with_timeout(question, timeout=4)
           if ans:
               # initialize database
               print("DB: " + repr(settings.engine.url))
               db_command()
               print(f"DB {verb} done")
       else:
           rich_print(
               f"[red][bold]ERROR:[/bold] You need to {verb} the database. Please run `airflow db {command_name}`"
           )
           sys.exit(1)
   
   ```
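This sketch performs the same de-duplication and also keeps an explicit "nothing to do" case. With that case, an up-to-date DB produces neither a prompt nor an error. `choose_db_action` and `DbAction` are hypothetical names. `initdb` and `upgradedb` are passed in as callables rather than imported.

```python
from typing import AbstractSet, Callable, NamedTuple, Optional


class DbAction(NamedTuple):
    command: Callable[[], None]
    command_name: str  # used in `airflow db <command_name>`
    verb: str          # used in user-facing messages


def choose_db_action(
    source_heads: AbstractSet[str],
    db_heads: AbstractSet[str],
    initdb: Callable[[], None],
    upgradedb: Callable[[], None],
) -> Optional[DbAction]:
    """Pick init or upgrade, or None when the DB already matches the code."""
    if not db_heads:
        return DbAction(initdb, "init", "initialize")
    if source_heads != db_heads:
        return DbAction(upgradedb, "upgrade", "upgrade")
    return None  # up to date: no prompt, no error
```

The prompting and error branches can then read `command`, `command_name` and `verb` from a single object, leaving one code path for both commands.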







[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925375399


   The more I think about it, the more I like the idea of always running the migration at the start of every component. I think for quite a while we neglected the fact that the DB structure can get desynchronized from the code, but I cannot imagine ANY situation, including development, where the DB should be at a different version than the latest Airflow migration. I cannot think of a single case where allowing desynchronization would be better. This is "DB as code" to the fullest.
   
   The only edge case is, of course, when you are working on a new migration (which is actually extremely rare). But even then, if you implement, run/rerun and fix the migration, you would rather test just the migrations, not Airflow, and to iterate on it you would use the alembic command line rather than the Airflow worker/scheduler/webserver. And even there, it's extremely easy to add ``--skip-db-migrations`` to all the Airflow components.
   
   There is one potential problem: the migration lock is not implemented for MSSQL.
   
   But assuming that is done, is there any reason we do NOT want to run migrations for all components when they start? Any risks involved in doing so? I thought hard about it and I cannot think of a single reason why we would not want to do it.
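The ``--skip-db-migrations`` opt-out floated above could be wired up with `argparse` roughly as follows. This is a hypothetical sketch: the flag is only a proposal in this discussion, and the parser, its `prog` name and the `should_migrate` helper are illustrative, not part of Airflow's CLI.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="airflow-component")
    # Hypothetical opt-out flag, as proposed in the comment above.
    parser.add_argument(
        "--skip-db-migrations",
        action="store_true",
        help="Start without checking or running database migrations.",
    )
    return parser


def should_migrate(argv: Optional[List[str]] = None) -> bool:
    args = build_parser().parse_args(argv)
    # argparse converts dashes to underscores in the attribute name.
    return not args.skip_db_migrations
```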





[GitHub] [airflow] kaxil commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-980206513


   > I'm currently hitting this issue #15340
   
   ```
   TimeoutError: There are still unapplied migrations after 60 seconds.
   ```
   
   just says that the migrations didn't run! Now we need to find out why they didn't run.





[GitHub] [airflow] uranusjr commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-940454608


   @kaxil Can you take another look at this one?





[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-987643394


   Because we use init containers to run the migration, this change doesn't get hit when using the Airflow chart; it only gets hit in a chart without a migration container.
   
   If we don't raise TimeoutError in the init containers, then this change is hit. However, it just pauses for a few minutes to show the error message, and then the component continues running.
   
   I think the migration container should not exit on error; we should have a way to restart it when it fails.
   





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r756031105



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,85 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    for ticker in range(timeout):
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            return
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+    raise TimeoutError(
+        f"There are still unapplied migrations after {timeout} seconds. "
+        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+    )
+
+
+def get_source_heads() -> Set:
+    """
+    Function to get the current migration head in the source code.
+
+    :return: a set of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads() -> Set:
+    """
+    Function to get the current migration head in the database.
+
+    :return: a set of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    db_command = None
+    command_name = None
+    verb = None
+    if len(db_heads) < 1:
+        db_command = initdb
+        command_name = "init"
+        verb = "initialization"
+    elif source_heads != db_heads:
+        db_command = upgradedb
+        command_name = "upgrade"
+        verb = "upgrade"
+
+    if sys.stdout.isatty() and verb:
+        print()
+        question = f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
+        try:
+            ans = helpers.prompt_with_timeout(question, timeout=4)
+            if ans:
+                try:
+                    db_command()
+                    print(f"DB {verb} done")
+                except Exception as error:
+                    print(error)
+                    print(
+                        "You still have unapplied migrations. "
+                        "You may need to reset the database by running `airflow db reset`",
+                        file=sys.stderr,
+                    )
+                    sys.exit(1)
+        except AirflowException:
+            pass
+    elif source_heads != db_heads:
+        print(
+            f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`", file=sys.stderr

Review comment:
       Following the discussion in https://github.com/apache/airflow/issues/19784 and the scenario where the Helm migration image might differ from the Airflow image, I think it would be good to add a hint about using the same version:
   
   ```suggestion
               f"ERROR: You need to {verb} the database. Please run `airflow db {command_name}`. Make sure the command is run using airflow version {version}.", file=sys.stderr
   ```
   
   (version should be imported above) 







[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716076101



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +627,74 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations(source_heads, db_heads):
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 0:
+        if sys.stdout.isatty():
+            # initialize database
+            print("DB: " + repr(settings.engine.url))
+            initdb()
+            print("DB Initialization done")
+        else:
+            rich_print("[red][bold]ERROR:[/bold] You need to initialize the database")

Review comment:
       I am a long-time fan of hard exits when we find a serious inconsistency. This not only prevents data corruption but is also much cleaner and communicates the problem better, because you immediately see what the problem is (the last log entry). You also prevent cases like endless loops. Those are the worst kind of problems, where you think your program is running but it is just hanging. When in doubt about whether you are in a consistent state, exit immediately. This is the only way to make sure it WILL be noticed and acted on.
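The fail-fast behaviour described above might look roughly like this. A minimal sketch under stated assumptions: the head sets are already computed, `ensure_db_in_sync` is a hypothetical name, and `version` is passed in rather than imported from Airflow:

```python
import sys


def ensure_db_in_sync(source_heads: set, db_heads: set, version: str) -> None:
    """Exit immediately if the DB schema does not match the code (hypothetical helper)."""
    if source_heads == db_heads:
        return
    command = "init" if not db_heads else "upgrade"
    # Writing to stderr and exiting non-zero makes the problem the last log line,
    # instead of letting the process keep running against the wrong schema.
    print(
        f"ERROR: You need to {command} the database. Please run `airflow db {command}` "
        f"using Airflow version {version}.",
        file=sys.stderr,
    )
    sys.exit(1)
```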







[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927115905


   > This is about a 100th time (yeah I am exaggerating a bit) of similar problem I see over and over: #18513
   > 
   > We **MUST** add it - and fast.
   
   Yeah. I'm working on making it error out and also prompt to run the migrations with a timeout 😊
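Prompting with a timeout can be sketched with a reader thread. This is a minimal, hypothetical stand-in (the PR itself calls `helpers.prompt_with_timeout`, whose implementation is not shown in this thread); the `[y/N]` convention and the thread-based approach are assumptions:

```python
import sys
import threading
from typing import List, Optional, TextIO


def prompt_with_timeout(question: str, timeout: float, stream: Optional[TextIO] = None) -> bool:
    """Ask a yes/no question; return False if no answer arrives within `timeout` seconds."""
    stream = stream if stream is not None else sys.stdin
    answer: List[str] = []

    def _read() -> None:
        answer.append(stream.readline())

    # A daemon thread, so a user who never answers does not block interpreter shutdown.
    reader = threading.Thread(target=_read, daemon=True)
    print(f"{question} [y/N] ", end="", flush=True)
    reader.start()
    reader.join(timeout)
    if not answer:
        return False  # timed out: treat as "skip"
    return answer[0].strip().lower() in ("y", "yes")
```

One caveat of this approach: after a timeout the reader thread is still blocked on `readline()` and will consume the next line typed, so a `select`-based read may be preferable on POSIX terminals.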





[GitHub] [airflow] uranusjr commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r723779104



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,88 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)

Review comment:
       Maybe?
   
   ```suggestion
       for ticker in range(timeout):
           source_heads = get_source_heads()
           db_heads = get_db_heads()
           if source_heads == db_heads:
               return
           time.sleep(1)
           log.info('Waiting for migrations... %s second(s)', ticker)
       raise TimeoutError(
           f"There are still unapplied migrations after {timeout} seconds. "
           f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
       )
   ```
   
   (This emits one extra *Waiting for migrations* line; we can eliminate that with an additional `if` check but I don't feel it's worthwhile.)
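For comparison, a variant of the suggested `for` loop that keeps the original `while` loop's behaviour (one final check at the deadline, no trailing *Waiting for migrations* line) might look like this. A sketch only: passing the head getters and `sleep` as parameters is an assumption made so the loop can be exercised without a database:

```python
import logging
import time
from typing import Callable, Set

log = logging.getLogger(__name__)


def wait_for_migrations(
    get_source_heads: Callable[[], Set[str]],
    get_db_heads: Callable[[], Set[str]],
    timeout: int,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Check once per second until the DB heads match the source heads, or raise."""
    if timeout < 0:
        raise ValueError("timeout must be >= 0")
    for ticker in range(timeout + 1):
        source_heads = get_source_heads()
        db_heads = get_db_heads()
        if source_heads == db_heads:
            return
        if ticker == timeout:
            break  # final check at the deadline failed: no extra sleep or log line
        sleep(1)
        log.info("Waiting for migrations... %s second(s)", ticker + 1)
    raise TimeoutError(
        f"There are still unapplied migrations after {timeout} seconds. "
        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
    )
```

This performs `timeout + 1` checks and logs exactly `timeout` waiting lines before raising, at the cost of one extra `if` in the loop body.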







[GitHub] [airflow] ashb commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925826422


   I feel uneasy about automatically running migrations for non-automated approaches.
   
   What I propose we do is:
   
   - When connected to a TTY: if there are any outstanding migrations, prompt the user whether they want to run them.
   - When not connected to a TTY (e.g. a daemon or a non-attached container): just error out.
   
   That feels much safer than automatically running a potentially long migration when the user isn't expecting it; doubly so in the case of MySQL, where DDL etc. is non-transactional, so the process can't even be safely interrupted!
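The proposed TTY/non-TTY split can be sketched as a small decision function. A minimal sketch, assuming hypothetical `run_migrations` and `confirm` callables supplied by the caller; `handle_pending_migrations` is not an Airflow API. Declining the prompt also exits here, which is the fail-fast choice discussed elsewhere in this thread:

```python
import sys
from typing import Callable, Optional, TextIO


def handle_pending_migrations(
    pending: bool,
    run_migrations: Callable[[], None],
    confirm: Callable[[str], bool],
    stdout: Optional[TextIO] = None,
) -> None:
    """Prompt on a TTY, fail fast otherwise (hypothetical helper)."""
    if not pending:
        return
    stdout = stdout if stdout is not None else sys.stdout
    if stdout.isatty() and confirm("There are unapplied migrations. Run them now?"):
        run_migrations()
        return
    # Daemon / detached container, or the user declined: never migrate silently.
    print("ERROR: unapplied migrations; run `airflow db upgrade` first.", file=sys.stderr)
    sys.exit(1)
```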





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716616556



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,89 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    db_command = None
+    command_name = None
+    verb = None
+    if len(db_heads) < 1:
+        db_command = initdb
+        command_name = "init"
+        verb = "initialization"
+    elif source_heads != db_heads:
+        print("UPGRADE")

Review comment:
       ```suggestion
   ```







[GitHub] [airflow] kaxil edited a comment on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-977897019


   Can you rebase on the `main` branch, please? This will have to go into 2.3.0, though.








[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-987859666


   Cool





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r764889409



##########
File path: airflow/utils/cli.py
##########
@@ -53,51 +54,57 @@ def _check_cli_args(args):
         )
 
 
-def action_logging(f: T) -> T:
-    """
-    Decorates function to execute function at the same time submitting action_logging
-    but in CLI context. It will call action logger callbacks twice,
-    one for pre-execution and the other one for post-execution.
-
-    Action logger will be called with below keyword parameters:
-        sub_command : name of sub-command
-        start_datetime : start datetime instance by utc
-        end_datetime : end datetime instance by utc
-        full_command : full command line arguments
-        user : current user
-        log : airflow.models.log.Log ORM instance
-        dag_id : dag id (optional)
-        task_id : task_id (optional)
-        execution_date : execution date (optional)
-        error : exception instance if there's an exception
-
-    :param f: function instance
-    :return: wrapped function
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
+def action_cli(check_db=True):

Review comment:
       Nice!







[GitHub] [airflow] potiuk edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925375399


   The more I think about it, the more I like the idea of always running the migration at the start of everything. I think for quite a while we neglected the fact that the DB structure can get desynchronized from the code, but I cannot imagine ANY situation, including development, where the DB should be at a different version than the latest Airflow migration in the code. I cannot think of a single case where allowing for desynchronisation would be better. This is "DB as code" to the fullest.
   
   The only edge case is, of course, when you are working on a new migration (which is actually extremely rare), but even then, if you implement, run/rerun and fix the migration, you would rather test just the migrations, not Airflow, and to iterate on it you have to use the alembic command line rather than the Airflow worker/scheduler/webserver. And even there, it's extremely easy to add ``--skip-db-migrations`` to all the components of Airflow.
   
   There is one potential problem: the migration lock is not implemented for MSSQL.
   
   But assuming that is done, is there any reason we do NOT want to run migrations for all components when they start? Is there any particular reason we would not want to run the migrations? Any risks involved in doing so? I thought hard about it and I cannot think of a single reason why we would not want to do it.
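The opt-out mentioned above could be wired into a component's argument parser roughly like this. A sketch only: `--skip-db-migrations` is the flag proposed in this comment, not an existing Airflow option, and `start_component`, `upgrade_db` and the program name are hypothetical:

```python
import argparse
from typing import Callable, List


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="airflow-component")  # hypothetical program name
    # Hypothetical opt-out flag from the proposal above.
    parser.add_argument(
        "--skip-db-migrations",
        action="store_true",
        help="Do not upgrade the metadata DB on startup (e.g. while iterating on a new migration).",
    )
    return parser


def start_component(argv: List[str], upgrade_db: Callable[[], None]) -> None:
    args = build_parser().parse_args(argv)
    # argparse exposes "--skip-db-migrations" as the attribute `skip_db_migrations`.
    if not args.skip_db_migrations:
        upgrade_db()
    # ... the component (scheduler, webserver, worker) would start here ...
```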





[GitHub] [airflow] potiuk closed pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #18439:
URL: https://github.com/apache/airflow/pull/18439


   





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927114807


   This is about the 100th occurrence (yeah, I am exaggerating a bit) of a similar problem I see over and over: https://github.com/apache/airflow/issues/18513
   
   We **MUST** add it fast.
   
   





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-934610635


   Reopened to re-build





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r721334385



##########
File path: airflow/utils/cli.py
##########
@@ -89,6 +90,8 @@ def wrapper(*args, **kwargs):
         metrics = _build_metrics(f.__name__, args[0])
         cli_action_loggers.on_pre_execution(**metrics)
         try:
+            # Check and run migrations if necessary
+            check_and_run_migrations()

Review comment:
       I think the ones that should definitely NOT have the check are the `db` command (and all of its subcommands). The `db` command is the one used to manage the database, and having desynchronized migrations there is really "acceptable". You can then reset/upgrade or shell into the running DB without worrying about the synchronization state (because you can use those commands to fix or inspect a problem, for example if your migration fails).
   
   I would also skip it for the "installation" inspection/helper commands (`version`, `providers`, `plugins`, `cheat-sheet`). There is a slight risk here: some of those commands could previously work without the DB, and we run them, for example, to check whether the image works fine without actually having a DB initialized. Those never access/modify the DB.
   
   For all the rest, I think it makes perfect sense to perform the check, as they simply need the DB at the **right** version to run.
   
   On one hand we could introduce a new decorator (like `action_logging`), but it is better to explicitly exclude the "non-DB" commands than to add decorators to everything else. So maybe it makes more sense to rename `action_logging` to a more generic `action_cli` decorator, add a `check_db=True` parameter, and set it to False for the `db`, `version`, `providers`, `plugins` and `cheat-sheet` commands? I think we should explicitly exclude those commands from the migration check rather than include the check by adding a new decorator.
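The `action_cli(check_db=True)` idea can be sketched as a decorator factory. This is a simplified, hypothetical version: action logging is omitted, and `check_and_run_migrations` is replaced by a stub that only records the call:

```python
import functools
from typing import Any, Callable, List, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

# Records calls so the sketch can be exercised without a database.
DB_CHECKS: List[str] = []


def check_and_run_migrations() -> None:
    """Stub standing in for the real DB check in this sketch."""
    DB_CHECKS.append("checked")


def action_cli(check_db: bool = True) -> Callable[[F], F]:
    """Decorator factory: run the DB migration check before the command unless disabled."""

    def decorator(f: F) -> F:
        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if check_db:
                check_and_run_migrations()
            return f(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator


@action_cli()
def scheduler(args: Any) -> str:
    return "scheduler ran"


@action_cli(check_db=False)
def version(args: Any) -> str:
    return "version ran"
```

One consequence of the factory form is that existing `@action_logging` call sites become `@action_cli()` with parentheses, unless the factory is written to also accept the function directly.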
   
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r721344790



##########
File path: airflow/utils/cli.py
##########
@@ -89,6 +90,8 @@ def wrapper(*args, **kwargs):
         metrics = _build_metrics(f.__name__, args[0])
         cli_action_loggers.on_pre_execution(**metrics)
         try:
+            # Check and run migrations if necessary
+            check_and_run_migrations()

Review comment:
       Makes perfect sense! Thank you!!







[GitHub] [airflow] ephraimbuddy edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-933010322


   > I guess with this approach we can add it to all components already, not only to webserver ?
   
   I have added the check to run in all commands. Let me know if I shouldn't do that 🙂





[GitHub] [airflow] uranusjr commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r724683211



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,85 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    for ticker in range(timeout):
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            return
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+    raise TimeoutError(
+        f"There are still unapplied migrations after {timeout} seconds. "
+        f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+    )
+
+
+def get_source_heads() -> Set:

Review comment:
       If you don't need to annotate item types, the built-in `set` is enough. But this is fine as well.
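The difference is only in what the annotation records. A small illustration, with hypothetical function names: `set` alone says nothing about the items, while `typing.Set[str]` also records the item type and works on Python versions before 3.9 (where the built-in `set[str]` form is not subscriptable at runtime without `from __future__ import annotations`):

```python
from typing import Iterable, Set


def get_heads_builtin(revisions: Iterable[str]) -> set:
    """Built-in `set` is enough when item types are not annotated."""
    return set(revisions)


def get_heads_typed(revisions: Iterable[str]) -> Set[str]:
    """`typing.Set[str]` also records that the heads are revision-id strings."""
    return set(revisions)
```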







[GitHub] [airflow] uranusjr commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r723779357



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,88 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)

Review comment:
       But the return value is actually a set? :p







[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in commands if necessary

ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-979222065


   Made some changes that need review @uranusjr @potiuk





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in commands if necessary

potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-986744856


   Hey @kaxil - have your worries been addressed? 





[GitHub] [airflow] potiuk edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

potiuk edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925330747


   I like this; however, I'd rather do it in other components as well.
   
   I don't think the webserver is the only component that should get it. The webserver likely won't break anything even if it runs against an old DB version, but the scheduler can. And since we have locks for upgrades, we could run the upgrade for ALL components that need the DB and get rid of the special "wait for migrations" init container.





[GitHub] [airflow] potiuk edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

potiuk edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925375399


   The more I think about it, the more I like the idea of always running the migration at the start of everything. For quite a while we have neglected the fact that the DB structure can get desynchronized from the code, yet I cannot imagine ANY situation, including development, where the DB should be at a different version than the latest Airflow migration in the code. I cannot think of a single case where allowing desynchronization would be better. This is "DB as code" to the fullest.
   
   The only edge case is, of course, when you are working on a new migration (which is actually extremely rare). But even then, while you implement, run/rerun and fix the migration, you would be testing the migrations rather than Airflow itself, and to iterate on it you would use the alembic command line rather than airflow worker/scheduler/webserver. And even there, it's extremely easy to add ``--skip-db-migrations`` to all the Airflow components.
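A minimal sketch of such an opt-out flag, assuming plain `argparse`. `--skip-db-migrations` is only a proposal in this thread, and `COMPONENTS`, `build_parser` and `should_migrate` are hypothetical helpers, not Airflow's actual CLI code.

```python
import argparse
from typing import List

# Hypothetical set of long-running components that would check migrations at start.
COMPONENTS = ("webserver", "scheduler", "triggerer", "worker")


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="airflow")
    subparsers = parser.add_subparsers(dest="command", required=True)
    for name in COMPONENTS:
        sub = subparsers.add_parser(name)
        # Proposed opt-out for the rare case of iterating on a new migration.
        sub.add_argument(
            "--skip-db-migrations",
            action="store_true",
            help="Start without checking or running DB migrations",
        )
    return parser


def should_migrate(argv: List[str]) -> bool:
    args = build_parser().parse_args(argv)
    # argparse exposes "--skip-db-migrations" as the attribute "skip_db_migrations".
    return not args.skip_db_migrations
```

For example, `should_migrate(["scheduler"])` is `True`, while `should_migrate(["webserver", "--skip-db-migrations"])` is `False`.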
   
   There is one potential problem: the migration lock is not implemented for MSSQL.
   
   But assuming that is done, is there any reason we do NOT want to run the migrations for all components when they start? Are there any risks involved in doing so? I thought hard about it and I cannot think of a single reason why we would not want to run the migrations as the first thing when starting the scheduler/webserver/worker/triggerer.





[GitHub] [airflow] ephraimbuddy commented on pull request #18439: Check and run migration in webserver startup if necessary

ephraimbuddy commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-933010322


   > I guess with this approach we can add it to all components already, not only to webserver ?
   
   Yes. I also added it to the scheduler, triggerer and celery. Is there any other component you would like me to add it to?
   
   > I guess with this approach we can add it to all components already, not only to webserver ?
   
   I have added the check to run in all commands. Let me know if I shouldn't do that 🙂





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r721334385



##########
File path: airflow/utils/cli.py
##########
@@ -89,6 +90,8 @@ def wrapper(*args, **kwargs):
         metrics = _build_metrics(f.__name__, args[0])
         cli_action_loggers.on_pre_execution(**metrics)
         try:
+            # Check and run migrations if necessary
+            check_and_run_migrations()

Review comment:
       I think the ones that should definitely NOT have the check are the `db` command and all of its subcommands. The `db` command is the one used to manage the database, so having de-synchronized migrations there is really "acceptable". You can then reset/upgrade or open a shell to the running DB without worrying about the synchronization state (you can use those commands to fix or inspect a problem, for example when your migration fails).
   
   I would also skip it for the "installation" inspection/helper ones (`version`, `providers`, `plugins`, `cheat-sheet`). There is a slight risk here: some of those commands could previously work without the DB, and we run them, for example, to check that the image works fine without actually having a DB initialized. Those never access or modify the DB.
   
   For all the rest, I think it makes perfect sense to perform the check, as they simply need the DB in the **right** version to run.
   
   On one hand we could introduce a new decorator (like `action_logging`), but it is better to explicitly exclude the "non-DB" commands than to add decorators to everything else. So maybe renaming `action_logging` to a more generic decorator (`action_cli`), adding a `check_db=True` parameter, and setting it to False for the `db`, `version`, `providers`, `plugins` and `cheat-sheet` commands makes more sense? I think we should explicitly exclude the commands that do not need the DB migration check rather than include the check by adding a new decorator.
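A minimal sketch of the decorator described above, under stated assumptions: `action_cli` and its `check_db` keyword are the proposal's names, not existing Airflow code, and `fake_check_and_run_migrations` is a hypothetical stub that only records which commands would have been checked (the real `check_and_run_migrations()` in the diff takes no arguments and talks to the DB).

```python
import functools
from typing import Callable, List, Optional

# Records which commands triggered a migration check (stands in for the DB work).
migration_checks: List[str] = []


def fake_check_and_run_migrations(command_name: str) -> None:
    """Hypothetical stub: the real check would compare and migrate DB heads."""
    migration_checks.append(command_name)


def action_cli(func: Optional[Callable] = None, *, check_db: bool = True):
    """Hypothetical generalisation of ``action_logging``.

    Works both bare (``@action_cli``) and with options
    (``@action_cli(check_db=False)``).
    """

    def decorator(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            if check_db:
                fake_check_and_run_migrations(f.__name__)
            return f(*args, **kwargs)

        return wrapper

    if func is not None:
        # Used bare as @action_cli: func is the decorated command itself.
        return decorator(func)
    return decorator


@action_cli
def scheduler(args):
    return "scheduler ran"


# Commands that manage or inspect the installation opt out explicitly.
@action_cli(check_db=False)
def version(args):
    return "version ran"
```

Defaulting `check_db` to `True` matches the preference for explicit exclusion: a newly added command gets the check unless someone deliberately opts it out.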
   
   







[GitHub] [airflow] ashb commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716499095



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,102 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 1:
+        if sys.stdout.isatty():
+            print()
+            question = (
+                "Please confirm database initialization " "(or wait 4 seconds to skip it). Are you sure?"
+            )
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # initialize database
+                    print("DB: " + repr(settings.engine.url))

Review comment:
       This will print passwords -- we shouldn't do this.
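One way to avoid leaking credentials is to mask the password before printing the connection string. A minimal stdlib-only sketch follows. `mask_password` is a hypothetical helper, and it assumes the URL is available as a string (for example `str(settings.engine.url)`). On SQLAlchemy 1.4+, `URL.render_as_string(hide_password=True)` should give a similar result directly on the URL object.

```python
from urllib.parse import urlsplit, urlunsplit


def mask_password(url: str) -> str:
    """Return ``url`` with any password replaced by ``***`` (safe to print)."""
    parts = urlsplit(url)
    if parts.password is None:
        # e.g. sqlite:////tmp/airflow.db has no credentials to hide
        return url
    # netloc looks like "user:password@host:port"; keep everything but the password.
    userinfo, _, hostport = parts.netloc.rpartition("@")
    username = userinfo.split(":", 1)[0]
    return urlunsplit(parts._replace(netloc=f"{username}:***@{hostport}"))
```

For example, `mask_password("postgresql+psycopg2://airflow:s3cret@db:5432/airflow")` returns `"postgresql+psycopg2://airflow:***@db:5432/airflow"`.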

##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,102 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 1:
+        if sys.stdout.isatty():
+            print()
+            question = (
+                "Please confirm database initialization " "(or wait 4 seconds to skip it). Are you sure?"
+            )
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # initialize database
+                    print("DB: " + repr(settings.engine.url))
+                    initdb()
+                    print("DB Initialization done")
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to initialize the database. "
+                "Run `airflow db init` to initialize the database"
+            )
+            sys.exit(1)
+    elif source_heads != db_heads:
+        if sys.stdout.isatty():
+            print()
+            question = "Please confirm database upgrade" " (or wait 4 seconds to skip it). Are you sure?"
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # upgrade database
+                    try:
+                        print("DB: " + repr(settings.engine.url))
+                        upgradedb()
+                        print("DB Upgrade done")
+                    except Exception as e:
+                        print(e)
+                        rich_print(
+                            "[red][bold]ERROR:[/bold] You still have unapplied migrations. "
+                            "You may need to reset the database by running `airflow db reset`"
+                        )
+                        sys.exit(1)
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to upgrade the database. "
+                "Run `airflow db upgrade` to upgrade"
+            )
+            sys.exit(1)

Review comment:
       We can reduce the duplication in these blocks a lot by doing something like this:
   
   ```suggestion
   def check_and_run_migrations():
       """Check and run migrations if necessary. Only use in a tty"""
       from rich import print as rich_print
   
       source_heads = get_source_heads()
       db_heads = get_db_heads()
       if len(db_heads) < 1:
           db_command = initdb
           command_name = "init"
           verb = "initialization"
       elif source_heads != db_heads:
           db_command = upgradedb
           command_name = "upgrade"
           verb = "upgrade"
       else:
           # Nothing to do: the DB is already at the source heads
           return
   
       if sys.stdout.isatty():
           print()
           question = f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
           try:
               answer = helpers.prompt_with_timeout(question, timeout=4)
               if answer:
                   try:
                       db_command()
                       print(f"DB {verb} done")
                   except Exception as e:
                       print(e)
                       rich_print(
                           "[red][bold]ERROR:[/bold] You still have unapplied migrations. "
                           "You may need to reset the database by running `airflow db reset`"
                       )
                       sys.exit(1)
           except AirflowException:
               # No answer within the timeout: skip it, as in the original code
               pass
       else:
           rich_print(
               f"[red][bold]ERROR:[/bold] You need to {command_name} the database. "
               f"Run `airflow db {command_name}` to {command_name} the database"
           )
           sys.exit(1)
   ```







[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927221301


   This would work fine even now, but rather than detecting that no migration is needed, it would attempt the migration in all of the components (only the first would actually run it). So there is no NEED to move the lock outside, but it would be a little better from a logging point of view as well.
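The pattern described here is essentially double-checked locking: check cheaply first, and only take the lock (and re-check under it) when a migration seems needed. A minimal in-process sketch follows, assuming hypothetical in-memory stand-ins. `SOURCE_HEADS`, `db_heads` and `migrate_if_needed` are illustrative names, and the `threading.Lock` stands in for the migration lock mentioned in the thread, which in Airflow lives in the database rather than in a single process.

```python
import threading
from typing import List, Set

# Hypothetical in-memory stand-ins for the source heads and the DB state.
SOURCE_HEADS: Set[str] = {"head1"}
db_heads: Set[str] = set()
migration_log: List[str] = []
migration_lock = threading.Lock()


def migrate_if_needed(component: str) -> None:
    # Cheap check first: once the DB is current, components neither wait
    # on the lock nor log a pointless migration attempt.
    if db_heads == SOURCE_HEADS:
        return
    with migration_lock:
        # Re-check while holding the lock: another component may have
        # finished the migration while this one was waiting.
        if db_heads == SOURCE_HEADS:
            return
        migration_log.append(f"{component} ran the migration")
        db_heads.clear()
        db_heads.update(SOURCE_HEADS)


components = ["webserver", "scheduler", "triggerer", "worker"]
threads = [threading.Thread(target=migrate_if_needed, args=(c,)) for c in components]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

However the threads interleave, `migration_log` ends up with exactly one entry, so only one component reports running the migration.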





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925371273


   > If we do this, a knob to override the behaviour would be even more necessary. Also, the logic needs to be able to handle cases where the current db state is _in the future_, i.e. say I migrate to latest, `git checkout 2.1.0`, and run `./breeze start-airflow`.
   
   I thought a bit about it and I believe the dev environment is not a problem at all. Moreover, it might be pretty handy in the development case. For Breeze you have the `stop` command that wipes out the DB volumes, and I use it rather often when I am switching branches. Alternatively, you can also run `./breeze start-airflow` with the `--db-reset` flag (which I often use as well), which will run `airflow db reset` as part of the entrypoint_ci before attempting to run Airflow.
   
   And to be honest, most people, even if they develop stuff, don't actually switch to old versions; it's mainly those few committers who are involved in release management or analysing some strange bugs.
   
   But even in the case you described, just failing if the current DB migration is unknown to your version of Airflow would be really handy to notify the developer that the DB needs some action. Otherwise the errors you see might be rather strange. It happened to me several times that I was scratching my head over what was going on, only to find out that I had an old (or new) DB structure.
   
   So I see such a migration check at start as a pretty useful thing during development too, as a consistency check (especially since in Breeze we can even print slightly different error messages instructing new developers to run `./breeze stop` or use `--db-reset`).
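Detecting a DB that is "in the future" needs one extra input besides the two head sets: every revision id the checked-out code knows about. A minimal sketch of the decision follows. `required_db_action` and `known_revisions` are hypothetical. The set of known ids could plausibly be built from alembic's `ScriptDirectory.walk_revisions()`, but that wiring is an assumption and is not shown.

```python
from typing import Optional, Set


def required_db_action(
    source_heads: Set[str], db_heads: Set[str], known_revisions: Set[str]
) -> Optional[str]:
    """Return "init", "reset", "upgrade", or None when the DB already matches.

    ``known_revisions`` is every revision id the checked-out code knows about.
    """
    if not db_heads:
        return "init"
    if not db_heads <= known_revisions:
        # The DB was migrated by newer (or other-branch) code, e.g. before a
        # `git checkout 2.1.0`; upgrading cannot fix that, so fail loudly.
        return "reset"
    if db_heads != source_heads:
        return "upgrade"
    return None
```

A caller would error out immediately on `"reset"` and suggest `airflow db reset` (or `./breeze stop` in Breeze), instead of trying to upgrade.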
   





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in webserver startup if necessary

potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-926004255


   I was also a bit torn between "erroring out" and "migrating". I see benefits of both. But I think erroring out is indeed less "magical" (explicit vs. implicit) and still keeps the same properties as "migrating" in terms of consistency between the DB and the Airflow code, which I think is the root of the problem.
   
   I like the terminal idea very much. It's the option of "least surprise", and we can even make it a bit more "aggressive": if you do not respond in 5 seconds, exit with an error. Or, if you have an "unknown" migration, error out immediately and print "You should likely reset the db" with instructions on how to do it.
   
   This will cover the case we had for quite some time in Breeze, where you thought you were running something but then got the question about a rebuild. The 4s timeout with "erroring out" gives a much nicer workflow for development.
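A minimal sketch of the "aggressive" variant, where no answer within the timeout is an error instead of a silent skip. `ask_with_timeout` and `confirm_or_exit` are hypothetical helpers. They are a thread-based stand-in, not Airflow's `helpers.prompt_with_timeout` (the diff treats an `AirflowException` from that function as "skip"). The `read` parameter exists only so the prompt can be exercised without a terminal.

```python
import sys
import threading
from typing import Callable, List, Optional


def ask_with_timeout(
    question: str, timeout: float, read: Callable[[str], str] = input
) -> Optional[bool]:
    """Return True/False for a yes/no answer, or None if none arrives in time."""
    answers: List[str] = []
    # A daemon thread, so a reader still blocked on stdin does not keep the
    # process alive after we stop waiting for it.
    reader = threading.Thread(
        target=lambda: answers.append(read(f"{question} [y/N] ")), daemon=True
    )
    reader.start()
    reader.join(timeout)
    if not answers:
        return None
    return answers[0].strip().lower() in ("y", "yes")


def confirm_or_exit(
    question: str, timeout: float = 5, read: Callable[[str], str] = input
) -> bool:
    answer = ask_with_timeout(question, timeout, read)
    if answer is None:
        # No answer in time is treated as an error, not as "skip the upgrade".
        print("ERROR: no answer received in time; aborting.", file=sys.stderr)
        sys.exit(1)
    return answer
```

The daemon thread makes the design trade-off explicit: the process can exit while the reader is still blocked, at the cost of leaving that read unfinished.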





[GitHub] [airflow] ashb commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716578846



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,102 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 1:
+        if sys.stdout.isatty():
+            print()
+            question = (
+                "Please confirm database initialization " "(or wait 4 seconds to skip it). Are you sure?"
+            )
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # initialize database
+                    print("DB: " + repr(settings.engine.url))

Review comment:
       (At least I think it will print passwords.)







[GitHub] [airflow] potiuk edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

potiuk edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-927114807


   This is about the 100th time (yeah, I am exaggerating a bit) that I have seen a similar problem: https://github.com/apache/airflow/issues/18513
   
   We **MUST** add it fast.
   
   





[GitHub] [airflow] potiuk commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

potiuk commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716075733



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +627,74 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations(source_heads, db_heads):
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 0:
+        if sys.stdout.isatty():
+            # initialize database
+            print("DB: " + repr(settings.engine.url))
+            initdb()
+            print("DB Initialization done")
+        else:
+            rich_print("[red][bold]ERROR:[/bold] You need to initialize the database")

Review comment:
       Both, I think. There is a chance that running Airflow on a non-migrated DB might cause some data corruption, so I think it is perfectly fine to exit immediately if the DB is in the wrong state (even with sys.exit() rather than throwing exceptions).
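A minimal sketch of the fail-fast path, with a hypothetical helper name `exit_unless_migrated`. One practical point in favour of `sys.exit()`: it raises `SystemExit`, which derives from `BaseException` rather than `Exception`. A generic `except Exception:` handler somewhere up the call stack therefore will not swallow it, and the process really stops before touching the DB.

```python
import sys
from typing import Set


def exit_unless_migrated(source_heads: Set[str], db_heads: Set[str]) -> None:
    """Stop the process before it touches a DB whose schema does not match the code."""
    if not db_heads:
        print("ERROR: You need to initialize the database. Run `airflow db init`.", file=sys.stderr)
        sys.exit(1)
    if db_heads != source_heads:
        print("ERROR: You need to upgrade the database. Run `airflow db upgrade`.", file=sys.stderr)
        sys.exit(1)
```

For example, wrapping the call in `try: ... except Exception: ...` still lets the `SystemExit(1)` propagate.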







[GitHub] [airflow] ashb commented on a change in pull request #18439: Check and run migration in webserver startup if necessary

ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r716502072



##########
File path: airflow/utils/db.py
##########
@@ -626,27 +629,102 @@ def check_migrations(timeout):
     :param timeout: Timeout for the migration in seconds
     :return: None
     """
-    from alembic.runtime.migration import MigrationContext
+    ticker = 0
+    while True:
+        source_heads = get_source_heads()
+        db_heads = get_db_heads()
+        if source_heads == db_heads:
+            break
+        if ticker >= timeout:
+            raise TimeoutError(
+                f"There are still unapplied migrations after {ticker} seconds. "
+                f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
+            )
+        ticker += 1
+        time.sleep(1)
+        log.info('Waiting for migrations... %s second(s)', ticker)
+
+
+def get_source_heads():
+    """
+    Function to get the current migration head in the source code.
+
+    :return: List of migration head(s)
+    """
     from alembic.script import ScriptDirectory
 
     config = _get_alembic_config()
     script_ = ScriptDirectory.from_config(config)
+    return set(script_.get_heads())
+
+
+def get_db_heads():
+    """
+    Function to get the current migration head in the database.
+
+    :return: List of migration head(s)
+    """
+    from alembic.runtime.migration import MigrationContext
+
     with settings.engine.connect() as connection:
         context = MigrationContext.configure(connection)
-        ticker = 0
-        while True:
-            source_heads = set(script_.get_heads())
-            db_heads = set(context.get_current_heads())
-            if source_heads == db_heads:
-                break
-            if ticker >= timeout:
-                raise TimeoutError(
-                    f"There are still unapplied migrations after {ticker} seconds. "
-                    f"Migration Head(s) in DB: {db_heads} | Migration Head(s) in Source Code: {source_heads}"
-                )
-            ticker += 1
-            time.sleep(1)
-            log.info('Waiting for migrations... %s second(s)', ticker)
+        return set(context.get_current_heads())
+
+
+def check_and_run_migrations():
+    """Check and run migrations if necessary. Only use in a tty"""
+    from rich import print as rich_print
+
+    source_heads = get_source_heads()
+    db_heads = get_db_heads()
+    if len(db_heads) < 1:
+        if sys.stdout.isatty():
+            print()
+            question = (
+                "Please confirm database initialization " "(or wait 4 seconds to skip it). Are you sure?"
+            )
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # initialize database
+                    print("DB: " + repr(settings.engine.url))
+                    initdb()
+                    print("DB Initialization done")
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to initialize the database. "
+                "Run `airflow db init` to initialize the database"
+            )
+            sys.exit(1)
+    elif source_heads != db_heads:
+        if sys.stdout.isatty():
+            print()
+            question = "Please confirm database upgrade" " (or wait 4 seconds to skip it). Are you sure?"
+            try:
+                ans = helpers.prompt_with_timeout(question, timeout=4)
+                if ans:
+                    # upgrade database
+                    try:
+                        print("DB: " + repr(settings.engine.url))
+                        upgradedb()
+                        print("DB Upgrade done")
+                    except Exception as e:
+                        print(e)
+                        rich_print(
+                            "[red][bold]ERROR:[/bold] You still have unapplied migrations. "
+                            "You may need to reset the database by running `airflow db reset`"
+                        )
+                        sys.exit(1)
+            except AirflowException:
+                pass
+        else:
+            rich_print(
+                "[red][bold]ERROR:[/bold] You need to upgrade the database. "
+                "Run `airflow db upgrade` to upgrade"
+            )
+            sys.exit(1)

Review comment:
       We can reduce the duplication in these blocks a lot by doing something like this:
   
   ```suggestion
   def check_and_run_migrations():
       """Check and run migrations if necessary. Only use in a tty"""
       from rich import print as rich_print
   
       source_heads = get_source_heads()
       db_heads = get_db_heads()
       if len(db_heads) < 1:
           db_command = initdb
           db_name = "init"
           verb = "initialize"
       elif source_heads != db_heads:
           db_command = upgradedb
           db_name = "upgrade"
           verb = "upgrade"
       else:
           # Migration heads already match: nothing to do
           return
   
       if sys.stdout.isatty():
           print()
           question = (
               f"Please confirm database {verb} (or wait 4 seconds to skip it). Are you sure?"
           )
           ans = helpers.prompt_with_timeout(question, timeout=4)
           if ans:
               # Run the selected command (initdb or upgradedb)
               print("DB: " + repr(settings.engine.url))
               db_command()
               print(f"DB {verb} done")
       else:
           rich_print(
               f"[red][bold]ERROR:[/bold] You need to {verb} the database. Please run `airflow db {db_name}`"
           )
           sys.exit(1)
   
   ```
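The init-versus-upgrade decision can also be pulled out into a pure function, which keeps the "already up to date" case explicit and makes the logic testable without a database. A minimal sketch; `pick_db_action` and `db_error_message` are hypothetical names, not Airflow APIs, and the heads are passed in as sets, as `get_source_heads()` and `get_db_heads()` return them in the diff.

```python
from typing import Optional, Set, Tuple


def pick_db_action(source_heads: Set[str], db_heads: Set[str]) -> Optional[Tuple[str, str]]:
    """Return (sub_command, verb) for the DB action needed, or None if up to date."""
    if not db_heads:
        # No revision stored in the DB yet: it has never been initialized.
        return ("init", "initialize")
    if source_heads != db_heads:
        # The DB is behind (or diverged from) the migrations shipped in the code.
        return ("upgrade", "upgrade")
    # Heads match: nothing to run.
    return None


def db_error_message(action: Tuple[str, str]) -> str:
    """Build the non-TTY error text from the chosen action."""
    sub_command, verb = action
    return f"ERROR: You need to {verb} the database. Please run `airflow db {sub_command}`"
```

The caller would then map `"init"` / `"upgrade"` to `initdb` / `upgradedb` and return early on `None`.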







[GitHub] [airflow] ashb edited a comment on pull request #18439: Check and run migration in webserver startup if necessary

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-925826422


   I feel uneasy about automatically running migrations for non-automated approaches.
   
   What I propose we do is:
   
   - When connected to a TTY: if there are any outstanding migrations, prompt the user to ask whether they want to run them
   
     > There are outstanding db migrations. Do you wish to run them now [N/y]?
   
   - When not connected to a TTY (e.g. a daemon or a non-attached container): just error out.
   
   That feels much safer than automatically running a potentially long migration when the user isn't expecting it, doubly so in the case of MySQL, where DDL etc. is non-transactional, so the process can't even be safely interrupted!
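A minimal sketch of this TTY-or-error policy. `confirm_or_fail` is a hypothetical helper; the output stream and the input function are injected so it can be tested without a terminal. Unlike the 4-second `helpers.prompt_with_timeout` call used in the PR, this sketch waits for an explicit answer and treats anything other than yes as No, matching the `[N/y]` default.

```python
import sys
from typing import Callable, TextIO


def confirm_or_fail(
    question: str,
    stream: TextIO = sys.stdout,
    read_answer: Callable[[str], str] = input,
) -> bool:
    """Ask for confirmation on a TTY; refuse outright when not attached to one.

    Returns True only if the user explicitly answers yes (default is No).
    """
    if not stream.isatty():
        # Daemon / detached container: never run migrations unattended.
        print("ERROR: There are outstanding db migrations; run them manually.", file=sys.stderr)
        sys.exit(1)
    answer = read_answer(f"{question} [N/y]? ").strip().lower()
    return answer in ("y", "yes")
```

Defaulting to No means an accidental Enter never starts a migration that cannot be safely interrupted.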





[GitHub] [airflow] potiuk commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-989263779


   woooohooo!





[GitHub] [airflow] ashb commented on a change in pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#discussion_r764887554



##########
File path: airflow/utils/cli.py
##########
@@ -53,51 +54,57 @@ def _check_cli_args(args):
         )
 
 
-def action_logging(f: T) -> T:
-    """
-    Decorates function to execute function at the same time submitting action_logging
-    but in CLI context. It will call action logger callbacks twice,
-    one for pre-execution and the other one for post-execution.
-
-    Action logger will be called with below keyword parameters:
-        sub_command : name of sub-command
-        start_datetime : start datetime instance by utc
-        end_datetime : end datetime instance by utc
-        full_command : full command line arguments
-        user : current user
-        log : airflow.models.log.Log ORM instance
-        dag_id : dag id (optional)
-        task_id : task_id (optional)
-        execution_date : execution date (optional)
-        error : exception instance if there's an exception
-
-    :param f: function instance
-    :return: wrapped function
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
+def action_cli(check_db=True):
+    def action_logging(f: T) -> T:
+        """
+        Decorates function to execute function at the same time submitting action_logging
+        but in CLI context. It will call action logger callbacks twice,
+        one for pre-execution and the other one for post-execution.
+
+        Action logger will be called with below keyword parameters:
+            sub_command : name of sub-command
+            start_datetime : start datetime instance by utc
+            end_datetime : end datetime instance by utc
+            full_command : full command line arguments
+            user : current user
+            log : airflow.models.log.Log ORM instance
+            dag_id : dag id (optional)
+            task_id : task_id (optional)
+            execution_date : execution date (optional)
+            error : exception instance if there's an exception
+
+        :param f: function instance
+        :return: wrapped function
         """
-        An wrapper for cli functions. It assumes to have Namespace instance
-        at 1st positional argument
 
-        :param args: Positional argument. It assumes to have Namespace instance
+        @functools.wraps(f)
+        def wrapper(*args, **kwargs):
+            """
+            An wrapper for cli functions. It assumes to have Namespace instance
             at 1st positional argument
-        :param kwargs: A passthrough keyword argument
-        """
-        _check_cli_args(args)
-        metrics = _build_metrics(f.__name__, args[0])
-        cli_action_loggers.on_pre_execution(**metrics)
-        try:
-            return f(*args, **kwargs)
-        except Exception as e:
-            metrics['error'] = e
-            raise
-        finally:
-            metrics['end_datetime'] = datetime.utcnow()
-            cli_action_loggers.on_post_execution(**metrics)
-
-    return cast(T, wrapper)
+
+            :param args: Positional argument. It assumes to have Namespace instance
+                at 1st positional argument
+            :param kwargs: A passthrough keyword argument
+            """
+            _check_cli_args(args)
+            metrics = _build_metrics(f.__name__, args[0])
+            cli_action_loggers.on_pre_execution(**metrics)
+            try:
+                # Check and run migrations if necessary
+                if check_db:
+                    check_and_run_migrations()
+                return f(*args, **kwargs)
+            except Exception as e:
+                metrics['error'] = e
+                raise
+            finally:
+                metrics['end_datetime'] = datetime.utcnow()
+                cli_action_loggers.on_post_execution(**metrics)
+
+        return cast(T, wrapper)
+
+    return action_logging

Review comment:
       And then
   ```suggestion
       if func:
           return action_logging(func)
       return action_logging
   ```

##########
File path: airflow/utils/cli.py
##########
@@ -53,51 +54,57 @@ def _check_cli_args(args):
         )
 
 
-def action_logging(f: T) -> T:
-    """
-    Decorates function to execute function at the same time submitting action_logging
-    but in CLI context. It will call action logger callbacks twice,
-    one for pre-execution and the other one for post-execution.
-
-    Action logger will be called with below keyword parameters:
-        sub_command : name of sub-command
-        start_datetime : start datetime instance by utc
-        end_datetime : end datetime instance by utc
-        full_command : full command line arguments
-        user : current user
-        log : airflow.models.log.Log ORM instance
-        dag_id : dag id (optional)
-        task_id : task_id (optional)
-        execution_date : execution date (optional)
-        error : exception instance if there's an exception
-
-    :param f: function instance
-    :return: wrapped function
-    """
-
-    @functools.wraps(f)
-    def wrapper(*args, **kwargs):
+def action_cli(check_db=True):

Review comment:
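       Ah, this is why usages need `action_cli()` instead of plain `action_cli`. Accepting the decorated function as an optional first argument lets the decorator be used both with and without parentheses. See https://stackoverflow.com/a/69210737/439189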
   
   
   
   ```suggestion
   def action_cli(func=None, check_db=True):
   ```
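A minimal sketch of the `func=None` decorator pattern from these suggestions. `check_and_run_migrations` is replaced by a hypothetical stub that only records that it ran, the action-logger callbacks are omitted, and `webserver` / `version` are hypothetical command functions used for illustration.

```python
import functools
from typing import Any, Callable, List, Optional, TypeVar, cast

T = TypeVar("T", bound=Callable[..., Any])

# Hypothetical stand-in for the real DB check: it only records that it ran.
MIGRATION_CHECKS: List[str] = []


def check_and_run_migrations() -> None:
    MIGRATION_CHECKS.append("checked")


def action_cli(func: Optional[T] = None, check_db: bool = True):
    """Decorator usable both as ``@action_cli`` and ``@action_cli(check_db=False)``."""

    def action_logging(f: T) -> T:
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # Action-logger callbacks omitted; only the DB check is shown.
            if check_db:
                check_and_run_migrations()
            return f(*args, **kwargs)

        return cast(T, wrapper)

    if func is not None:
        # Bare ``@action_cli``: Python passes the decorated function in directly.
        return action_logging(func)
    # ``@action_cli(...)`` with keyword arguments: return the actual decorator.
    return action_logging


@action_cli
def webserver(args):
    return f"webserver started with {args}"


@action_cli(check_db=False)
def version(args):
    return "version printed"
```

Using `func is not None` rather than `if func:` avoids surprises with callables that define `__bool__`; since `check_db` is keyword-only in practice, callers should always pass it by name.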







[GitHub] [airflow] kaxil commented on pull request #18439: Check and run migration in commands if necessary

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #18439:
URL: https://github.com/apache/airflow/pull/18439#issuecomment-986979416


   > Hey @kaxil - have your worries been addressed?
   
   Hey, yeah, Ephraim and I are working closely on this one; some of the recent PRs were to unblock this PR :)

