Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/08 19:51:48 UTC

[GitHub] [airflow] dstaple commented on issue #23361: Scheduler crashes with psycopg2.errors.DeadlockDetected exception

dstaple commented on issue #23361:
URL: https://github.com/apache/airflow/issues/23361#issuecomment-1179314412

   I am also encountering this issue. I collected some details about both queries involved in the deadlock; hopefully this is helpful.
   
   Deployment details:
   * Airflow 2.2.5
   * KubernetesExecutor
   * A single Airflow scheduler is running.
   * Row level locking is enabled.
   * Scheduler parsing_processes = 5
   * Scheduler resources: 8 cores, 5 GB RAM
   * Database resources: 12 cores, 8 GB RAM (Postgres 11.3)
   * The problem only appears at scale (50-150 DAGs, several of which have hundreds of tasks).
   * The problem is not easily reproducible but is happening daily.
   
   In each deadlock, an UPDATE statement deadlocks against a SELECT ... FOR UPDATE.
   
   Based on stack traces visible in the scheduler logs, the UPDATE originates from the main scheduler loop here:
   https://github.com/apache/airflow/blob/2.2.5/airflow/models/dagrun.py#L901-L910
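   
   For reference, here is a condensed sketch of that code path (abbreviated from the linked 2.2.5 source, not verbatim): `DagRun.schedule_tis()` issues a bulk UPDATE over the schedulable task instances, which is the `UPDATE task_instance SET state=...` statement that shows up in the traceback and SQL further down.
   ```
   # Abbreviated sketch of the relevant part of DagRun.schedule_tis()
   # (airflow/models/dagrun.py, Airflow 2.2.5); other details omitted.
   from airflow.models.taskinstance import TaskInstance as TI
   from airflow.utils.state import State

   def schedule_tis(self, schedulable_tis, session):
       schedulable_ti_ids = [ti.task_id for ti in schedulable_tis]
       # Bulk UPDATE of the schedulable task instances for this dag_run; this is
       # the "UPDATE task_instance SET state=..." statement in the traceback below.
       return (
           session.query(TI)
           .filter(
               TI.dag_id == self.dag_id,
               TI.run_id == self.run_id,
               TI.task_id.in_(schedulable_ti_ids),
           )
           .update({TI.state: State.SCHEDULED}, synchronize_session=False)
       )
   ```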
   
   
   Based on the database logs, the SELECT statement has the form:
   ```
   SELECT task_instance.try_number AS task_instance_try_number, ...
   FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
           WHERE task_instance.dag_id = 'my_dag_id' AND task_instance.task_id = 'my_task_id' AND task_instance.run_id = 'sanitized_run_id_1'
            LIMIT 1 FOR UPDATE
   ```
   
   Searching the Airflow source code, the closest match I found for the SELECT in the database error is the query in `TaskInstance.refresh_from_db()`:
   https://github.com/apache/airflow/blob/2.2.5/airflow/models/taskinstance.py#L714-L736
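   
   If that is indeed the query, the row lock comes from the `lock_for_update=True` branch of `refresh_from_db()`. A rough sketch of that branch (abbreviated, not the verbatim source; the JOIN to dag_run seen in the logs presumably comes from how the dag_run relationship is loaded):
   ```
   # Rough sketch of the locking branch of TaskInstance.refresh_from_db()
   # (airflow/models/taskinstance.py, Airflow 2.2.5), abbreviated.
   qry = session.query(TaskInstance).filter(
       TaskInstance.dag_id == self.dag_id,
       TaskInstance.task_id == self.task_id,
       TaskInstance.run_id == self.run_id,
   )
   if lock_for_update:
       # Emits SELECT ... LIMIT 1 FOR UPDATE; per the Postgres CONTEXT line below,
       # the lock extends to the joined dag_run row, not just task_instance.
       ti = qry.with_for_update().first()
   else:
       ti = qry.first()
   ```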
   
   
   Example scheduler logs showing the origins of the UPDATE statement:
   ```
   [2022-07-06 18:54:29,456] {{scheduler_job.py:753}} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
   Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (48513,18) in relation "task_instance"
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow/venv-py3/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 246, in run
       self._execute()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 726, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 807, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 890, in _do_scheduling
       callback_to_run = self._schedule_dag_run(dag_run, session)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1147, in _schedule_dag_run
       dag_run.schedule_tis(schedulable_tis, session)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/airflow/models/dagrun.py", line 909, in schedule_tis
       .update({TI.state: State.SCHEDULED}, synchronize_session=False)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 4063, in update
       update_op.exec_()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
       self._do_exec()
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
       self._execute_stmt(update_stmt)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
       self.result = self.query._execute_crud(stmt, self.mapper)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
       return conn.execute(stmt, self._params)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/opt/airflow/venv-py3/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process 99711 waits for ShareLock on transaction 527390121; blocked by process 100627.
   Process 100627 waits for ShareLock on transaction 527390039; blocked by process 99711.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (48513,18) in relation "task_instance"
   
   [SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s)]
   [parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'sanitized_dag_id_1', 'run_id_1': 'sanitized_run_id_1', 'task_id_1': 'sanitized_task_id_1'}]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
   
   Example Postgres logs showing a complete SELECT ... FOR UPDATE statement:
   ```
   2022-07-06 18:54:25.816 UTC [100639] ERROR:  deadlock detected
   2022-07-06 18:54:25.816 UTC [100639] DETAIL:  Process 100639 waits for ShareLock on transaction 527390039; blocked by process 99711.
       Process 99711 waits for ShareLock on transaction 527390130; blocked by process 100639.
       Process 100639: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_insta
       Process 99711: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.run_id = 'sanitized_run_id_2' AND task_instance.task_id IN ('sanitized_task_id_2', 'sanitized_task_id_3')
   2022-07-06 18:54:25.816 UTC [100639] HINT:  See server log for query details.
   2022-07-06 18:54:25.816 UTC [100639] CONTEXT:  while locking tuple (725,169) in relation "dag_run"
    2022-07-06 18:54:25.816 UTC [100639] STATEMENT:  SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
       FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
       WHERE task_instance.dag_id = 'sanitized_dag_id_2' AND task_instance.task_id = 'sanitized_task_id_3' AND task_instance.run_id = 'sanitized_run_id_2'
        LIMIT 1 FOR UPDATE
   ```
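   
   Reading the two CONTEXT lines together, the transactions appear to take the same pair of row locks in opposite order: the scheduler transaction is stuck updating a task_instance row, while the other transaction is stuck locking the corresponding dag_run row, so each holds a lock the other needs. The following is only a generic illustration of that pattern (hypothetical connection string, table rows, and WHERE clauses, using plain psycopg2 rather than the actual Airflow code paths), not a reproduction of the Airflow queries:
   ```
   # Generic illustration of two transactions taking the same row locks in opposite
   # order. DSN and row values are hypothetical placeholders; this is NOT the
   # Airflow code path, only the shape of the deadlock.
   import threading

   import psycopg2
   from psycopg2 import errors

   DSN = "dbname=airflow user=airflow host=localhost"  # hypothetical
   barrier = threading.Barrier(2)

   def worker(first_stmt, second_stmt):
       conn = psycopg2.connect(DSN)
       try:
           with conn.cursor() as cur:
               cur.execute(first_stmt)   # take the first row lock
               barrier.wait()            # both transactions now hold one lock each
               cur.execute(second_stmt)  # one side gets DeadlockDetected
           conn.commit()
       except errors.DeadlockDetected as exc:
           print("deadlock detected:", exc)
           conn.rollback()
       finally:
           conn.close()

   # Transaction A: lock the dag_run row first, then update a task_instance row
   # (the order implied by "while updating tuple ... task_instance" on the scheduler side).
   t_a = threading.Thread(target=worker, args=(
       "SELECT 1 FROM dag_run WHERE run_id = 'r1' FOR UPDATE",
       "UPDATE task_instance SET state = 'scheduled' WHERE run_id = 'r1' AND task_id = 't1'",
   ))
   # Transaction B: lock the task_instance row first, then the dag_run row
   # (the order implied by "while locking tuple ... dag_run" on the SELECT ... FOR UPDATE side).
   t_b = threading.Thread(target=worker, args=(
       "SELECT 1 FROM task_instance WHERE run_id = 'r1' AND task_id = 't1' FOR UPDATE",
       "SELECT 1 FROM dag_run WHERE run_id = 'r1' FOR UPDATE",
   ))
   t_a.start(); t_b.start(); t_a.join(); t_b.join()
   ```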
   
   Unfortunately we have not been able to reproduce this on a test instance, so I have not tried newer Airflow versions; based on the discussion in this thread, the issue appears to be present at least through 2.3.2.

