Posted to commits@airflow.apache.org by "ASF subversion and git services (JIRA)" <ji...@apache.org> on 2017/01/10 08:06:58 UTC

[jira] [Commented] (AIRFLOW-738) XCom: Deadlock found when trying to get lock; try restarting transaction

    [ https://issues.apache.org/jira/browse/AIRFLOW-738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814267#comment-15814267 ] 

ASF subversion and git services commented on AIRFLOW-738:
---------------------------------------------------------

Commit bb883785077f10784255107cbf8c6d5f7b33de15 in incubator-airflow's branch refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=bb88378 ]

Merge branch 'AIRFLOW-738'


> XCom: Deadlock found when trying to get lock; try restarting transaction
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-738
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-738
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.8
>            Reporter: Bolke de Bruin
>            Priority: Blocker
>
> When using the following dag:
> {code}
> from datetime import datetime, timedelta
> import logging
> import pprint
> import random
>
> # The DAG object; we'll need this to instantiate a DAG
> from airflow import DAG
> # Operators; we need this to operate!
> from airflow.operators.python_operator import PythonOperator
>
> start_time = datetime.now().replace(minute=0, second=0, microsecond=0)
> start_time += timedelta(hours=-1)  # timedelta(days=-2)
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': start_time,
>     'email': ['alex.papanic@gmail.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1),
>     # 'queue': 'bash_queue',
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
>
> dag = DAG(
>     'xcom_test',
>     default_args=default_args,
>     schedule_interval='@once')
>
>
> def upload_activity_status(pgconn_id, **context):
>     upstream_task_ids = context['task'].upstream_task_ids
>     logging.info(
>         "Getting status from upstream tasks {}".format(upstream_task_ids))
>     status = context['ti'].xcom_pull(task_ids=upstream_task_ids)
>     logging.info("Xcom pull results:\n{}".format(pprint.pformat(status)))
>     logging.info("Upload to DB here")
>
>
> upload_activity_status_task = PythonOperator(
>     task_id='upload_activity_status',
>     python_callable=upload_activity_status,
>     op_kwargs={'pgconn_id': 'postgres_conn'},
>     provide_context=True,
>     dag=dag)
>
>
> def poll_data(params, execution_date, **context):
>     logging.info("Test polling function for {data_stream}".format(**params))
>     status = random.random() < 0.5
>     output = dict(
>         data_stream=params['data_stream'],
>         timeperiod=execution_date + timedelta(hours=-1),
>         status=status
>     )
>     return output
>
>
> def poll_data_factory(data_stream, dag):
>     return PythonOperator(
>         task_id='poll_{}'.format(data_stream),
>         python_callable=poll_data,
>         params={u'data_stream': data_stream},
>         provide_context=True,
>         dag=dag
>     )
>
>
> poll_streams = []
> streams = ['stream' + str(i) for i in range(30)]
> for data_stream in streams:
>     poll = poll_data_factory(data_stream, dag)
>     poll_streams.append(poll)
>     upload_activity_status_task.set_upstream(poll)
> {code}
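
For context on why thirty small tasks trip this: every poll_* callable returns a value, so each task instance writes a return_value row to the xcom table as it finishes. As far as I can tell from the 1.7/1.8 code path, XCom.set deletes any existing row for the same (dag_id, task_id, execution_date, key) and then inserts the replacement within one transaction; under InnoDB's default REPEATABLE READ isolation, a DELETE that matches no rows still takes gap locks, so parallel delete-then-insert transactions can deadlock with exactly this error 1213. A minimal standalone sketch of that access pattern follows; the table, columns, DSN, and thread count are illustrative stand-ins, not Airflow's real schema or executor:

{code}
# Sketch of the delete-then-insert pattern that can raise MySQL error 1213
# under concurrency. Everything here (table, DSN, payloads) is hypothetical.
import threading

import sqlalchemy as sa

engine = sa.create_engine('mysql://user:pass@localhost/demo')  # hypothetical DSN
metadata = sa.MetaData()
xcom = sa.Table(
    'xcom_demo', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('task_id', sa.String(250), index=True),
    sa.Column('value', sa.LargeBinary),
)
metadata.create_all(engine)


def set_xcom(task_id, value):
    # One transaction: DELETE first (matching no rows, it still takes InnoDB
    # gap locks under REPEATABLE READ), then INSERT into the locked gap.
    with engine.begin() as conn:
        conn.execute(xcom.delete().where(xcom.c.task_id == task_id))
        conn.execute(xcom.insert().values(task_id=task_id, value=value))


# Thirty concurrent writers, mirroring the thirty poll_* tasks finishing at
# about the same time; some pairs of transactions will deadlock.
threads = [threading.Thread(target=set_xcom, args=('poll_stream%d' % i, b'x'))
           for i in range(30)]
for t in threads:
    t.start()
for t in threads:
    t.join()
{code}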
> The following error is thrown:
> {code}
> [2017-01-06 21:41:35,824] {jobs.py:1433} INFO - Heartbeating the scheduler
> Traceback (most recent call last):
>   File "/Users/bolke/Documents/dev/airflow_env/bin/airflow", line 4, in <module>
>     __import__('pkg_resources').run_script('airflow==1.7.2.dev0', 'airflow')
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 739, in run_script
>     self.require(requires)[0].run_script(script_name, ns)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1494, in run_script
>     exec(code, namespace, namespace)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/EGG-INFO/scripts/airflow", line 28, in <module>
>     args.func(args)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/bin/cli.py", line 380, in run
>     pool=args.pool,
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1334, in run
>     self.handle_failure(e, test_mode, context)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1407, in handle_failure
>     session.merge(self)
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1815, in merge
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1861, in _merge
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 831, in get
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 864, in _get_impl
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/loading.py", line 223, in load_on_ident
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2756, in one
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2726, in one_or_none
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2797, in __iter__
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2818, in _execute_and_instances
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2827, in _get_bind_args
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2809, in _connection_from_session
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 966, in connection
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 276, in _assert_active
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: u'INSERT INTO xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, now(), %s, %s, %s)'] [parameters: (u'return_value', '\x80\x02}q\x00(U\x06statusq\x01\x89U\ntimeperiodq\x02cdatetime\ndatetime\nq\x03U\n\x07\xe1\x01\x06\x13\x00\x00\x00\x00\x00q\x04\x85q\x05Rq\x06U\x0bdata_streamq\x07U\x08stream26q\x08u.', datetime.datetime(2017, 1, 6, 20, 0), 'poll_stream26', 'xcom_test')]
> {code}
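
Two things are worth separating in the traceback above. The deadlock (error 1213) is the primary failure; the InvalidRequestError is a secondary symptom, raised because handle_failure() reused the session after the aborted flush without first issuing Session.rollback(). The merge referenced in this comment changes the XCom handling, but the diff itself is not shown here; purely as an illustration of the "try restarting transaction" advice in the MySQL message, a deadlocked transaction is commonly retried a bounded number of times. In the sketch below the do_work callable, attempt limit, and backoff are hypothetical, not Airflow code:

{code}
# Generic retry-on-deadlock helper for a SQLAlchemy session; illustrates
# MySQL's "try restarting transaction" advice, not the AIRFLOW-738 patch.
import random
import time

from sqlalchemy.exc import OperationalError

MYSQL_DEADLOCK = 1213  # 'Deadlock found when trying to get lock'


def run_with_deadlock_retry(session, do_work, max_attempts=3):
    # do_work(session) is a hypothetical callable standing in for the
    # conflicting unit of work, e.g. an XCom delete-and-insert.
    for attempt in range(1, max_attempts + 1):
        try:
            do_work(session)
            session.commit()
            return
        except OperationalError as exc:
            # Roll back before reusing the session; skipping this is what
            # turns the deadlock into the InvalidRequestError seen above.
            session.rollback()
            if exc.orig.args[0] != MYSQL_DEADLOCK or attempt == max_attempts:
                raise
            # Short jittered backoff so retries do not collide again.
            time.sleep(0.05 * attempt + random.random() * 0.05)
{code}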



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)