Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2020/03/16 11:49:00 UTC

[jira] [Resolved] (AIRFLOW-6814) Airflow Scheduler fails for any dag with MS SQL server as the backend

     [ https://issues.apache.org/jira/browse/AIRFLOW-6814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor resolved AIRFLOW-6814.
----------------------------------------
    Resolution: Invalid

MSSQL is not officially supported by Airflow, but we will accept patches to improve support for it.

However, in this case the problem is not with Airflow but with Celery: almost the entire call stack comes from the Celery project, not Airflow, so you will need to open the issue there.
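
For readers following the call stack quoted below: its first frames show a signal-based timeout handler (airflow/utils/timeout.py, handle_timeout) raising while the utf-16-le decoder was running, and the later pyodbc SystemError is chained to that exception. The sketch below is not Airflow's or Celery's code. It is a minimal, Unix-only illustration of how an alarm handler raises inside whatever frame happens to be executing. The names SketchTimeout, AlarmTimeout, slow_backend_call and fetch_state are hypothetical, and the busy loop merely stands in for a slow result-backend query.

```python
import signal
import time
import traceback


class SketchTimeout(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowTaskTimeout."""


class AlarmTimeout:
    """Simplified SIGALRM-based timeout (Unix only, main thread only)."""

    def __init__(self, seconds):
        self.seconds = seconds

    def _handle(self, signum, frame):
        # The interpreter calls this handler from whichever frame is
        # currently executing in the main thread.
        raise SketchTimeout("Timeout")

    def __enter__(self):
        self._previous = signal.signal(signal.SIGALRM, self._handle)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, self._previous)
        return False


def slow_backend_call(duration):
    """Hypothetical stand-in for a slow result-backend query."""
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        pass
    return "SUCCESS"


def fetch_state(timeout_seconds, call_duration):
    """Return (state, names of the frames the timeout passed through)."""
    try:
        with AlarmTimeout(timeout_seconds):
            return slow_backend_call(call_duration), []
    except SketchTimeout as exc:
        frames = traceback.extract_tb(exc.__traceback__)
        return "TIMEOUT", [frame.name for frame in frames]


if __name__ == "__main__":
    print(fetch_state(0.2, 2.0))   # timer fires during the slow call
    print(fetch_state(2.0, 0.05))  # call finishes before the timer
```

In the timed-out case the recorded frames end with the interrupted function followed by the handler, which matches the shape of the first frames in the quoted traceback. When the interrupted code is a C extension calling back into Python, the same exception may instead surface as a SystemError, which is one plausible (unconfirmed) reading of the pyodbc error here.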

> Airflow Scheduler fails for any dag with MS SQL server as the backend
> ---------------------------------------------------------------------
>
>                 Key: AIRFLOW-6814
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6814
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>    Affects Versions: 1.10.9
>         Environment: Centos 6
>            Reporter: Amith Gopal
>            Priority: Blocker
>         Attachments: AirflowScheduler-MSSQL-Pyodbc-error.txt
>
>
> I am trying to run Airflow with MS SQL Server as the backend.
> Even though *airflow initdb* works fine, *airflow resetdb* throws an error, for which I have already created an issue.
> After *airflow initdb*, with some DAGs in the dags folder, running *airflow scheduler* produces the following error. It fails for every DAG, every time. I have tried different *pyodbc* versions, including the latest.
> The error is shown below.
> ERROR - Process timed out, PID: 94590
> [2020-02-15 02:30:52,094] {celery_executor.py:266} ERROR - Error fetching Celery task state, ignoring it:StatementError("(builtins.SystemError) <class 'pyodbc.Error'> returned a result with an error set",)
> Celery Task ID: ('simple_dag', 'print_date0', datetime.datetime(2020, 2, 14, 23, 0, tzinfo=<Timezone [UTC]>), 1)
> Traceback (most recent call last):
>  File "/home/ops/.pyenv/versions/3.6.10/lib/python3.6/encodings/utf_16_le.py", line 15, in decode
>  def decode(input, errors='strict'):
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/airflow/utils/timeout.py", line 43, in handle_timeout
>  raise AirflowTaskTimeout(self.error_message)
> airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 94590
> The above exception was the direct cause of the following exception:
> airflow.exceptions.AirflowTaskTimeout: decoding with 'utf-16le' codec failed (AirflowTaskTimeout: Timeout, PID: 94590)
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1318, in _cursor_execute
>  self.dialect.do_execute(cursor, statement, parameters, context)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
>  cursor.execute(statement, parameters)
> SystemError: <class 'pyodbc.Error'> returned a result with an error set
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 108, in fetch_celery_task_state
>  res = (celery_task[0], celery_task[1].state)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/result.py", line 475, in state
>  return self._get_task_meta()['status']
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/result.py", line 414, in _get_task_meta
>  return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/backends/base.py", line 451, in get_task_meta
>  meta = self._get_task_meta_for(task_id)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 51, in _inner
>  return fun(*args, **kwargs)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 151, in _get_task_meta_for
>  session = self.ResultSession()
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 110, in ResultSession
>  **self.engine_options)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/backends/database/session.py", line 59, in session_factory
>  self.prepare_models(engine)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/celery/backends/database/session.py", line 54, in prepare_models
>  ResultModelBase.metadata.create_all(engine)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/sql/schema.py", line 4316, in create_all
>  ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2048, in _run_visitor
>  with self._optional_conn_ctx_manager(connection) as conn:
>  File "/home/ops/.pyenv/versions/3.6.10/lib/python3.6/contextlib.py", line 81, in __enter__
>  return next(self.gen)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2040, in _optional_conn_ctx_manager
>  with self._contextual_connect() as conn:
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2242, in _contextual_connect
>  self._wrap_pool_connect(self.pool.connect, None),
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2276, in _wrap_pool_connect
>  return fn()
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 363, in connect
>  return _ConnectionFairy._checkout(self)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 774, in _checkout
>  fairy = _ConnectionRecord.checkout(pool)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 492, in checkout
>  rec = pool._do_get()
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 139, in _do_get
>  self._dec_overflow()
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
>  compat.reraise(exc_type, exc_value, exc_tb)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
>  raise value
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 136, in _do_get
>  return self._create_connection()
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 308, in _create_connection
>  return _ConnectionRecord(self)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 437, in __init__
>  self.__connect(first_connect_check=True)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 662, in __connect
>  ).exec_once_unless_exception(self.connection, self)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 314, in exec_once_unless_exception
>  self._exec_once_impl(True, *args, **kw)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 285, in _exec_once_impl
>  self(*args, **kw)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
>  fn(*args, **kw)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 1485, in go
>  return once_fn(*arg, **kw)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 199, in first_connect
>  dialect.initialize(c)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/dialects/mssql/base.py", line 2378, in initialize
>  super(MSDialect, self).initialize(connection)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 318, in initialize
>  self.returns_unicode_strings = self._check_unicode_returns(connection)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 415, in _check_unicode_returns
>  results = {check_unicode(test) for test in tests}
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 415, in <setcomp>
>  results = {check_unicode(test) for test in tests}
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 384, in check_unicode
>  connection._cursor_execute(cursor, statement, parameters)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1321, in _cursor_execute
>  e, statement, parameters, cursor, context
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
>  util.raise_from_cause(sqlalchemy_exception, exc_info)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
>  reraise(type(exception), exception, tb=exc_tb, cause=cause)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
>  raise value.with_traceback(tb)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1318, in _cursor_execute
>  self.dialect.do_execute(cursor, statement, parameters, context)
>  File "/home/ops/.pyenv/versions/3.6.10/envs/airflow_env/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 588, in do_execute
>  cursor.execute(statement, parameters)
> sqlalchemy.exc.StatementError: (builtins.SystemError) <class 'pyodbc.Error'> returned a result with an error set
> [SQL: SELECT CAST('test unicode returns' AS NVARCHAR(60)) AS anon_1]


