Posted to commits@airflow.apache.org by "Bolke de Bruin (JIRA)" <ji...@apache.org> on 2016/09/07 21:08:20 UTC
[jira] [Created] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded
Bolke de Bruin created AIRFLOW-492:
--------------------------------------
Summary: Insert into dag_stats table results into failed task while task itself succeeded
Key: AIRFLOW-492
URL: https://issues.apache.org/jira/browse/AIRFLOW-492
Project: Apache Airflow
Issue Type: Bug
Reporter: Bolke de Bruin
Assignee: siddharth anand
Priority: Critical
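On some occasions a duplicate key appears to be inserted into dag_stats, which results in a task/DAG run being marked as failed even though the task itself has succeeded. In the log below, the run is first marked successful, then the INSERT into dag_stats raises an IntegrityError, and the task is marked as FAILED: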
[2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
[2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
executor=self.executor)
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
job.run()
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
self._execute()
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
models.DagStat.clean_dirty([run.dag_id], session=session)
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
session.commit()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
self.transaction.commit()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
self._prepare_impl()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
self._flush(objects)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
flush_context.execute()
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
rec.execute(self)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
uow
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
mapper, table, insert)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
execute(statement, multiparams)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
compiled_sql, distilled_params
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
context)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
exc_info
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
context)
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
raise errorvalue
IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
[2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.
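One possible way to tolerate the duplicate key is to fall back to an UPDATE when the INSERT hits an existing row. The following is only a minimal sketch under stated assumptions, not Airflow's code or its eventual fix: it uses the standard-library sqlite3 module instead of MySQL and SQLAlchemy, the table definition is a hypothetical stand-in whose composite primary key (dag_id, state) is inferred from the error message above, and the function names make_dag_stats_table and upsert_dag_stat are invented for illustration.

```python
import sqlite3


def make_dag_stats_table(conn):
    # Hypothetical stand-in for the dag_stats table. The composite primary
    # key (dag_id, state) is an assumption inferred from the error message.
    conn.execute(
        "CREATE TABLE dag_stats ("
        " dag_id TEXT NOT NULL,"
        " state TEXT NOT NULL,"
        " count INTEGER NOT NULL DEFAULT 0,"
        " dirty INTEGER NOT NULL DEFAULT 0,"
        " PRIMARY KEY (dag_id, state))"
    )


def upsert_dag_stat(conn, dag_id, state, count):
    """Insert a stats row; if the key already exists, update it instead."""
    try:
        # "with conn" commits on success and rolls back on an exception.
        with conn:
            conn.execute(
                "INSERT INTO dag_stats (dag_id, state, count, dirty) "
                "VALUES (?, ?, ?, 0)",
                (dag_id, state, count),
            )
    except sqlite3.IntegrityError:
        # The row was already there (for example, written by another
        # process), so overwrite it rather than failing the caller.
        with conn:
            conn.execute(
                "UPDATE dag_stats SET count = ?, dirty = 0 "
                "WHERE dag_id = ? AND state = ?",
                (count, dag_id, state),
            )


conn = sqlite3.connect(":memory:")
make_dag_stats_table(conn)

# The second call reuses the same (dag_id, state) key and does not raise.
upsert_dag_stat(conn, "hanging_subdags_n16_sqe.level_2_14", "success", 2)
upsert_dag_stat(conn, "hanging_subdags_n16_sqe.level_2_14", "success", 3)

rows = conn.execute(
    "SELECT dag_id, state, count, dirty FROM dag_stats"
).fetchall()
print(rows)
```

With this pattern a concurrent writer that has already created the row no longer causes the statistics update to raise, so a bookkeeping failure would not be reported as a task failure. Whether insert-then-update, a database-level upsert, or row locking is the right choice for Airflow itself is left open here.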