Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/08 19:51:48 UTC
[GitHub] [airflow] dstaple commented on issue #23361: Scheduler crashes with psycopg2.errors.DeadlockDetected exception
dstaple commented on issue #23361:
URL: https://github.com/apache/airflow/issues/23361#issuecomment-1179314412
I am also encountering this issue. I collected some details about both queries involved in the deadlock; hopefully this is helpful.
Deployment details:
* Airflow 2.2.5
* KubernetesExecutor
* A single Airflow scheduler is running.
* Row level locking is enabled.
* Scheduler parsing_processes = 5
* Scheduler resources: 8 cores, 5 GB RAM
* Database resources: 12 cores, 8 GB RAM (Postgres 11.3)
* The problem only appears at scale (50-150 DAGs, several of which have hundreds of tasks).
* The problem is not easily reproducible but is happening daily.
In each deadlock, an UPDATE statement deadlocks with a SELECT ... FOR UPDATE.
Based on stack traces visible in the scheduler logs, the UPDATE originates from the main scheduler loop here:
https://github.com/apache/airflow/blob/2.2.5/airflow/models/dagrun.py#L901-L910
Based on the database logs, the SELECT statement has the form:
```
SELECT task_instance.try_number AS task_instance_try_number, ...
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = 'my_dag_id' AND task_instance.task_id = 'my_task_id' AND task_instance.run_id = 'sanitized_run_id_1'
LIMIT 1 FOR UPDATE
```
Searching the Airflow source code, the query that looks most similar to the SELECT from the database error is in `TaskInstance.refresh_from_db()`:
https://github.com/apache/airflow/blob/2.2.5/airflow/models/taskinstance.py#L714-L736
Example scheduler logs showing the origins of the UPDATE statement:
```
[2022-07-06 18:54:29,456] {{scheduler_job.py:753}} INFO - Exited execute loop
Traceback (most recent call last):
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
HINT: See server log for query details.
CONTEXT: while updating tuple (48513,18) in relation "task_instance"
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
sys.exit(main())
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
args.func(args)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
_run_scheduler_job(args=args)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
job.run()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run
self._execute()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 726, in _execute
self._run_scheduler_loop()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 807, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 890, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1147, in _schedule_dag_run
dag_run.schedule_tis(schedulable_tis, session)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/dagrun.py", line 909, in schedule_tis
.update({TI.state: State.SCHEDULED}, synchronize_session=False)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 4063, in update
update_op.exec_()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
self._execute_stmt(update_stmt)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
HINT: See server log for query details.
CONTEXT: while updating tuple (48513,18) in relation "task_instance"
[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'sanitized_dag_id_1', 'run_id_1': 'sanitized_run_id_1', 'task_id_1': 'sanitized_task_id_1'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
```
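The two DETAIL lines in the traceback above describe a wait cycle between two backend processes. As a minimal sketch, assuming only the DETAIL wording shown in these logs, the wait-for edges can be extracted and checked for a cycle like this. The names `parse_wait_edges` and `find_cycle` are hypothetical helpers for illustration, not part of Airflow or psycopg2:

```python
import re

# Matches the DETAIL wording seen in the logs above.
WAIT_RE = re.compile(
    r"Process (\d+) waits for (\w+) on transaction (\d+); blocked by process (\d+)\."
)


def parse_wait_edges(detail_text):
    """Return {waiting_pid: blocking_pid} for each 'waits for' line."""
    edges = {}
    for match in WAIT_RE.finditer(detail_text):
        waiter, _lock_mode, _xid, blocker = match.groups()
        edges[int(waiter)] = int(blocker)
    return edges


def find_cycle(edges):
    """Follow blocked-by edges from each process; return the first cycle found, else None."""
    for start in edges:
        path = [start]
        current = edges.get(start)
        while current is not None and current not in path:
            path.append(current)
            current = edges.get(current)
        if current is not None:
            return path[path.index(current):]
    return None


# DETAIL text copied from the scheduler traceback above.
DETAIL = """\
DETAIL: Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
"""

edges = parse_wait_edges(DETAIL)
cycle = find_cycle(edges)
print(edges)
print(cycle)
```

For the DETAIL text above, the cycle contains processes 99711 and 100627, which matches the pair named in the error.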
Example Postgres logs showing a complete SELECT ... FOR UPDATE statement:
```
2022-07-06 18:54:25.816 UTC [100639] ERROR: deadlock detected
2022-07-06 18:54:25.816 UTC [100639] DETAIL: Process 100639 waits for ShareLock on transaction 527390039; blocked by process 99711.
Process 99711 waits for ShareLock on transaction 527390130; blocked by process 100639.
Process 100639: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id,
task_instance.pid AS task_instance_pid, task_insta
Process 99711: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.run_id = 'sanitized_run_id_2' AND task_instance.task_id IN ('sanitized_task_id_2', 'sanitized_task_id_3')
2022-07-06 18:54:25.816 UTC [100639] HINT: See server log for query details.
2022-07-06 18:54:25.816 UTC [100639] CONTEXT: while locking tuple (725,169) in relation "dag_run"
2022-07-06 18:54:25.816 UTC [100639] STATEMENT: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.task_id = 'sanitized_task_id_3' AND task_instance.run_id = 'sanitized_run_id_2'
LIMIT 1 FOR UPDATE
```
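One possible reading of the two CONTEXT lines (the UPDATE blocked "while updating tuple ... in relation "task_instance"", the SELECT blocked "while locking tuple ... in relation "dag_run"") is a lock-order inversion between the two sessions. The sketch below only models that reading. The acquisition orders are assumptions I have not confirmed in the Airflow source, and `conflicting_pairs` is a hypothetical helper:

```python
def conflicting_pairs(order_a, order_b):
    """Return resource pairs that session A takes in one order and session B in the opposite order."""
    position_in_b = {resource: i for i, resource in enumerate(order_b)}
    conflicts = []
    for i, first in enumerate(order_a):
        for second in order_a[i + 1:]:
            both_shared = first in position_in_b and second in position_in_b
            if both_shared and position_in_b[second] < position_in_b[first]:
                conflicts.append((first, second))
    return conflicts


# ASSUMPTION: with row-level locking enabled, the scheduler already holds the
# dag_run row lock when schedule_tis() issues its UPDATE on task_instance.
scheduler_order = ["dag_run row", "task_instance row"]

# ASSUMPTION: the SELECT ... FOR UPDATE over the join locks the task_instance
# row first and then tries to lock the joined dag_run row.
select_for_update_order = ["task_instance row", "dag_run row"]

print(conflicting_pairs(scheduler_order, select_for_update_order))
```

If both assumptions hold, each session can end up holding the row the other one needs next, which would be consistent with the wait cycles in the logs. If either assumption is wrong, this model does not apply.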
Unfortunately, we cannot reproduce this on a test instance, so I have not been able to try newer Airflow versions. Based on the discussion in this thread, though, the issue appears to persist through at least 2.3.2.