Posted to commits@airflow.apache.org by "Ruslan Dautkhanov (JIRA)" <ji...@apache.org> on 2017/03/14 17:20:41 UTC

[jira] [Created] (AIRFLOW-980) IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key" on sample DAGs

Ruslan Dautkhanov created AIRFLOW-980:
-----------------------------------------

             Summary: IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key" on sample DAGs
                 Key: AIRFLOW-980
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-980
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: Airflow 1.7.1.3
         Environment: Local Executor
postgresql+psycopg2 database backend
            Reporter: Ruslan Dautkhanov


Fresh Airflow install using pip.
Only the sample DAGs are installed.
LocalExecutor (4 workers).
Most parameters are at their defaults.

Turned on all 14 of the sample DAGs.
After some time (most DAGs had at least one successful run),
the error stack below started appearing repeatedly in the scheduler log.

{noformat}
IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key"
 [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': u'scheduled__2015-01-01T00:00:00', 'execution_date': datetime.datetime(2015, 1, 1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None, 'start_date': datetime.datetime(2017, 3, 14, 11, 12, 29, 646995), 'dag_id': 'example_xcom'}]
Process Process-152:
Traceback (most recent call last):
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 664, in _do_dags
    dag = dagbag.get_dag(dag.dag_id)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 188, in get_dag
    orm_dag = DagModel.get_current(root_dag_id)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 2320, in get_current
    obj = session.query(cls).filter(cls.dag_id == dag_id).first()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2634, in first
    ret = list(self[0:1])
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
    return list(res)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
    close_with_result=True)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
    **kw)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 893, in connection
    execution_options=execution_options)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
    engine, execution_options)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
    self._assert_active()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
    % self._rollback_exception
InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key"
 [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': u'scheduled__2015-01-01T00:00:00', 'execution_date': datetime.datetime(2015, 1, 1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None, 'start_date': datetime.datetime(2017, 3, 14, 11, 12, 29, 646995), 'dag_id': 'example_xcom'}]
[2017-03-14 11:12:29,757] {jobs.py:741} INFO - Done queuing tasks, calling the executor's heartbeat
[2017-03-14 11:12:29,757] {jobs.py:744} INFO - Loop took: 29.335935 seconds
{noformat}
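One plausible mechanism for the error above is a check-then-insert race: with LocalExecutor, several scheduler processes can each check whether a DagRun exists, all see that it does not, and then all try to INSERT the same (dag_id, execution_date) row, so every insert after the first violates the unique constraint. A minimal sketch of that pattern, using sqlite3 in place of the PostgreSQL backend and a simplified, hypothetical dag_run schema (the real table and constraint definition differ):

```python
import sqlite3

# Simplified stand-in for the dag_run table; the unique key here mirrors
# the duplicate-key pair from the parameters in the traceback.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    " id INTEGER PRIMARY KEY,"
    " dag_id TEXT,"
    " execution_date TEXT,"
    " UNIQUE (dag_id, execution_date))"
)

def run_exists(dag_id, execution_date):
    # Step 1: a scheduler process checks whether the run already exists.
    row = conn.execute(
        "SELECT 1 FROM dag_run WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    return row is not None

# Two concurrent workers both perform the existence check before either
# has inserted, so both conclude "no run yet".
seen_by_worker_1 = run_exists("example_xcom", "2015-01-01T00:00:00")
seen_by_worker_2 = run_exists("example_xcom", "2015-01-01T00:00:00")

# Worker 1's insert succeeds ...
conn.execute(
    "INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)",
    ("example_xcom", "2015-01-01T00:00:00"),
)

# ... and worker 2's identical insert then violates the unique
# constraint, which is the IntegrityError reported above.
try:
    conn.execute(
        "INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)",
        ("example_xcom", "2015-01-01T00:00:00"),
    )
    raised = False
except sqlite3.IntegrityError:
    raised = True

print(raised)  # True
```

The follow-on InvalidRequestError in the traceback is then just the SQLAlchemy session refusing further work until Session.rollback() is issued after the failed flush.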



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)