You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/10 17:02:11 UTC

[GitHub] [airflow] ldacey commented on pull request #21731: Store callbacks in database if standalone_dag_processor config is True.

ldacey commented on PR #21731:
URL: https://github.com/apache/airflow/pull/21731#issuecomment-1152561962

   FYI - I disabled the standalone DAG processor and it fixed my issues, but while enabled my scheduler would die sometimes. Here are some logs where it died almost immediately after I deployed Airflow:
   
   ```
   
   | [2022-06-09 19:11:22,231] {scheduler_job.py:696} INFO - Starting the scheduler
   | [2022-06-09 19:11:22,231] {scheduler_job.py:701} INFO - Processing each file at most -1 times
   | [2022-06-09 19:11:22,501] {executor_loader.py:105} INFO - Loaded executor: CeleryExecutor
   | [2022-06-09 19:11:22,502] {scheduler_job.py:1221} INFO - Resetting orphaned tasks for active dag runs
   | [2022-06-09 19:11:22,656] {celery_executor.py:532} INFO - Adopted the following 5 tasks from a dead executor
   |       <TaskInstance: cms-agent.download.extract scheduled__2022-06-09T17:33:00+00:00 [running]> in state STARTED
   |       <TaskInstance: gts-received.export-gts-received scheduled__2022-06-09T16:03:00+00:00 [queued]> in state STARTED
   |       <TaskInstance: cms-agent.download.extract scheduled__2022-06-09T17:33:00+00:00 [running]> in state STARTED
   |       <TaskInstance: cms-split.transform scheduled__2022-06-09T17:32:00+00:00 [running]> in state STARTED
   | [2022-06-09 19:11:22,777] {dag.py:2927} INFO - Setting next_dagrun for telecom-gts to 2022-06-09T19:05:00+00:00, run_after=2022-06-09T19:35:00+00:00
   | [2022-06-09 19:11:27,118] {font_manager.py:1443} INFO - generated new fontManager
   | [2022-06-09 19:11:31,341] {scheduler_job.py:1126} INFO - Run scheduled__2022-06-09T17:33:00+00:00 of cms-agent has timed-out
   | [2022-06-09 19:11:31,347] {dag.py:2927} INFO - Setting next_dagrun for cms-agent to 2022-06-09T18:33:00+00:00, run_after=2022-06-09T19:03:00+00:00
   | [2022-06-09 19:11:31,349] {scheduler_job.py:756} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   | Traceback (most recent call last):
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
   |     self._run_scheduler_loop()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
   |     num_queued_tis = self._do_scheduling(session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
   |     callback_to_run = self._schedule_dag_run(dag_run, session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1141, in _schedule_dag_run
   |     self._send_dag_callbacks_to_processor(dag, callback_to_execute)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1184, in _send_dag_callbacks_to_processor
   |     self.executor.send_callback(callback)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 363, in send_callback
   |     self.callback_sink.send(request)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
   |     with create_session() as session:
   |   File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
   |     next(self.gen)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 33, in create_session
   |     session.commit()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
   |     self._transaction.commit(_to_root=self.future)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 829, in commit
   |     self._prepare_impl()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
   |     self.session.dispatch.before_commit(self.session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 320, in __call__
   |     fn(*args, **kw)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 268, in _validate_commit
   |     raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
   | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
   | [2022-06-09 19:11:31,372] {scheduler_job.py:768} INFO - Exited execute loop
   | Traceback (most recent call last):
   |   File "/home/airflow/.local/bin/airflow", line 8, in <module>
   |     sys.exit(main())
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
   |     args.func(args)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
   |     return func(*args, **kwargs)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
   |     return f(*args, **kwargs)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
   |     _run_scheduler_job(args=args)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
   |     job.run()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
   |     self._execute()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
   |     self._run_scheduler_loop()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
   |     num_queued_tis = self._do_scheduling(session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
   |     callback_to_run = self._schedule_dag_run(dag_run, session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1141, in _schedule_dag_run
   |     self._send_dag_callbacks_to_processor(dag, callback_to_execute)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1184, in _send_dag_callbacks_to_processor
   |     self.executor.send_callback(callback)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 363, in send_callback
   |     self.callback_sink.send(request)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
   |     with create_session() as session:
   |   File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
   |     next(self.gen)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 33, in create_session
   |     session.commit()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
   |     self._transaction.commit(_to_root=self.future)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 829, in commit
   |     self._prepare_impl()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
   |     self.session.dispatch.before_commit(self.session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 320, in __call__
   |     fn(*args, **kw)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 268, in _validate_commit
   |     raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
   | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
   |     with create_session() as session:
   |   File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
   |     next(self.gen)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 33, in create_session
   |     session.commit()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
   |     self._transaction.commit(_to_root=self.future)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 829, in commit
   |     self._prepare_impl()
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
   |     self.session.dispatch.before_commit(self.session)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 320, in __call__
   |     fn(*args, **kw)
   |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 268, in _validate_commit
   |     raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
   | RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
   ```
   
   A pull request was already created (I discussed this on Slack), so I am not sure if an issue should be raised or not: https://github.com/apache/airflow/pull/24366


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org