You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Bolke de Bruin (JIRA)" <ji...@apache.org> on 2017/01/06 20:44:58 UTC

[jira] [Created] (AIRFLOW-738) XCom: Deadlock found when trying to get lock; try restarting transaction

Bolke de Bruin created AIRFLOW-738:
--------------------------------------

             Summary: XCom: Deadlock found when trying to get lock; try restarting transaction
                 Key: AIRFLOW-738
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-738
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: Airflow 1.8
            Reporter: Bolke de Bruin
            Priority: Blocker


When using the following dag:

{code}
from datetime import datetime, timedelta
import logging
import pprint
import random

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator


start_time = datetime.now().replace(minute=0, second=0, microsecond=0)
start_time += timedelta(hours=-1)  # timedelta(days=-2)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_time,
    'email': ['alex.papanic@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'xcom_test',
    default_args=default_args,
    schedule_interval='@once')


def upload_activity_status(pgconn_id, **context):
    upstream_task_ids = context['task'].upstream_task_ids
    logging.info(
        "Getting status from upstream task {}".format(upstream_task_ids))
    status = context['ti'].xcom_pull(task_ids=upstream_task_ids)
    logging.info("Xcom pull results:\n{}".format(pprint.pformat(status)))
    logging.info("Upload to DB here")


upload_ativity_status = PythonOperator(
    task_id='upload_activity_status',
    python_callable=upload_activity_status,
    op_kwargs={'pgconn_id': 'postgres_conn'},
    provide_context=True,
    dag=dag)


def poll_data(params, execution_date, **context):
    logging.info("Test polling function for {data_stream}".format(**params))
    status = random.random() < 0.5
    output = dict(
        data_stream=params['data_stream'],
        timeperiod=execution_date + timedelta(hours=-1),
        status=status
    )
    return output


def poll_data_factory(data_stream, dag):
    return PythonOperator(
        task_id='poll_{}'.format(data_stream),
        python_callable=poll_data,
        params={u'data_stream': data_stream},
        provide_context=True,
        dag=dag
    )

poll_streams = []
streams = ['stream' + str(i) for i in range(30)]

for data_stream in streams:
    poll = poll_data_factory(data_stream, dag)
    poll_streams.append(poll)
    upload_ativity_status.set_upstream(poll)

{code}

The following error is thrown:

{code}
2017-01-06 21:41:35,824] {jobs.py:1433} INFO - Heartbeating the scheduler
Traceback (most recent call last):
  File "/Users/bolke/Documents/dev/airflow_env/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.7.2.dev0', 'airflow')
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 739, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1494, in run_script
    exec(code, namespace, namespace)
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/EGG-INFO/scripts/airflow", line 28, in <module>
    args.func(args)
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/bin/cli.py", line 380, in run
    pool=args.pool,
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
    result = func(*args, **kwargs)
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1334, in run
    self.handle_failure(e, test_mode, context)
  File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1407, in handle_failure
    session.merge(self)
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1815, in merge
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1861, in _merge
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 831, in get
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 864, in _get_impl
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/loading.py", line 223, in load_on_ident
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2756, in one
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2726, in one_or_none
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2797, in __iter__
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2818, in _execute_and_instances
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2827, in _get_bind_args
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2809, in _connection_from_session
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 966, in connection
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
  File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 276, in _assert_active
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: u'INSERT INTO xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, now(), %s, %s, %s)'] [parameters: (u'return_value', '\x80\x02}q\x00(U\x06statusq\x01\x89U\ntimeperiodq\x02cdatetime\ndatetime\nq\x03U\n\x07\xe1\x01\x06\x13\x00\x00\x00\x00\x00q\x04\x85q\x05Rq\x06U\x0bdata_streamq\x07U\x08stream26q\x08u.', datetime.datetime(2017, 1, 6, 20, 0), 'poll_stream26', 'xcom_test')]
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)