You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/15 10:26:00 UTC

[GitHub] [airflow] potiuk commented on issue #11543: Scheduler Deadlock in tests for MySQL

potiuk commented on issue #11543:
URL: https://github.com/apache/airflow/issues/11543#issuecomment-709110147


   Stack trace, recorded here for permanent storage:
   
   ```
   =================================== FAILURES ===================================
   ______ TestSchedulerJob.test_scheduler_verify_pool_full_2_slots_per_task _______
   
   self = <sqlalchemy.engine.base.Connection object at 0x7f4a588ae100>
   dialect = <sqlalchemy.dialects.mysql.mysqldb.MySQLDialect_mysqldb object at 0x7f4a57fc78b0>
   constructor = <bound method DefaultExecutionContext._init_compiled of <class 'sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb'>>
   statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired..._dagrun, next_dagrun_create_after) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
   parameters = ('test_scheduler_verify_pool_full_2_slots_per_task', None, 0, 0, 1, None, ...)
   args = (<sqlalchemy.dialects.mysql.mysqldb.MySQLCompiler_mysqldb object at 0x7f4a58988490>, [{'concurrency': 16, 'dag_id': 'test_scheduler_verify_pool_full_2_slots_per_task', 'default_view': 'tree', 'description': None, ...}])
   conn = <sqlalchemy.pool.base._ConnectionFairy object at 0x7f4a589cd400>
   context = <sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb object at 0x7f4a588ae9d0>
   
       def _execute_context(
           self, dialect, constructor, statement, parameters, *args
       ):
           """Create an :class:`.ExecutionContext` and execute, returning
           a :class:`_engine.ResultProxy`."""
       
           try:
               try:
                   conn = self.__connection
               except AttributeError:
                   # escape "except AttributeError" before revalidating
                   # to prevent misleading stacktraces in Py3K
                   conn = None
               if conn is None:
                   conn = self._revalidate_connection()
       
               context = constructor(dialect, self, conn, *args)
           except BaseException as e:
               self._handle_dbapi_exception(
                   e, util.text_type(statement), parameters, None, None
               )
       
           if context.compiled:
               context.pre_exec()
       
           cursor, statement, parameters = (
               context.cursor,
               context.statement,
               context.parameters,
           )
       
           if not context.executemany:
               parameters = parameters[0]
       
           if self._has_events or self.engine._has_events:
               for fn in self.dispatch.before_cursor_execute:
                   statement, parameters = fn(
                       self,
                       cursor,
                       statement,
                       parameters,
                       context,
                       context.executemany,
                   )
       
           if self._echo:
               self.engine.logger.info(statement)
               if not self.engine.hide_parameters:
                   self.engine.logger.info(
                       "%r",
                       sql_util._repr_params(
                           parameters, batches=10, ismulti=context.executemany
                       ),
                   )
               else:
                   self.engine.logger.info(
                       "[SQL parameters hidden due to hide_parameters=True]"
                   )
       
           evt_handled = False
           try:
               if context.executemany:
                   if self.dialect._has_events:
                       for fn in self.dialect.dispatch.do_executemany:
                           if fn(cursor, statement, parameters, context):
                               evt_handled = True
                               break
                   if not evt_handled:
                       self.dialect.do_executemany(
                           cursor, statement, parameters, context
                       )
               elif not parameters and context.no_parameters:
                   if self.dialect._has_events:
                       for fn in self.dialect.dispatch.do_execute_no_params:
                           if fn(cursor, statement, context):
                               evt_handled = True
                               break
                   if not evt_handled:
                       self.dialect.do_execute_no_params(
                           cursor, statement, context
                       )
               else:
                   if self.dialect._has_events:
                       for fn in self.dialect.dispatch.do_execute:
                           if fn(cursor, statement, parameters, context):
                               evt_handled = True
                               break
                   if not evt_handled:
   >                   self.dialect.do_execute(
                           cursor, statement, parameters, context
                       )
   
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1277: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <sqlalchemy.dialects.mysql.mysqldb.MySQLDialect_mysqldb object at 0x7f4a57fc78b0>
   cursor = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
   statement = 'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired..._dagrun, next_dagrun_create_after) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
   parameters = ('test_scheduler_verify_pool_full_2_slots_per_task', None, 0, 0, 1, None, ...)
   context = <sqlalchemy.dialects.mysql.mysqldb.MySQLExecutionContext_mysqldb object at 0x7f4a588ae9d0>
   
       def do_execute(self, cursor, statement, parameters, context=None):
   >       cursor.execute(statement, parameters)
   
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:593: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
   query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
   args = ("'test_scheduler_verify_pool_full_2_slots_per_task'", 'NULL', '0', '0', '1', 'NULL', ...)
   
       def execute(self, query, args=None):
           """Execute a query.
       
           query -- string, query to execute on server
           args -- optional sequence or mapping, parameters to use with query.
       
           Note: If args is a sequence, then %s must be used as the
           parameter placeholder in the query. If a mapping is used,
           %(key)s must be used as the placeholder.
       
           Returns integer represents rows affected, if any
           """
           while self.nextset():
               pass
           db = self._get_db()
       
           # NOTE:
           # Python 2: query should be bytes when executing %.
           # All unicode in args should be encoded to bytes on Python 2.
           # Python 3: query should be str (unicode) when executing %.
           # All bytes in args should be decoded with ascii and surrogateescape on Python 3.
           # db.literal(obj) always returns str.
       
           if PY2 and isinstance(query, unicode):
               query = query.encode(db.encoding)
       
           if args is not None:
               if isinstance(args, dict):
                   args = dict((key, db.literal(item)) for key, item in args.items())
               else:
                   args = tuple(map(db.literal, args))
               if not PY2 and isinstance(query, (bytes, bytearray)):
                   query = query.decode(db.encoding)
               try:
                   query = query % args
               except TypeError as m:
                   self.errorhandler(self, ProgrammingError, str(m))
       
           if isinstance(query, unicode):
               query = query.encode(db.encoding, 'surrogateescape')
       
           res = None
           try:
               res = self._query(query)
           except Exception:
               exc, value = sys.exc_info()[:2]
   >           self.errorhandler(self, exc, value)
   
   /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:255: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   errorclass = <class '_mysql_exceptions.OperationalError'>
   errorvalue = OperationalError(1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
       def defaulterrorhandler(connection, cursor, errorclass, errorvalue):
           """
           If cursor is not None, (errorclass, errorvalue) is appended to
           cursor.messages; otherwise it is appended to
           connection.messages. Then errorclass is raised with errorvalue as
           the value.
       
           You can override this with your own error handler by assigning it
           to the instance.
           """
           error = errorclass, errorvalue
           if cursor:
               cursor.messages.append(error)
           else:
               connection.messages.append(error)
           del cursor
           del connection
           if isinstance(errorvalue, BaseException):
   >           raise errorvalue
   
   /usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:50: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
   query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
   args = ("'test_scheduler_verify_pool_full_2_slots_per_task'", 'NULL', '0', '0', '1', 'NULL', ...)
   
       def execute(self, query, args=None):
           """Execute a query.
       
           query -- string, query to execute on server
           args -- optional sequence or mapping, parameters to use with query.
       
           Note: If args is a sequence, then %s must be used as the
           parameter placeholder in the query. If a mapping is used,
           %(key)s must be used as the placeholder.
       
           Returns integer represents rows affected, if any
           """
           while self.nextset():
               pass
           db = self._get_db()
       
           # NOTE:
           # Python 2: query should be bytes when executing %.
           # All unicode in args should be encoded to bytes on Python 2.
           # Python 3: query should be str (unicode) when executing %.
           # All bytes in args should be decoded with ascii and surrogateescape on Python 3.
           # db.literal(obj) always returns str.
       
           if PY2 and isinstance(query, unicode):
               query = query.encode(db.encoding)
       
           if args is not None:
               if isinstance(args, dict):
                   args = dict((key, db.literal(item)) for key, item in args.items())
               else:
                   args = tuple(map(db.literal, args))
               if not PY2 and isinstance(query, (bytes, bytearray)):
                   query = query.decode(db.encoding)
               try:
                   query = query % args
               except TypeError as m:
                   self.errorhandler(self, ProgrammingError, str(m))
       
           if isinstance(query, unicode):
               query = query.encode(db.encoding, 'surrogateescape')
       
           res = None
           try:
   >           res = self._query(query)
   
   /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:252: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <MySQLdb.cursors.Cursor object at 0x7f4a588ae2e0>
   q = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
   
       def _query(self, q):
           db = self._get_db()
           self._result = None
   >       db.query(q)
   
   /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:378: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <_mysql.connection open to 'mysql' at 55ec2a735e70>
   query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
   
       def query(self, query):
           # Since _mysql releases GIL while querying, we need immutable buffer.
           if isinstance(query, bytearray):
               query = bytes(query)
           if self.waiter is not None:
               self.send_query(query)
               self.waiter(self.fileno())
               self.read_query_result()
           else:
   >           _mysql.connection.query(self, query)
   E           _mysql_exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
   /usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:280: OperationalError
   
   The above exception was the direct cause of the following exception:
   
   self = <tests.jobs.test_scheduler_job.TestSchedulerJob testMethod=test_scheduler_verify_pool_full_2_slots_per_task>
   
       def test_scheduler_verify_pool_full_2_slots_per_task(self):
           """
           Test task instances not queued when pool is full.
       
           Variation with non-default pool_slots
           """
           dag = DAG(
               dag_id='test_scheduler_verify_pool_full_2_slots_per_task',
               start_date=DEFAULT_DATE)
       
           BashOperator(
               task_id='dummy',
               dag=dag,
               owner='airflow',
               pool='test_scheduler_verify_pool_full_2_slots_per_task',
               pool_slots=2,
               bash_command='echo hi',
           )
       
           dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
                           include_examples=False,
                           read_dags_from_db=True)
           dagbag.bag_dag(dag=dag, root_dag=dag)
   >       dagbag.sync_to_db()
   
   tests/jobs/test_scheduler_job.py:2576: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   airflow/utils/session.py:65: in wrapper
       return func(*args, **kwargs)
   airflow/models/dagbag.py:535: in sync_to_db
       DAG.bulk_write_to_db(self.dags.values(), session=session)
   airflow/utils/session.py:61: in wrapper
       return func(*args, **kwargs)
   airflow/models/dag.py:1791: in bulk_write_to_db
       session.flush()
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:2523: in flush
       self._flush(objects)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:2664: in _flush
       transaction.rollback(_capture_exception=True)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py:68: in __exit__
       compat.raise_(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:178: in raise_
       raise exception
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py:2624: in _flush
       flush_context.execute()
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:422: in execute
       rec.execute(self)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py:586: in execute
       persistence.save_obj(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:239: in save_obj
       _emit_insert_statements(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py:1083: in _emit_insert_statements
       c = cached_connections[connection].execute(statement, multiparams)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1014: in execute
       return meth(self, multiparams, params)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py:298: in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1127: in _execute_clauseelement
       ret = self._execute_context(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1317: in _execute_context
       self._handle_dbapi_exception(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1511: in _handle_dbapi_exception
       util.raise_(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py:178: in raise_
       raise exception
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py:1277: in _execute_context
       self.dialect.do_execute(
   /usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py:593: in do_execute
       cursor.execute(statement, parameters)
   /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:255: in execute
       self.errorhandler(self, exc, value)
   /usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:50: in defaulterrorhandler
       raise errorvalue
   /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:252: in execute
       res = self._query(query)
   /usr/local/lib/python3.8/site-packages/MySQLdb/cursors.py:378: in _query
       db.query(q)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <_mysql.connection open to 'mysql' at 55ec2a735e70>
   query = b'INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expire... {\\"days\\": 1, \\"seconds\\": 0, \\"microseconds\\": 0}}\', 16, 0, \'2016-01-01 00:00:00\', \'2016-01-02 00:00:00\')'
   
       def query(self, query):
           # Since _mysql releases GIL while querying, we need immutable buffer.
           if isinstance(query, bytearray):
               query = bytes(query)
           if self.waiter is not None:
               self.send_query(query)
               self.waiter(self.fileno())
               self.read_query_result()
           else:
   >           _mysql.connection.query(self, query)
   E           sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   E           [SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, owners, description, default_view, schedule_interval, concurrency, has_task_concurrency_limits, next_dagrun, next_dagrun_create_after) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)]
   E           [parameters: ('test_scheduler_verify_pool_full_2_slots_per_task', None, 0, 0, 1, None, None, None, None, None, '/opt/airflow/tests/jobs/test_scheduler_job.py', 'airflow', None, 'tree', '{"type": "timedelta", "attrs": {"days": 1, "seconds": 0, "microseconds": 0}}', 16, 0, datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0))]
   E           (Background on this error at: http://sqlalche.me/e/13/e3q8)
   
   /usr/local/lib/python3.8/site-packages/MySQLdb/connections.py:280: OperationalError
   ----------------------------- Captured stdout call -----------------------------
   [2020-10-15 09:35:59,178] {dag.py:1708} INFO - Sync 1 DAGs
   [2020-10-15 09:35:59,185] {dag.py:1727} INFO - Creating ORM DAG for test_scheduler_verify_pool_full_2_slots_per_task
   [2020-10-15 09:35:59,192] {dag.py:2151} INFO - Setting next_dagrun for test_scheduler_verify_pool_full_2_slots_per_task to 2016-01-01 00:00:00+00:00
   ------------------------------ Captured log call -------------------------------
   INFO     airflow.models.dag:dag.py:1708 Sync 1 DAGs
   INFO     airflow.models.dag:dag.py:1727 Creating ORM DAG for test_scheduler_verify_pool_full_2_slots_per_task
   INFO     airflow.models.dag:dag.py:2151 Setting next_dagrun for test_scheduler_verify_pool_full_2_slots_per_task to 2016-01-01 00:00:00+00:00
   --------------------------- Captured stderr teardown ---------------------------
   Process DagFileProcessor0-Process:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
       self.run()
     File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
       self._target(*self._args, **self._kwargs)
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 186, in _run_file_processor
       result_channel.send(result)
     File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
       self._send(header + buf)
     File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 368, in _send
       n = write(self._handle, buf)
   BrokenPipeError: [Errno 32] Broken pipe
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org