You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Bolke de Bruin (JIRA)" <ji...@apache.org> on 2017/01/06 20:44:58 UTC
[jira] [Created] (AIRFLOW-738) XCom: Deadlock found when trying to
get lock; try restarting transaction
Bolke de Bruin created AIRFLOW-738:
--------------------------------------
Summary: XCom: Deadlock found when trying to get lock; try restarting transaction
Key: AIRFLOW-738
URL: https://issues.apache.org/jira/browse/AIRFLOW-738
Project: Apache Airflow
Issue Type: Bug
Affects Versions: Airflow 1.8
Reporter: Bolke de Bruin
Priority: Blocker
When using the following dag:
{code}
from datetime import datetime, timedelta
import logging
import pprint
import random
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
# Start date: the top of the current hour, moved back by one hour.
# (An alternative offset, timedelta(days=-2), was left disabled.)
start_time = (
    datetime.now().replace(microsecond=0, second=0, minute=0)
    - timedelta(hours=1)
)

# Passed to the DAG constructor as ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=start_time,
    email=['alex.papanic@gmail.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=1,
    retry_delay=timedelta(minutes=1),
    # Further optional keys, left disabled:
    # queue='bash_queue',
    # pool='backfill',
    # priority_weight=10,
    # end_date=datetime(2016, 1, 1),
)
# The DAG used for the reproduction: id 'xcom_test', scheduled '@once'.
dag = DAG(
    'xcom_test',
    schedule_interval='@once',
    default_args=default_args,
)
def upload_activity_status(pgconn_id, **context):
    """Pull the XCom values of all upstream tasks and log them.

    No database work is done here; the final log line only marks where an
    upload would take place.

    :param pgconn_id: connection id; accepted but not used by the body.
    :param context: keyword context. ``context['task'].upstream_task_ids``
        and ``context['ti'].xcom_pull`` are read.
    :return: None
    :raises KeyError: if ``'task'`` or ``'ti'`` is missing from ``context``.
    """
    upstream_task_ids = context['task'].upstream_task_ids
    logging.info(
        "Getting status from upstream task {}".format(upstream_task_ids))
    # One pull call covering every upstream task id at once.
    status = context['ti'].xcom_pull(task_ids=upstream_task_ids)
    logging.info("Xcom pull results:\n{}".format(pprint.pformat(status)))
    logging.info("Upload to DB here")
# Operator that runs upload_activity_status with pgconn_id='postgres_conn'.
# NOTE: the variable name is spelled "ativity" (sic); it is kept as-is
# because the wiring loop at the end of the script uses the same spelling.
upload_ativity_status = PythonOperator(
    dag=dag,
    task_id='upload_activity_status',
    provide_context=True,
    python_callable=upload_activity_status,
    op_kwargs={'pgconn_id': 'postgres_conn'},
)
def poll_data(params, execution_date, **context):
    """Simulate polling one data stream and return a status record.

    :param params: mapping that must contain the key ``'data_stream'``.
    :param execution_date: datetime of the run; the reported ``timeperiod``
        is this value minus one hour.
    :param context: any further keyword arguments; ignored.
    :return: dict with keys ``data_stream``, ``timeperiod`` and ``status``,
        where ``status`` is a randomly chosen boolean.
    :raises KeyError: if ``params`` has no ``'data_stream'`` entry.
    """
    logging.info("Test polling function for {data_stream}".format(**params))
    # Random outcome: True when the draw from [0, 1) falls below 0.5.
    status = random.random() < 0.5
    output = dict(
        data_stream=params['data_stream'],
        timeperiod=execution_date + timedelta(hours=-1),
        status=status
    )
    return output
def poll_data_factory(data_stream, dag):
    """Build the PythonOperator that runs ``poll_data`` for one stream.

    :param data_stream: stream name; used in the task id
        (``'poll_<data_stream>'``) and handed to the callable via ``params``.
    :param dag: DAG object the new operator is attached to.
    :return: the constructed ``PythonOperator``.
    """
    return PythonOperator(
        task_id='poll_{}'.format(data_stream),
        python_callable=poll_data,
        params={u'data_stream': data_stream},
        provide_context=True,
        dag=dag
    )
# Create one poll task per stream name (stream0 .. stream29) and set each
# as an upstream of the upload operator.
poll_streams = []
streams = ['stream' + str(i) for i in range(30)]
for data_stream in streams:
    poll = poll_data_factory(data_stream, dag)
    poll_streams.append(poll)
    upload_ativity_status.set_upstream(poll)
{code}
The following error is thrown:
{code}
[2017-01-06 21:41:35,824] {jobs.py:1433} INFO - Heartbeating the scheduler
Traceback (most recent call last):
File "/Users/bolke/Documents/dev/airflow_env/bin/airflow", line 4, in <module>
__import__('pkg_resources').run_script('airflow==1.7.2.dev0', 'airflow')
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 739, in run_script
self.require(requires)[0].run_script(script_name, ns)
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1494, in run_script
exec(code, namespace, namespace)
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/EGG-INFO/scripts/airflow", line 28, in <module>
args.func(args)
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/bin/cli.py", line 380, in run
pool=args.pool,
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
result = func(*args, **kwargs)
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1334, in run
self.handle_failure(e, test_mode, context)
File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1407, in handle_failure
session.merge(self)
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1815, in merge
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1861, in _merge
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 831, in get
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 864, in _get_impl
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/loading.py", line 223, in load_on_ident
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2756, in one
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2726, in one_or_none
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2797, in __iter__
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2818, in _execute_and_instances
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2827, in _get_bind_args
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2809, in _connection_from_session
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 966, in connection
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 276, in _assert_active
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: u'INSERT INTO xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, now(), %s, %s, %s)'] [parameters: (u'return_value', '\x80\x02}q\x00(U\x06statusq\x01\x89U\ntimeperiodq\x02cdatetime\ndatetime\nq\x03U\n\x07\xe1\x01\x06\x13\x00\x00\x00\x00\x00q\x04\x85q\x05Rq\x06U\x0bdata_streamq\x07U\x08stream26q\x08u.', datetime.datetime(2017, 1, 6, 20, 0), 'poll_stream26', 'xcom_test')]
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)