Posted to commits@airflow.apache.org by "Craig Rodrigues (JIRA)" <ji...@apache.org> on 2018/05/24 16:22:00 UTC

[jira] [Updated] (AIRFLOW-2519) Wrong options passed to SQLAlchemy celery broker backend

     [ https://issues.apache.org/jira/browse/AIRFLOW-2519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Craig Rodrigues updated AIRFLOW-2519:
-------------------------------------
    Description: 
Hi,
 
 I used this requirements.txt file to install airflow from the v1-10-test branch:
 
 git+https://github.com/celery/celery@master#egg=celery
 git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
 kombu>=4.1.0
 
 
 In my airflow.cfg, I have:
 
 [celery]
 executor = CeleryExecutor
 broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb
 
 [celery_broker_transport_options]
 #
 #
 
 However, if I manually run this code inside the webserver, I see:
 
 python -c "from airflow import configuration; c = configuration.conf.getsection('celery_broker_transport_options'); print(c)"
 OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
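 The output above would be consistent with defaults being loaded first and the user's section being layered on top of them. A minimal sketch of that behaviour, using only the Python 3 standard library configparser and assuming (not confirmed here) that Airflow merges its default config with airflow.cfg in a similar way; the DEFAULTS and USER_CFG strings are illustrative, not Airflow's real files:

```python
import configparser

# Hypothetical built-in defaults, mirroring two of the keys printed above.
DEFAULTS = """
[celery_broker_transport_options]
visibility_timeout = 21600
ssl_active = False
"""

# A user config whose section contains only comment lines, as in this report.
USER_CFG = """
[celery_broker_transport_options]
#
#
"""

parser = configparser.ConfigParser()
parser.read_string(DEFAULTS)   # defaults are loaded first
parser.read_string(USER_CFG)   # the empty section adds nothing and removes nothing

merged = dict(parser["celery_broker_transport_options"])
print(merged)
```

 Under that assumption, an empty section cannot clear the defaults: the default keys survive the merge and are still handed on to the broker transport.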
 
 My worker crashes with this error:
 
 
 [2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key [celery/ssl_active] not found in config
 [2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor will run without SSL
 [2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor CeleryExecutor
 [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: TypeError(u"Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.",)
 Traceback (most recent call last):
   File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in start
     self.blueprint.start(self)
   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
     step.start(parent)
   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in start
     return self.obj.start()
   File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 322, in start
     blueprint.start(self)
   File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
     step.start(parent)
   File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start
     c.connection, on_decode_error=c.on_decode_error,
   File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer
     **kw
   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in __init__
     self.revive(self.channel)
   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in revive
     self.declare()
   File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in declare
     queue.declare()
   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
     self._create_queue(nowait=nowait, channel=channel)
   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in _create_queue
     self.queue_declare(nowait=nowait, passive=False, channel=channel)
   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in queue_declare
     nowait=nowait,
   File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
     self._new_queue(queue, **kwargs)
   File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
     self._get_or_create(queue)
   File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
     obj = self.session.query(self.queue_cls) \
   File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
     _, Session = self._open()
   File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
     engine = self._engine_from_config()
   File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
     return create_engine(conninfo.hostname, **transport_options)
   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 391, in create_engine
     return strategy.create(*args, **kwargs)
   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
     engineclass.__name__))
 TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
 
  -------------- celery@qa1 v4.2.0rc3 (windowlicker)
 ---- **** ----- 
 --- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core 2018-05-21 07:46:12
 -- * - **** --- 
 - ** ---------- [config]
 - ** ---------- .> app:         airflow.executors.celery_executor:0x4766d50
 - ** ---------- .> transport:   sqla+mysql://airflow:blah@localhost:3306/mydb
 - ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
 - *** --- * --- .> concurrency: 16 (prefork)
 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
 --- ***** ----- 
  -------------- [queues]
                 .> airflow_celery   exchange=airflow_celery(direct) key=airflow_celery
 
 
 
 What is the correct way to override celery_broker_transport_options?
 I thought that an empty section in airflow.cfg would be enough.
 
 I also thought that this was fixed by https://github.com/apache/incubator-airflow/pull/2842
 
 
 I cannot pass visibility_timeout or ssl_key to a MySQL backend.
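
 One way to avoid the crash might be to drop the broker-only keys before the options reach create_engine() whenever the broker URL uses the sqla+ scheme. The sketch below is only an illustration of that idea: BROKER_ONLY_KEYS and engine_kwargs are hypothetical names, not part of Airflow, Celery, or kombu, and the key list is taken from the TypeError above.

```python
# Hypothetical helper: the names below are illustrative only and do not
# exist in Airflow, Celery, or kombu.
BROKER_ONLY_KEYS = {
    "visibility_timeout",
    "ssl_active",
    "ssl_key",
    "ssl_cert",
    "ssl_cacert",
}


def engine_kwargs(broker_url, transport_options):
    """Return the transport options that are safe to forward for this broker URL."""
    if broker_url.startswith("sqla+"):
        # SQLAlchemy's create_engine() rejects keyword arguments it does not
        # know, so strip the keys that only make sense for other brokers.
        return {
            key: value
            for key, value in transport_options.items()
            if key not in BROKER_ONLY_KEYS
        }
    return dict(transport_options)


options = {
    "visibility_timeout": 21600,
    "ssl_active": False,
    "ssl_key": "",
    "ssl_cert": "",
    "ssl_cacert": "",
}
print(engine_kwargs("sqla+mysql://airflow:blah@localhost:3306/mydb", options))
```

 With the options from this report, nothing would be forwarded for the sqla+mysql broker, while other broker URLs would receive the options unchanged.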

> Wrong options passed to SQLAlchemy celery broker backend
> --------------------------------------------------------
>
>                 Key: AIRFLOW-2519
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2519
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: 1.9.0
>            Reporter: Craig Rodrigues
>            Priority: Major


