You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Dmitry (Jira)" <ji...@apache.org> on 2019/09/18 14:01:00 UTC
[jira] [Commented] (AIRFLOW-2219) Race condition to
DagRun.verify_integrity between Scheduler and Webserver
[ https://issues.apache.org/jira/browse/AIRFLOW-2219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16932479#comment-16932479 ]
Dmitry commented on AIRFLOW-2219:
---------------------------------
I had the same error a couple of times per day from TriggerDagRunOperator, but after I rewrote it the error is almost gone: the new operator has been running for a month and a half, and the issue has happened only once during that time. My operator code is attached; the two key changes are that I check whether the DAG has already been started, and that I use the same session in the get_dagrun and create_dagrun calls.
{code:python}
import logging
from datetime import datetime

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder
from airflow.utils.state import State
class DagRunOperator(TriggerDagRunOperator):
    """Trigger a run of another DAG unless one already exists for the date.

    Before creating the run, the operator looks one up for
    ``execution_date`` and skips creation if it is found. The lookup and
    the creation share a single session.

    :param trigger_dag_id: dag_id of the DAG to trigger.
    :param python_callable: called as ``python_callable(context, dro)`` with
        a ``DagRunOrder``; a falsy return value means "do not trigger".
    :param execution_date: templated string in ``'%Y-%m-%d %H:%M:%S'``
        format, or ``None``. It is used to build the run_id and is passed
        through to the DAG run lookup/creation.
    """

    template_fields = ('execution_date',)
    ui_color = '#e6ccff'

    def __init__(
        self,
        trigger_dag_id,
        python_callable,
        execution_date=None,
        *args, **kwargs
    ):
        self.execution_date = execution_date
        super(DagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id,
            python_callable=python_callable,
            *args, **kwargs
        )

    def execute(self, context):
        """Create the target DAG run if the callable approves and none exists.

        :param context: task context, forwarded to ``python_callable``.
        :raises ValueError: if ``trigger_dag_id`` is not found in the DagBag.
        """
        # The run_id is derived from the requested date, or from "now" when
        # no execution_date was supplied.
        if self.execution_date is not None:
            run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        else:
            run_id_dt = datetime.now()

        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if not dro:
            logging.info("Criteria not met, moving on")
            return

        session = settings.Session()
        try:
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            if trigger_dag is None:
                # Fail with a clear message instead of an AttributeError on None.
                raise ValueError(
                    "Dag id {} not found in DagBag".format(self.trigger_dag_id)
                )

            # NOTE(review): execution_date is forwarded exactly as received
            # (templated string or None), not the parsed run_id_dt -- confirm
            # that get_dagrun/create_dagrun accept that type in your Airflow
            # version.
            # Use the same session for the existence check and the creation.
            if trigger_dag.get_dagrun(self.execution_date, session=session):
                logging.info("DagRun already exists %s", trigger_dag)
                return

            logging.info("Creating DagRun...")
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=self.execution_date,
                conf=dro.payload,
                session=session,
                external_trigger=True
            )
            logging.info("DagRun Created: %s", dr)
            session.add(dr)
            session.commit()
        finally:
            # Always release the session, including when lookup, creation
            # or commit raises.
            session.close()
{code}
> Race condition to DagRun.verify_integrity between Scheduler and Webserver
> -------------------------------------------------------------------------
>
> Key: AIRFLOW-2219
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2219
> Project: Apache Airflow
> Issue Type: Bug
> Components: database
> Affects Versions: 1.8.1, 1.9.0
> Reporter: Will Wong
> Priority: Trivial
>
> Symptoms:
> * Triggering dag causes the 404 nuke page with an error message along the lines of: {{psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"}} when calling {{DagRun.verify_integrity}}
> Or
> * Similar error in scheduler log for dag file when scheduling a DAG. (Example exception at the end of description)
> This occurs because {{Dag.create_dagrun}} commits the dag_run entry to the database and then runs {{verify_integrity}} to add the task_instances immediately. However, the scheduler already picks up a dag run before all task_instances are created and also calls {{verify_integrity}} to create task_instances at the same time.
> I don't _think_ this actually breaks anything in particular. The exception happens either on the webpage or in the scheduler logs:
> * If it occurs in the UI, it just scares people thinking something broke but the task_instances will be created by the scheduler.
> * If the error shows up in the scheduler, the task_instances are created by the webserver and it continues processing the DAG during the next loop.
>
> I'm not sure if {{DagRun.verify_integrity}} is necessary for both {{SchedulerJob._process_task_instances}} as well as {{Dag.create_dagrun}} but perhaps we can just stick to one?
>
> {noformat}
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
> context)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
> cursor.executemany(statement, parameters)
> psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in helper
> pickle_dags)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
> result = func(*args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792, in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391, in _process_dags
> self._process_task_instances(dag, tis_out)
> File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in _process_task_instances
> run.verify_integrity(session=session)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
> result = func(*args, **kwargs)
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786, in verify_integrity
> session.commit()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 943, in commit
> self.transaction.commit()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
> self._prepare_impl()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
> self.session.flush()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in flush
> self._flush(objects)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in _flush
> transaction.rollback(_capture_exception=True)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
> raise value
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in _flush
> flush_context.execute()
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in execute
> rec.execute(self)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in execute
> uow
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
> mapper, table, insert)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 830, in _emit_insert_statements
> execute(statement, multiparams)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
> return meth(self, multiparams, params)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
> return connection._execute_clauseelement(self, multiparams, params)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
> compiled_sql, distilled_params
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
> context)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
> exc_info
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
> reraise(type(exception), exception, tb=exc_tb, cause=cause)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
> raise value.with_traceback(tb)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in _execute_context
> context)
> File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 683, in do_executemany
> cursor.executemany(statement, parameters)
> DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler, 2018-03-15 23:46:57.116673) already exists.
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)