You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "gero (JIRA)" <ji...@apache.org> on 2017/04/21 00:41:04 UTC

[jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded

    [ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977869#comment-15977869 ] 

gero commented on AIRFLOW-492:
------------------------------

Hey guys, I'm seeing the same problem. I have a sub-dag (along with sub dag operator) -- it contains two python operators. The individual python operators are completing successfully, but the sub dag operator gets marked failed because of the dup key error. This seems to be happening with some of my backfill run -- not all of them. Is there a workaround or a solution?



> Insert into dag_stats table results into failed task while task itself succeeded
> --------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-492
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Bolke de Bruin
>            Assignee: Siddharth Anand
>            Priority: Critical
>         Attachments: subdag_test.py
>
>
> In some occasions there seem to be a duplicate key being inserted in dag_stats that results in a task/dag run being marked failed while the task itself has succeeded.
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
>     self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
>     raise errorvalue
> IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> [2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)