You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/15 10:26:00 UTC
[GitHub] [airflow] potiuk commented on issue #11543: Scheduler Deadlock in tests for MySQL
potiuk commented on issue #11543:
URL: https://github.com/apache/airflow/issues/11543#issuecomment-709110147
Stacktrace for permanent storage:
```
=================================== FAILURES ===================================
______ TestSchedulerJob.test_scheduler_verify_pool_full_2_slots_per_task _______
self = <sqlalchemy.engine.base.Connection object at 0x7f4a588ae100>
dialect = <sqlalchemy.dialects.mysql.mysqldb.MySQLDialect_mysqldb object at 0x7f4a57fc78b0>
constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb'>>
statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired..._dagrun, next_dagrun_create_after) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
parameters = ('test_scheduler_verify_pool_full_2_slots_per_task', None, 0, 0, 1, None, ...)
args = (<sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7f4a58988490>, [{'concurrency': 16, 'dag_id': 'test_scheduler_verify_pool_full_2_slots_per_task', 'default_view': 'tree', 'description': None, ...}])
conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f4a589cd400>
context = <sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb object at 0x7f4a588ae9d0>
def _execute_context(
self, dialect, constructor, statement, parameters, *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`."""
try:
try:
conn = self.__connection
except AttributeError:
# escape "except AttributeError" before revalidating
# to prevent misleading stacktraces in Py3K
conn = None
if conn is None:
conn = self._revalidate_connection()
context = constructor(dialect, self, conn, *args)
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
if context.compiled:
context.pre_exec()
cursor, statement, parameters = (
context.cursor,
context.statement,
context.parameters,
)
if not context.executemany:
parameters = parameters[0]
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_cursor_execute:
statement, parameters = fn(
self,
cursor,
statement,
parameters,
context,
context.executemany,
)
if self._echo:
self.engine.logger.info(statement)
if not self.engine.hide_parameters:
self.engine.logger.info(
"%r",
sql_util._repr_params(
parameters, batches=10, ismulti=context.executemany
),
)
else:
self.engine.logger.info(
"[SQL parameters hidden due to hide_parameters=True]"
)
evt_handled = False
try:
if context.executemany:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_executemany:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_executemany(
cursor, statement, parameters, context
)
elif not parameters and context.no_parameters:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute_no_params:
if fn(cursor, statement, context):
evt_handled = True
break
if not evt_handled:
self.dialect.do_execute_no_params(
cursor, statement, context
)
else:
if self.dialect._has_events:
for fn in self.dialect.dispatch.do_execute:
if fn(cursor, statement, parameters, context):
evt_handled = True
break
if not evt_handled:
> self.dialect.do_execute(
cursor, statement, parameters, context
)
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1277:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.mysql.mysqldb.MySQLDialect_mysqldb object at 0x7f4a57fc78b0>
cursor = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired..._dagrun, next_dagrun_create_after) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
parameters = ('test_scheduler_verify_pool_full_2_slots_per_task', None, 0, 0, 1, None, ...)
context = <sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb object at 0x7f4a588ae9d0>
def do_execute(self, cursor, statement, parameters, context=None):
> cursor.execute(statement, parameters)
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:593:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
args = ("'test_scheduler_verify_pool_full_2_slots_per_task'", 'NULL', '0', '0', '1', 'NULL', ...)
def execute(self, query, args=None):
"""Execute a query.
query -- string, query to execute on server
args -- optional sequence or mapping, parameters to use with query.
Note: If args is a sequence, then %s must be used as the
parameter placeholder in the query. If a mapping is used,
%(key)s must be used as the placeholder.
Returns integer represents rows affected, if any
"""
while self.nextset():
pass
db = self._get_db()
# NOTE:
# Python 2: query should be bytes when executing %.
# All unicode in args should be encoded to bytes on Python 2.
# Python 3: query should be str (unicode) when executing %.
# All bytes in args should be decoded with ascii and surrogateescape on Python 3.
# db.literal(obj) always returns str.
if PY2 and isinstance(query, unicode):
query = query.encode(db.encoding)
if args is not None:
if isinstance(args, dict):
args = dict((key, db.literal(item)) for key, item in args.items())
else:
args = tuple(map(db.literal, args))
if not PY2 and isinstance(query, (bytes, bytearray)):
query = query.decode(db.encoding)
try:
query = query % args
except TypeError as m:
self.errorhandler(self, ProgrammingError, str(m))
if isinstance(query, unicode):
query = query.encode(db.encoding, 'surrogateescape')
res = None
try:
res = self._query(query)
except Exception:
exc, value = sys.exc_info()[:2]
> self.errorhandler(self, exc, value)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:255:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
errorclass = <class '_mysql_exceptions.OperationalError'>
errorvalue = OperationalError(1213, 'Deadlock found when trying to get lock; try restarting transaction')
def defaulterrorhandler(connection, cursor, errorclass, errorvalue):
"""
If cursor is not None, (errorclass, errorvalue) is appended to
cursor.messages; otherwise it is appended to
connection.messages. Then errorclass is raised with errorvalue as
the value.
You can override this with your own error handler by assigning it
to the instance.
"""
error = errorclass, errorvalue
if cursor:
cursor.messages.append(error)
else:
connection.messages.append(error)
del cursor
del connection
if isinstance(errorvalue, BaseException):
> raise errorvalue
/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:50:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
args = ("'test_scheduler_verify_pool_full_2_slots_per_task'", 'NULL', '0', '0', '1', 'NULL', ...)
def execute(self, query, args=None):
"""Execute a query.
query -- string, query to execute on server
args -- optional sequence or mapping, parameters to use with query.
Note: If args is a sequence, then %s must be used as the
parameter placeholder in the query. If a mapping is used,
%(key)s must be used as the placeholder.
Returns integer represents rows affected, if any
"""
while self.nextset():
pass
db = self._get_db()
# NOTE:
# Python 2: query should be bytes when executing %.
# All unicode in args should be encoded to bytes on Python 2.
# Python 3: query should be str (unicode) when executing %.
# All bytes in args should be decoded with ascii and surrogateescape on Python 3.
# db.literal(obj) always returns str.
if PY2 and isinstance(query, unicode):
query = query.encode(db.encoding)
if args is not None:
if isinstance(args, dict):
args = dict((key, db.literal(item)) for key, item in args.items())
else:
args = tuple(map(db.literal, args))
if not PY2 and isinstance(query, (bytes, bytearray)):
query = query.decode(db.encoding)
try:
query = query % args
except TypeError as m:
self.errorhandler(self, ProgrammingError, str(m))
if isinstance(query, unicode):
query = query.encode(db.encoding, 'surrogateescape')
res = None
try:
> res = self._query(query)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:252:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
q = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
def _query(self, q):
db = self._get_db()
self._result = None
> db.query(q)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:378:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_mysql.connection open to 'mysql' at 55ec2a735e70>
query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
def query(self, query):
# Since _mysql releases GIL while querying, we need immutable buffer.
if isinstance(query, bytearray):
query = bytes(query)
if self.waiter is not None:
self.send_query(query)
self.waiter(self.fileno())
self.read_query_result()
else:
> _mysql.connection.query(self, query)
E _mysql_exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:280: OperationalError
The above exception was the direct cause of the following exception:
self = <tests.jobs.test_scheduler_job.TestSchedulerJob testMethod=test_scheduler_verify_pool_full_2_slots_per_task>
def test_scheduler_verify_pool_full_2_slots_per_task(self):
"""
Test task instances not queued when pool is full.
Variation with non-default pool_slots
"""
dag = DAG(
dag_id='test_scheduler_verify_pool_full_2_slots_per_task',
start_date=DEFAULT_DATE)
BashOperator(
task_id='dummy',
dag=dag,
owner='airflow',
pool='test_scheduler_verify_pool_full_2_slots_per_task',
pool_slots=2,
bash_command='echo hi',
)
dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
include_examples=False,
read_dags_from_db=True)
dagbag.bag_dag(dag=dag, root_dag=dag)
> dagbag.sync_to_db()
tests/jobs/test_scheduler_job.py:2576:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/session.py:65: in wrapper
return func(*args, **kwargs)
airflow/models/dagbag.py:535: in sync_to_db
DAG.bulk_write_to_db(self.dags.values(), session=session)
airflow/utils/session.py:61: in wrapper
return func(*args, **kwargs)
airflow/models/dag.py:1791: in bulk_write_to_db
session.flush()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:2523: in flush
self._flush(objects)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:2664: in _flush
transaction.rollback(_capture_exception=True)
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:68: in __exit__
compat.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:178: in raise_
raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:2624: in _flush
flush_context.execute()
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:422: in execute
rec.execute(self)
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:586: in execute
persistence.save_obj(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:239: in save_obj
_emit_insert_statements(
/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1083: in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1014: in execute
return meth(self, multiparams, params)
/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1127: in _execute_clauseelement
ret = self._execute_context(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1317: in _execute_context
self._handle_dbapi_exception(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1511: in _handle_dbapi_exception
util.raise_(
/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:178: in raise_
raise exception
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1277: in _execute_context
self.dialect.do_execute(
/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:593: in do_execute
cursor.execute(statement, parameters)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:255: in execute
self.errorhandler(self, exc, value)
/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:50: in defaulterrorhandler
raise errorvalue
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:252: in execute
res = self._query(query)
/usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:378: in _query
db.query(q)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_mysql.connection open to 'mysql' at 55ec2a735e70>
query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
def query(self, query):
# Since _mysql releases GIL while querying, we need immutable buffer.
if isinstance(query, bytearray):
query = bytes(query)
if self.waiter is not None:
self.send_query(query)
self.waiter(self.fileno())
self.read_query_result()
else:
> _mysql.connection.query(self, query)
E sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
E [SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, owners, description, default_view, schedule_interval, concurrency, has_task_concurrency_limits, next_dagrun, next_dagrun_create_after) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)]
E [parameters: ('test_scheduler_verify_pool_full_2_slots_per_task', None, 0, 0, 1, None, None, None, None, None, '/opt/airflow/tests/jobs/test_scheduler_job.py', 'airflow', None, 'tree', '{"type": "timedelta", "attrs": {"days": 1, "seconds": 0, "microseconds": 0}}', 16, 0, datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0))]
E (Background on this error at: http://sqlalche.me/e/13/e3q8)
/usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:280: OperationalError
----------------------------- Captured stdout call -----------------------------
[2020-10-15 09:35:59,178] {dag.py:1708} INFO - Sync 1 DAGs
[2020-10-15 09:35:59,185] {dag.py:1727} INFO - Creating ORM DAG for test_scheduler_verify_pool_full_2_slots_per_task
[2020-10-15 09:35:59,192] {dag.py:2151} INFO - Setting next_dagrun for test_scheduler_verify_pool_full_2_slots_per_task to 2016-01-01 00:00:00+00:00
------------------------------ Captured log call -------------------------------
INFO airflow.models.dag:dag.py:1708 Sync 1 DAGs
INFO airflow.models.dag:dag.py:1727 Creating ORM DAG for test_scheduler_verify_pool_full_2_slots_per_task
INFO airflow.models.dag:dag.py:2151 Setting next_dagrun for test_scheduler_verify_pool_full_2_slots_per_task to 2016-01-01 00:00:00+00:00
--------------------------- Captured stderr teardown ---------------------------
Process DagFileProcessor0-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 186, in _run_file_processor
result_channel.send(result)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org