Posted to commits@airflow.apache.org by "Zhidong (Jira)" <ji...@apache.org> on 2019/12/18 01:59:00 UTC

[jira] [Commented] (AIRFLOW-2516) Deadlock found when trying to update task_instance table

    [ https://issues.apache.org/jira/browse/AIRFLOW-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16998737#comment-16998737 ] 

Zhidong commented on AIRFLOW-2516:
----------------------------------

Hi [~higrys], the problem occurs at a very high frequency, especially when the task_instance table grows large.

The deadlock happens when the worker and the scheduler both try to update task_instance at the same time: each transaction holds a record lock the other is waiting for (one on the PRIMARY index, one on the `ti_state` index, as shown in the InnoDB status below), so MySQL rolls back one of the transactions. Both the worker and the scheduler can therefore hit this error.

Workarounds:
 # Catch the deadlock exception (MySQL error 1213) in application code and retry the transaction immediately.
 # Drop the `ti_state` index to avoid the race between record locks on the secondary index and the primary key.
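The retry workaround (#1) can be sketched as follows. This is a minimal sketch, not Airflow code: `DeadlockError`, `retry_on_deadlock`, and `flaky_update` are hypothetical names; in a real deployment you would catch the driver's OperationalError and check for MySQL error code 1213 before retrying.

```python
import time

class DeadlockError(Exception):
    """Stand-in for MySQL error 1213 ('Deadlock found when trying to get lock')."""

def retry_on_deadlock(fn, retries=3, delay=0.0):
    """Call fn(); if it raises DeadlockError, retry up to `retries` times."""
    for attempt in range(1, retries + 1):
        try:
            return fn()
        except DeadlockError:
            if attempt == retries:
                raise  # retries exhausted; surface the deadlock to the caller
            time.sleep(delay)  # brief pause before re-running the transaction

# Demo: the first two attempts deadlock, the third succeeds.
calls = {"n": 0}
def flaky_update():
    calls["n"] += 1
    if calls["n"] < 3:
        raise DeadlockError("1213: Deadlock found when trying to get lock")
    return "committed"

result = retry_on_deadlock(flaky_update)
```

Because InnoDB rolls back one of the two transactions automatically, an immediate retry usually succeeds once the surviving transaction has committed.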

 

SHOW ENGINE INNODB STATUS (excerpt)
------------------------
LATEST DETECTED DEADLOCK
------------------------
191216  8:52:09
*** (1) TRANSACTION:
TRANSACTION 4AB6056BF, ACTIVE 0 sec fetching rows
mysql tables in use 2, locked 2
LOCK WAIT 5 lock struct(s), heap size 1248, 5 row lock(s)
MySQL thread id 256467507, OS thread handle 0x7f973d815700, query id 14205100829 127.0.0.1 airflow Sending data
UPDATE task_instance, dag_run SET task_instance.state=NULL WHERE task_instance.dag_id IN ('A_Dag', 'B_Dag') AND task_instance.state IN ('queued', 'scheduled') AND dag_run.dag_id = task_instance.dag_id AND dag_run.execution_date = task_instance.execution_date AND dag_run.state != 'running'
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 112745 n bits 128 index `PRIMARY` of table `airflow`.`task_instance` trx id 4AB6056BF lock_mode X locks rec but not gap waiting
*** (2) TRANSACTION:
TRANSACTION 4AB6056BC, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
4 lock struct(s), heap size 1248, 2 row lock(s), undo log entries 2
MySQL thread id 256620571, OS thread handle 0x7f973d531700, query id 14205100833 192.168.1.1 airflow Updating
UPDATE task_instance SET start_date='2019-12-16 08:51:35.486074', state='running', try_number=1, hostname='192.168.1.1', unixname='airflow', job_id=119942, operator='PythonOperator', pid=60059 WHERE task_instance.task_id = 'X_Task' AND task_instance.dag_id = 'X_Dag' AND task_instance.execution_date = '2019-12-16 08:50:00'
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 0 page no 112745 n bits 128 index `PRIMARY` of table `airflow`.`task_instance` trx id 4AB6056BC lock_mode X locks rec but not gap
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 0 page no 112079 n bits 232 index `ti_state` of table `airflow`.`task_instance` trx id 4AB6056BC lock_mode X locks rec but not gap waiting
*** WE ROLL BACK TRANSACTION (1)
 
MySQL general.log
191216  8:52:09
                256467507 Query SELECT 1
                256620571 Connect       airflow@192.168.1.1 as anonymous on airflow
                256620571 Query set autocommit=0
                256467507 Query UPDATE task_instance, dag_run SET task_instance.state='failed' WHERE task_instance.dag_id IN ('A_Dag', 'B_Dag') AND task_instance.state IN ('up_for_retry') AND dag_run.dag_id = task_instance.dag_id AND dag_run.execution_date = task_instance.execution_date AND dag_run.state != 'running'
                256620571 Query SELECT 1
                256620571 Query SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid FROM task_instance WHERE task_instance.task_id = 'X_Task' AND task_instance.dag_id = 'X_Dag' AND task_instance.execution_date = '2019-12-16 08:50:00'
                256467507 Query commit
                256467507 Query rollback
                256467507 Query SELECT 1
                256620571 Query INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES ('2019-12-16 08:51:35.491292', 'X_Dag', 'X_Task', 'running', '2019-12-16 08:50:00', 'Airflow', NULL)
                256467507 Query UPDATE task_instance, dag_run SET task_instance.state=NULL WHERE task_instance.dag_id IN ('A_Dag', 'B_Dag') AND task_instance.state IN ('queued', 'scheduled') AND dag_run.dag_id = task_instance.dag_id AND dag_run.execution_date = task_instance.execution_date AND dag_run.state != 'running'
                256620571 Query UPDATE task_instance SET start_date='2019-12-16 08:51:35.486074', state='running', try_number=1, hostname='192.168.1.1', unixname='airflow', job_id=119942, operator='PythonOperator', pid=60059 WHERE task_instance.task_id = 'X_Task' AND task_instance.dag_id = 'X_Dag' AND task_instance.execution_date = '2019-12-16 08:50:00'
                256620571 Query commit
                256620571 Query rollback
                256620571 Quit

> Deadlock found when trying to update task_instance table
> --------------------------------------------------------
>
>                 Key: AIRFLOW-2516
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2516
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>            Reporter: Jeff Liu
>            Priority: Major
>
>  
>  
> {code:java}
> [2018-05-23 17:59:57,218] {base_task_runner.py:98} INFO - Subtask: [2018-05-23 17:59:57,217] {base_executor.py:49} INFO - Adding to queue: airflow run production_wipeout_wipe_manager.Carat Carat_20180227 2018-05-23T17:41:18.815809 --local -sd DAGS_FOLDER/wipeout/wipeout.py
> [2018-05-23 17:59:57,231] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: args.func(args)
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
> [2018-05-23 17:59:57,232] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
> [2018-05-23 17:59:57,233] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1532, in _run_raw_task
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: self.handle_failure(e, test_mode, context)
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1641, in handle_failure
> [2018-05-23 17:59:57,234] {base_task_runner.py:98} INFO - Subtask: session.merge(self)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1920, in merge
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: _resolve_conflict_map=_resolve_conflict_map)
> [2018-05-23 17:59:57,235] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1974, in _merge
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: merged = self.query(mapper.class_).get(key[1])
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 882, in get
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: ident, loading.load_on_pk_identity)
> [2018-05-23 17:59:57,236] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 952, in _get_impl
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return db_load_fn(self, primary_key_identity)
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 247, in load_on_pk_identity
> [2018-05-23 17:59:57,237] {base_task_runner.py:98} INFO - Subtask: return q.one()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2884, in one
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = self.one_or_none()
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2854, in one_or_none
> [2018-05-23 17:59:57,238] {base_task_runner.py:98} INFO - Subtask: ret = list(self)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2925, in __iter__
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: return self._execute_and_instances(context)
> [2018-05-23 17:59:57,239] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2946, in _execute_and_instances
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: close_with_result=True)
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2955, in _get_bind_args
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: **kw
> [2018-05-23 17:59:57,240] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2937, in _connection_from_session
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: conn = self.session.connection(**kw)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1035, in connection
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: execution_options=execution_options)
> [2018-05-23 17:59:57,241] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1040, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: engine, execution_options)
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 388, in _connection_for_bind
> [2018-05-23 17:59:57,242] {base_task_runner.py:98} INFO - Subtask: self._assert_active()
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 276, in _assert_active
> [2018-05-23 17:59:57,243] {base_task_runner.py:98} INFO - Subtask: % self._rollback_exception
> [2018-05-23 17:59:57,244] {base_task_runner.py:98} INFO - Subtask: sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: u'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s'] [parameters: (u'queued', 'Carat_20180227', 'production_wipeout_wipe_manager.Carat', datetime.datetime(2018, 5, 23, 17, 41, 18, 815809))] (Background on this error at: http://sqlalche.me/e/e3q8){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)