You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2020/02/14 14:00:00 UTC

[jira] [Issue Comment Deleted] (AIRFLOW-2219) Race condition to DagRun.verify_integrity between Scheduler and Webserver

     [ https://issues.apache.org/jira/browse/AIRFLOW-2219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor updated AIRFLOW-2219:
---------------------------------------
    Comment: was deleted

(was: I get the same error using trigger_dag in the CLI:

 

Environment: LocalExecutor, MySQL metastore, Airflow 1.10.3 – sqlalchemy.exc.IntegrityError: (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'redact' for key 'PRIMARY'") [SQL: u'INSERT INTO task_instance 

 

[2019-06-26 20:20:37,124] {__init__.py:305} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags
Traceback (most recent call last):
 File "/home/ec2-user/venv/bin/airflow", line 32, in <module>
 args.func(args)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
 return f(*args, **kwargs)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 233, in trigger_dag
 execution_date=args.exec_date)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/client/local_client.py", line 33, in trigger_dag
 execution_date=execution_date)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 101, in trigger_dag
 replace_microseconds=replace_microseconds,
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 77, in _trigger_dag
 external_trigger=True,
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in wrapper
 return func(*args, **kwargs)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py", line 4095, in create_dagrun
 run.verify_integrity(session=session)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line 69, in wrapper
 return func(*args, **kwargs)
 File "/home/ec2-user/venv/local/lib/python2.7/site-packages/airflow/models/__init__.py", line 4934, in verify_integrity
 session.commit()
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py", line 1023, in commit
 self.transaction.commit()
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py", line 487, in commit
 self._prepare_impl()
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py", line 466, in _prepare_impl
 self.session.flush()
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py", line 2446, in flush
 self._flush(objects)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py", line 2584, in _flush
 transaction.rollback(_capture_exception=True)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 67, in __exit__
 compat.reraise(exc_type, exc_value, exc_tb)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/session.py", line 2544, in _flush
 flush_context.execute()
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 416, in execute
 rec.execute(self)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 583, in execute
 uow,
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
 insert,
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 1063, in _emit_insert_statements
 c = cached_connections[connection].execute(statement, multiparams)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 980, in execute
 return meth(self, multiparams, params)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 273, in _execute_on_connection
 return connection._execute_clauseelement(self, multiparams, params)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1099, in _execute_clauseelement
 distilled_params,
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
 e, statement, parameters, cursor, context
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
 util.raise_from_cause(sqlalchemy_exception, exc_info)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
 reraise(type(exception), exception, tb=exc_tb, cause=cause)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1216, in _execute_context
 cursor, statement, parameters, context
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 107, in do_executemany
 rowcount = cursor.executemany(statement, parameters)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", line 234, in executemany
 self._get_db().encoding)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", line 256, in _do_execute_many
 rows += self.execute(sql + postfix)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", line 206, in execute
 res = self._query(query)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/cursors.py", line 312, in _query
 db.query(q)
 File "/home/ec2-user/venv/local/lib64/python2.7/site-packages/MySQLdb/connections.py", line 224, in query
 _mysql.connection.query(self, query))

> Race condition to DagRun.verify_integrity between Scheduler and Webserver
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2219
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2219
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.8.1, 1.9.0
>            Reporter: Will Wong
>            Priority: Trivial
>
> Symptoms:
>  * Triggering a DAG causes the 404 "nuke" page to appear, with an error message along the lines of: {{psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"}} when calling {{DagRun.verify_integrity}}
> Or
>  * A similar error appears in the scheduler log for the DAG file when scheduling a DAG. (Example exception at the end of description)
> This occurs because {{Dag.create_dagrun}} commits the dag_run entry to the database and then immediately runs {{verify_integrity}} to add the task_instances. However, the scheduler can pick up the new dag run before all of its task_instances have been created, and it also calls {{verify_integrity}} to create the task_instances at the same time, so both processes try to insert the same task_instance rows.
> I don't _think_ this actually breaks anything in particular. The exception happens either on the webpage or in the scheduler logs:
>  * If it occurs in the UI, it just scares people into thinking something broke, but the task_instances will still be created by the scheduler.
>  * If the error shows up in the scheduler, the task_instances are created by the webserver, and the scheduler continues processing the DAG during its next loop.
>  
>  I'm not sure whether calling {{DagRun.verify_integrity}} is necessary in both {{SchedulerJob._process_task_instances}} and {{Dag.create_dagrun}}; perhaps we can just stick to one?
>  
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
>     context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
>     cursor.executemany(statement, parameters)
> psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"
> DETAIL:  Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in helper
>     pickle_dags)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391, in _process_dags
>     self._process_task_instances(dag, tis_out)
>   File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in _process_task_instances
>     run.verify_integrity(session=session)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786, in verify_integrity
>     session.commit()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 943, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
>     raise value
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in execute
>     uow
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 830, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
>     context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
>     raise value.with_traceback(tb)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
>     context)
>   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
>     cursor.executemany(statement, parameters)
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)