You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/14 21:56:22 UTC

[GitHub] [airflow] vitaly-krugl opened a new issue, #25075: Airflow Database Upgrade error: sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist

vitaly-krugl opened a new issue, #25075:
URL: https://github.com/apache/airflow/issues/25075

   ### Apache Airflow version
   
   2.3.2
   
   ### What happened
   
   My production deployment had Airflow v1.8.0. It already had Dags, Task Instances, etc., in the tables. When upgrading to Airflow v2.3.2, the execution of `airflow db upgrade` resulted in the failure: `sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist`.
   
   The problem is due to an unsafe practice used in the implementation of multiple migration version scripts, including some recent ones. More on this in the "Anything else" section below.
   
   
   The logs/traceback:
   ```
   INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
   INFO  [alembic.runtime.migration] Will assume transactional DDL.
   INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, Add ``max_tries`` column to ``task_instance``
   Traceback (most recent call last):
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.UndefinedColumn: column dag.root_dag_id does not exist
   LINE 1: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root...
                                            ^
    
    
   The above exception was the direct cause of the following exception:
    
   Traceback (most recent call last):
     File "/a/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/a/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/a/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/cli/commands/db_command.py", line 82, in upgradedb
       db.upgradedb(to_revision=to_revision, from_revision=from_revision, show_sql_only=args.show_sql_only)
     File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/utils/db.py", line 1449, in upgradedb
       command.upgrade(config, revision=to_revision or 'heads')
     File "/a/lib/python3.9/site-packages/alembic/command.py", line 322, in upgrade
       script.run_env()
     File "/a/lib/python3.9/site-packages/alembic/script/base.py", line 569, in run_env
       util.load_python_file(self.dir, "env.py")
     File "/a/lib/python3.9/site-packages/alembic/util/pyfiles.py", line 94, in load_python_file
       module = load_module_py(module_id, path)
     File "/a/lib/python3.9/site-packages/alembic/util/pyfiles.py", line 110, in load_module_py
       spec.loader.exec_module(module)  # type: ignore
     File "<frozen importlib._bootstrap_external>", line 855, in exec_module
     File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
     File "/a/lib/python3.9/site-packages/airflow/migrations/env.py", line 107, in <module>
       run_migrations_online()
     File "/a/lib/python3.9/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
       context.run_migrations()
     File "<string>", line 8, in run_migrations
     File "/a/lib/python3.9/site-packages/alembic/runtime/environment.py", line 853, in run_migrations
       self.get_context().run_migrations(**kw)
     File "/a/lib/python3.9/site-packages/alembic/runtime/migration.py", line 623, in run_migrations
       step.migration_fn(**kw)
     File "/a/lib/python3.9/site-packages/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py", line 81, in upgrade
       dag = dagbag.get_dag(ti.dag_id)
     File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/models/dagbag.py", line 217, in get_dag
       orm_dag = DagModel.get_current(root_dag_id, session=session)
     File "/a/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/a/lib/python3.9/site-packages/airflow/models/dag.py", line 2775, in get_current
       return session.query(cls).filter(cls.dag_id == dag_id).first()
     File "/a/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2734, in first
       return self.limit(1)._iter().first()
     File "/a/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2818, in _iter
       result = self.session.execute(
     File "/a/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1670, in execute
       result = conn._execute_20(statement, params or {}, execution_options)
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/a/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
       ret = self._execute_context(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
       self._handle_dbapi_exception(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
       util.raise_(
     File "/a/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/a/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist
   LINE 1: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root...
                                            ^
    
   [SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.timetable_description AS dag_timetable_description, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.has_import_errors AS dag_has_import_errors, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.ne
 xt_dagrun_create_after AS dag_next_dagrun_create_after
   FROM dag
   WHERE dag.dag_id = %(dag_id_1)s
    LIMIT %(param_1)s]
   [parameters: {'dag_id_1': 'dns_dv_challenge_initiate_v2', 'param_1': 1}]
   (Background on this error at: http://sqlalche.me/e/14/f405)
   ```
   
   ### What you think should happen instead
   
   The database upgrade should have succeeded.
   
   ### How to reproduce
   
   Install Airflow 1.8.0 and initialize airflow db.
   Create and execute some dags so that Dag table gets populated.
   Upgrade airflow to 2.3.2 and upgrade the database via `airflow db upgrade`.
   
   ### Operating System
   
   OS-X
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-ftp==2.1.2
   apache-airflow-providers-http==2.1.2
   apache-airflow-providers-imap==2.2.3
   apache-airflow-providers-sqlite==2.1.3
   
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   The sqlalchemy ProgrammingError exception occurred during the alembic database upgrade 127d2bf2dfa7 -> cc1e65623dc7. Searching for the target revision cc1e65623dc7 in the sources of Airflow v2.3.2, matches this Airflow database migration version script: https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py. Here, we observe the following imports from Airflow's own code at https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py#L31-L33:
   
   from airflow import settings
   from airflow.compat.sqlalchemy import inspect
   from airflow.models import DagBag
   And the code at lines https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0023_1_8_2_add_max_tries_column_to_task_instance.py#L74-L81 triggers the aforementioned ProgrammingError exception:
   
   dagbag = DagBag(settings.DAGS_FOLDER)
   query = session.query(sa.func.count(TaskInstance.max_tries)).filter(TaskInstance.max_tries == -1)
   # Separate db query in batch to prevent loading entire table
   # into memory and cause out of memory error.
   while query.scalar():
       tis = session.query(TaskInstance).filter(TaskInstance.max_tries == -1).limit(BATCH_SIZE).all()
       for ti in tis:
           dag = dagbag.get_dag(ti.dag_id)
   What's happening? Let's recall that the error during migration to database revision cc1e65623dc7 was "sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist". So, the failure occurred because the call to `dagbag.get_dag(ti.dag_id)` was trying to load a Dag model with the column "root_dag_id" and this column didn't exist yet in the Airflow database revision 127d2bf2dfa7 (from Airflow v1.8.0). The column "root_dag_id" would be added by a later version script https://github.com/apache/airflow/blob/2.3.2/airflow/migrations/versions/0045_1_10_7_add_root_dag_id_to_dag.py from Airflow v1.10.7.
   
   The problem with 0023_1_8_2_add_max_tries_column_to_task_instance.py is that it imports and uses DagBag to fetch Dag models which had all the expected columns during the time that this migration version script was developed, but this was no longer the case when the I actually tried to run the airflow db upgrade - by this time, the Dag columns had changed, and so the Dag model was trying to load the column "root_dag_id" (since the upgrade was happening at Airflow v2.3.2 which has this column), but the actual database (which was initialized by Airflow v1.8.0) didn't have this column yet!
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk closed issue #25075: Airflow Database Upgrade error: sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #25075: Airflow Database Upgrade error: sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist
URL: https://github.com/apache/airflow/issues/25075


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on issue #25075: Airflow Database Upgrade error: sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column dag.root_dag_id does not exist

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #25075:
URL: https://github.com/apache/airflow/issues/25075#issuecomment-1186276061

   Version 1.8 is not supported for more than 4 years. https://github.com/apache/airflow#version-life-cycle - I think you lost the opportunity a long time ago to have any support here - people here are supporting any questions in their free time, but there is probably no-one who was around when 1.8 was released. 
   
   I recommend you drop your data and start from scratch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org