Posted to dev@airflow.apache.org by Craig Rodrigues <cr...@gmail.com> on 2018/05/21 08:50:41 UTC

celery problem: cannot override celery_broker_transport_options

Hi,

I used this requirements.txt file to install airflow from the v1-10-test branch:

git+https://github.com/celery/celery@master#egg=celery
git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
kombu>=4.1.0


In my airflow.cfg, I have:

[celery]
executor = CeleryExecutor

executor = CeleryExec
broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb

[celery_broker_transport_options]
#
#

However, if I manually run this code inside the webserver, I see:

python -c "from airflow import configuration; c = configuration.conf.getsection('celery_broker_transport_options'); print(c)"
OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
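The output above is what a layered configuration lookup would produce: the bundled defaults supply every key, and an empty user section contributes nothing, so nothing gets removed. Below is a minimal Python 3 sketch of that layering with the standard-library `configparser`. `DEFAULTS`, `USER_CFG` and `merged_section` are hypothetical names, and this is a simplified model, not Airflow's actual `getsection()` (which, judging by the `21600` and `False` above, also converts values; here they stay strings).

```python
import configparser

# Hypothetical stand-ins for the bundled defaults (default_airflow.cfg) and a
# user airflow.cfg whose section contains only comments, as in this post.
DEFAULTS = """
[celery_broker_transport_options]
visibility_timeout = 21600
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
"""

USER_CFG = """
[celery_broker_transport_options]
#
#
"""


def merged_section(defaults_text, user_text, section):
    """Return one section as a dict, with user values layered over defaults.

    Simplified model of a two-layer config lookup, not Airflow's code.
    Values are returned as raw strings.
    """
    defaults = configparser.ConfigParser()
    defaults.read_string(defaults_text)
    user = configparser.ConfigParser()
    user.read_string(user_text)

    result = {}
    if defaults.has_section(section):
        result.update(defaults.items(section, raw=True))
    if user.has_section(section):
        # A user section holding only comments contributes no keys, and
        # nothing here ever deletes a key, so every default survives.
        result.update(user.items(section, raw=True))
    return result


print(merged_section(DEFAULTS, USER_CFG, "celery_broker_transport_options"))
```

Overriding a key (for example `visibility_timeout = 60`) changes its value, but the model has no way to delete a default key, which matches the later remark in this thread that there is no way in the config file to remove a setting.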

My worker crashes with this error:


[2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key [celery/ssl_active] not found in config
[2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor will run without SSL
[2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor CeleryExecutor
[2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: TypeError(u"Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.",)
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 322, in start
    blueprint.start(self)
  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start
    c.connection, on_decode_error=c.on_decode_error,
  File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer
    **kw
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in __init__
    self.revive(self.channel)
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in revive
    self.declare()
  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in declare
    queue.declare()
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
    self._new_queue(queue, **kwargs)
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
    self._get_or_create(queue)
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
    obj = self.session.query(self.queue_cls) \
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
    _, Session = self._open()
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
    engine = self._engine_from_config()
  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
    return create_engine(conninfo.hostname, **transport_options)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 391, in create_engine
    return strategy.create(*args, **kwargs)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
    engineclass.__name__))
TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
 
 -------------- celery@qa1 v4.2.0rc3 (windowlicker)
---- **** ----- 
--- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core 2018-05-21 07:46:12
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x4766d50
- ** ---------- .> transport:   sqla+mysql://airflow:blah@localhost:3306/mydb
- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> airflow_celery   exchange=airflow_celery(direct) key=airflow_celery



What is the correct way to override the celery_broker_transport_options?
I thought that having an empty section in airflow.cfg would be enough.

I thought that this was fixed with: https://github.com/apache/incubator-airflow/pull/2842


I cannot pass visibility_timeout or ssl_key to a mysql backend.
--
Craig
                







Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <cr...@gmail.com>.
Bolke,

Can you help me with this?
You have worked on this code with respect to parsing celery broker options.

I cannot figure out how to override the defaults, and wrong values are
being passed down into the mysql backend, causing things to fail.

This is blocking me from doing further testing of airflow 1.10 in my
environment.

Since I have found stability bugs in airflow 1.9.0 that have been fixed in
master, I want to try to run airflow 1.10 from git.

Thanks.
--
Craig


-- 
Craig Rodrigues
rodrigc@rodrigues.org

Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
I have submitted this patch:
https://github.com/apache/incubator-airflow/pull/3417

I have tested with a Redis celery broker, and with a SQLAlchemy/MySQL celery broker.
It works on Redis, and with SQLAlchemy/MySQL.

With this patch, I no longer get this exception:

  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
    engineclass.__name__))
TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
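Elsewhere in this thread Ash suggests guarding the transport options so that `visibility_timeout` is only passed for `redis://` and `sqs://` brokers, and the SSL keys move to the `[celery]` section. Below is a minimal Python 3 sketch of such a guard under those assumptions. `transport_options_for`, `SCHEMES_WITH_VISIBILITY_TIMEOUT` and `SSL_KEYS` are hypothetical names, and this is not the code in PR 3417. (On the Python 2.7 install shown in the traceback, `urlparse` lives in the `urlparse` module instead of `urllib.parse`.)

```python
from urllib.parse import urlparse

# Hypothetical allow-list: the thread cites the Celery docs as saying
# visibility_timeout is supported by Redis and SQS.
SCHEMES_WITH_VISIBILITY_TIMEOUT = {"redis", "sqs"}

# SSL keys that, per the thread's proposal, belong in [celery] rather than
# in the broker transport options.
SSL_KEYS = ("ssl_active", "ssl_key", "ssl_cert", "ssl_cacert")


def transport_options_for(broker_url, options):
    """Return a copy of options with keys the broker cannot accept removed.

    The sqla+ transport forwards its transport options to SQLAlchemy's
    create_engine() as keyword arguments (see the traceback), so stray
    keys must not reach it.
    """
    scheme = urlparse(broker_url).scheme.lower()  # e.g. "sqla+mysql"
    filtered = dict(options)  # never mutate the caller's dict
    for key in SSL_KEYS:
        filtered.pop(key, None)
    if scheme not in SCHEMES_WITH_VISIBILITY_TIMEOUT:
        filtered.pop("visibility_timeout", None)
    return filtered


defaults = {"visibility_timeout": 21600, "ssl_active": False,
            "ssl_key": "", "ssl_cert": "", "ssl_cacert": ""}
print(transport_options_for("sqla+mysql://airflow:blah@localhost:3306/mydb", defaults))
print(transport_options_for("redis://localhost:6379/0", defaults))
```

With an sqla broker the sketch yields an empty dict, so `create_engine()` receives no unexpected keyword arguments; a Redis or SQS broker keeps `visibility_timeout`.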

--
Craig

Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
You are right!!
This is an interesting puzzle to follow, but we are unravelling it!
--
Craig

On Thu, May 24, 2018 at 8:39 AM Ash Berlin-Taylor <ash_airflowlist@firemirror.com> wrote:

> Kombu (a library Celery uses) 4.1.0 added it back in
> https://github.com/celery/kombu/blob/master/Changelog#L75-L99 - I
> _thought_ that means it's supported in Celery again...?
>
>
> > On 24 May 2018, at 16:34, Craig Rodrigues <ro...@crodrigues.org> wrote:
> >
> > Removal of sqla as a backend is mentioned in these release notes for
> > celery 4.0:
> >
> > http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#features-removed-for-lack-of-funding
> >
> > --
> > Craig
> >
> >
> > On Thu, May 24, 2018 at 8:32 AM Craig Rodrigues <ro...@crodrigues.org>
> > wrote:
> >
> >> It looks like in Celery, the documentation for sqla broker was removed:
> >>
> >> https://github.com/celery/celery/commit/79810a26a116e9881c42a14d856fa94c40fefcd8#diff-29ccf8c96d521253467909a652e6ded2
> >>
> >> I cannot find the pull request or release notes which document this.
> >>
> >> --
> >> Craig
> >>
> >>
> >> On Thu, May 24, 2018 at 8:19 AM Ash Berlin-Taylor <ash_airflowlist@firemirror.com> wrote:
> >>
> >>> Sounds like
> >>> https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py#L31
> >>> should be guarded in some way to only do that for redis:// and sqs://
> >>> backends.
> >>>
> >>>
> >>>> On 24 May 2018, at 16:13, Craig Rodrigues <ro...@crodrigues.org> wrote:
> >>>>
> >>>> Ash,
> >>>>
> >>>> According to this:
> >>>> http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
> >>>> visibility_timeout is supported by Redis and SQS.
> >>>>
> >>>> --
> >>>> Craig
> >>>>
> >>>>
> >>>> On Thu, May 24, 2018 at 8:07 AM Craig Rodrigues <rodrigc@crodrigues.org> wrote:
> >>>>
> >>>>> Ash,
> >>>>>
> >>>>> Thanks again.  You are leading me on the right path!
> >>>>>
> >>>>> I can prepare a patch to move the ssl_ options into the celery section.
> >>>>>
> >>>>> What about visibility_timeout?  The error I am getting is:
> >>>>>
> >>>>> File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
> >>>>>   engineclass.__name__))
> >>>>> TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
> >>>>>
> >>>>>
> >>>>> So it looks like visibility_timeout does not work with sqla either.
> >>>>> --
> >>>>> Craig
> >>>>>
> >>>>>
> >>>>> On Thu, May 24, 2018 at 2:17 AM Ash Berlin-Taylor <ash_airflowlist@firemirror.com> wrote:
> >>>>>
> >>>>>> Yes, you would need to duplicate a chunk of the default_celery in your
> >>>>>> copy right now. But you can just make it have the values you want - so it
> >>>>>> would be about 10 lines in total.
> >>>>>>
> >>>>>> It seems that between AIRFLOW-966 and AIRFLOW-1840 things got a little
> >>>>>> bit out of sync in the default .cfg and the celery .py - the .py is looking
> >>>>>> for celery->ssl_* but the default config puts it in
> >>>>>> celery_broker_transport_options->ssl_*. Looking at the celery config
> >>>>>> options, it looks like they aren't actually options for the other
> >>>>>> transports; it's just that they don't complain about the extra options.
> >>>>>> I think the fix is just to move them up to the celery section.
> >>>>>>
> >>>>>> -ash
> >>>>>>
> >>>>>>
> >>>>>>> On 24 May 2018, at 07:21, Craig Rodrigues <ro...@crodrigues.org> wrote:
> >>>>>>>
> >>>>>>> Ash,
> >>>>>>>
> >>>>>>> Thanks!   You put me on the right track.
> >>>>>>> Unfortunately, there is a lot of logic in
> >>>>>>> airflow/config_templates/default_celery.py that I need,
> >>>>>>> and if I was to come up with my own class to replace:
> >>>>>>>
> >>>>>>> celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
> >>>>>>>
> >>>>>>> then I would basically have to rewrite my own version of default_celery.py
> >>>>>>>
> >>>>>>> Also, in airflow/config_templates/default_airflow.cfg, there is this:
> >>>>>>>
> >>>>>>> [celery_broker_transport_options]
> >>>>>>> # The visibility timeout defines the number of seconds to wait for the worker
> >>>>>>> # to acknowledge the task before the message is redelivered to another worker.
> >>>>>>> # Make sure to increase the visibility timeout to match the time of the longest
> >>>>>>> # ETA you're planning to use. Especially important in case of using Redis or SQS
> >>>>>>> visibility_timeout = 21600
> >>>>>>>
> >>>>>>> # In case of using SSL
> >>>>>>> ssl_active = False
> >>>>>>> ssl_key =
> >>>>>>> ssl_cert =
> >>>>>>> ssl_cacert =
> >>>>>>>
> >>>>>>>
> >>>>>>> None of those options work if using an sqla backend for celery broker_url.
> >>>>>>>
> >>>>>>> I need to think about this, but this needs to be cleaned up before
> >>>>>>> Airflow 1.10 is released.  In airflow/config_templates/default_airflow.cfg
> >>>>>>> there is this:
> >>>>>>>
> >>>>>>> broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
> >>>>>>>
> >>>>>>>
> >>>>>>> So sqla is specified as the default broker_url for celery.
> >>>>>>> A lot of people (including where I work) have used this template to set up
> >>>>>>> Airflow + Celery, and even though sqla is an "experimental" broker_url,
> >>>>>>> it actually works pretty well.
> >>>>>>>
> >>>>>>> Now in airflow 1.10, something that was very easy to set up is now really
> >>>>>>> complicated and unintuitive.
> >>>>>>>
> >>>>>>> Would it be OK to change the airflow code so that
> >>>>>>> in airflow/config_templates/default_airflow.cfg, all the options in
> >>>>>>> [celery_broker_transport_options] are commented out?
> >>>>>>> And if someone is running Redis, they would have to add those
> >>>>>>> options in their own airflow.cfg file?
> >>>>>>>
> >>>>>>> Bolke, do you have any comments?
> >>>>>>>
> >>>>>>> --
> >>>>>>> Craig
> >>>>>>>
> >>>>>>> On Tue, May 22, 2018 at 1:50 AM Ash Berlin-Taylor <ash_airflowlist@firemirror.com> wrote:
> >>>>>>>
> >>>>>>>> To use with the SQLA backend to celery you need to override the options
> >>>>>>>> Airflow passes to Celery. Those come from
> >>>>>>>> https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py
> >>>>>>>>
> >>>>>>>> Since you don't want most/all of those options (and there is no way in the
> >>>>>>>> config file to _remove_ a setting) you will have to point airflow to a
> >>>>>>>> different file for the celery config.
> >>>>>>>>
> >>>>>>>> This line in the config is what you will need to change:
> >>>>>>>>
> >>>>>>>>  # Import path for celery configuration options
> >>>>>>>>  celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
> >>>>>>>>
> >>>>>>>> If you create something like config/celery_config.py containing:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>  CELERY_CONFIG = {
> >>>>>>>>      # Just the options you want to set
> >>>>>>>>  }
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> (config/ should exist alongside your dags/ folder, and I think it should
> >>>>>>>> be added to the python path already). You can then set this in the config:
> >>>>>>>>
> >>>>>>>>  celery_config_options = celery_config.CELERY_CONFIG
> >>>>>>>>
> >>>>>>>> That should give you complete control.
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>
> >>>
>
>
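Following Ash's recipe quoted above, here is a minimal sketch of what `config/celery_config.py` might contain for an sqla broker, plus a generic resolver for a dotted import path such as `celery_config.CELERY_CONFIG`. The keys are Celery 4 setting names; the values (the URLs reuse the thread's placeholder credentials, and the queue name is taken from the worker banner earlier in the thread), the module location, and `load_by_import_path` are assumptions for illustration, not Airflow's own defaults or loader. `broker_transport_options` is deliberately left out so nothing extra is forwarded to `create_engine()`.

```python
import importlib

# What a hypothetical config/celery_config.py might hold for an sqla broker.
# Keys are Celery 4 setting names; values are placeholders. There is no
# broker_transport_options key, so nothing extra reaches create_engine().
CELERY_CONFIG = {
    "accept_content": ["json", "pickle"],
    "event_serializer": "json",
    "worker_prefetch_multiplier": 1,
    "task_acks_late": True,
    "task_default_queue": "airflow_celery",
    "broker_url": "sqla+mysql://airflow:blah@localhost:3306/mydb",
    "result_backend": "db+mysql://airflow:blah@localhost:3306/airflow",
}


def load_by_import_path(path):
    """Resolve a dotted 'module.ATTRIBUTE' string to the object it names.

    Generic sketch of how a setting like celery_config.CELERY_CONFIG can be
    resolved; it is not Airflow's own loader.
    """
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError("expected 'module.attribute', got %r" % (path,))
    module = importlib.import_module(module_name)
    return getattr(module, attr)
```

With such a module importable (Ash believed `config/` is already on the Python path), the `[celery]` setting would then be `celery_config_options = celery_config.CELERY_CONFIG`, and only the keys listed in the dict would be handed to Celery.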

Re: celery problem: cannot override celery_broker_transport_options

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
Kombu (a library Celery uses) 4.1.0 added it back in https://github.com/celery/kombu/blob/master/Changelog#L75-L99 - I _thought_ that means it's supported in Celery again...?


Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
Removal of sqla as a backend is mentioned in these release notes for celery 4.0:

http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#features-removed-for-lack-of-funding

--
Craig


>> >>>> Airflow 1.10 is released.  In
>> >>> airflow/config_templates/default_airflow.cfg
>> >>>> there is this:
>> >>>>
>> >>>> broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
>> >>>>
>> >>>>
>> >>>> So sqla is specified as the default broker_url for celery.
>> >>>> A lot of people (including where I work) have used this template to
>> set
>> >>> up
>> >>>> Airflow + Celery, and even though sqla is an "experimental"
>> broker_url,
>> >>>> it actually works pretty well.
>> >>>>
>> >>>> Now in airflow 1.10, something that was very easy to set up is now
>> >>> really
>> >>>> complicated and unintuitive.
>> >>>>
>> >>>> Would it be OK to change the airflow code so that
>> >>>> in airflow/config_templates/default_airflow.cfg, all the options in
>> >>>> [celery_broker_transport_options] are commented out?
>> >>>> And if someone is running Redis, they would have to add those
>> >>>> options in their own airflow.cfg file?
>> >>>>
>> >>>> Bolke, do you have any comments?
>> >>>>
>> >>>> --
>> >>>> Craig
>> >>>>
>> >>>> On Tue, May 22, 2018 at 1:50 AM Ash Berlin-Taylor <
>> >>>> ash_airflowlist@firemirror.com> wrote:
>> >>>>
>> >>>>> To use with the SQLA backend to celery you need to override the
>> options
>> >>>>> Airflow passes to Celery. Those come from
>> >>>>>
>> >>>
>> https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py
>> >>>>>
>> >>>>> Since you don't want most/all of those options (and there is no way
>> in
>> >>> the
>> >>>>> config file to _remove_ a setting) you will have to point airflow
>> to a
>> >>>>> different file for the celery config:
>> >>>>>
>> >>>>> This line in the config is what you will need to change:
>> >>>>>
>> >>>>>   # Import path for celery configuration options
>> >>>>>   celery_config_options =
>> >>>>> airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
>> >>>>>
>> >>>>> If you create something like config/celery_config.py containing:
>> >>>>>
>> >>>>>
>> >>>>>   CELERY_CONFIG = {
>> >>>>>       # Just the options you want to set
>> >>>>>   }
>> >>>>>
>> >>>>>
>> >>>>> (config/ should exist along side your dags/ folder, and I think it
>> >>> should
>> >>>>> be added to the python path already). You can then set this in the
>> >>> config:
>> >>>>>
>> >>>>>   celery_config_options = celery_config.CELERY_CONFIG
>> >>>>>
>> >>>>> That should give you complete control
>> >>>>>
>> >>>>>
>> >>>
>> >>>
>>
>>

Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
It looks like the documentation for the sqla broker was removed from Celery:

https://github.com/celery/celery/commit/79810a26a116e9881c42a14d856fa94c40fefcd8#diff-29ccf8c96d521253467909a652e6ded2

I cannot find the pull request or release notes which document this.

--
Craig



Re: celery problem: cannot override celery_broker_transport_options

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
Sounds like https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py#L31 should be guarded in some way so that it only does that for redis:// and sqs:// backends.
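A minimal sketch of such a guard, assuming the broker URL scheme is enough to identify the transport (Celery/kombu URLs look like `sqla+mysql://...`, where the part before `+` names the transport). The function name and the `TRANSPORTS_WITH_OPTIONS` set are hypothetical, not Airflow code:

```python
from urllib.parse import urlparse  # Python 3; on Python 2.7 use "from urlparse import urlparse"

# Transports assumed to accept these options (Redis and SQS, per the
# Celery broker settings docs cited in this thread).
TRANSPORTS_WITH_OPTIONS = {"redis", "sqs"}


def broker_transport_options_for(broker_url, configured_options):
    """Return the transport options to pass to Celery for this broker_url."""
    scheme = urlparse(broker_url).scheme          # e.g. "sqla+mysql"
    transport = scheme.split("+", 1)[0]           # e.g. "sqla"
    if transport in TRANSPORTS_WITH_OPTIONS:
        return dict(configured_options)
    # For sqla, the traceback in this thread shows the options reaching
    # SQLAlchemy's create_engine() as keyword arguments, so send none.
    return {}
```

With this guard, the `sqla+mysql://` broker from the original report would receive an empty options dict, while `redis://` and `sqs://` brokers keep `visibility_timeout`.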



Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
Ash,

According to this:
http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
visibility_timeout is supported by Redis and SQS.

--
Craig



Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
Ash,

Thanks again.  You are leading me on the right path!

I can prepare a patch to move the ssl_ options into the celery section.

What about visibility_timeout?  The error I am getting is:

  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
    engineclass.__name__))
TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.


So it looks like visibility_timeout does not work with sqla either.
--
Craig



Re: celery problem: cannot override celery_broker_transport_options

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
Yes, you would need to duplicate a chunk of default_celery.py in your copy for now. But you can make it contain just the values you want, so it would be about 10 lines in total.
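For illustration, a roughly 10-line `config/celery_config.py` for an sqla broker might look like the sketch below. The keys are assumed to mirror what `DEFAULT_CELERY_CONFIG` sets on v1-10-test (verify against default_celery.py); every value is a placeholder, and `broker_transport_options` is deliberately left out:

```python
# config/celery_config.py (hypothetical module; all values are placeholders)
CELERY_CONFIG = {
    'accept_content': ['json', 'pickle'],
    'event_serializer': 'json',
    'worker_prefetch_multiplier': 1,
    'task_acks_late': True,
    'task_default_queue': 'default',
    'task_default_exchange': 'default',
    'broker_url': 'sqla+mysql://airflow:airflow@localhost:3306/airflow',
    'result_backend': 'db+mysql://airflow:airflow@localhost:3306/airflow',
    'worker_concurrency': 16,
    # No 'broker_transport_options': with the sqla transport they end up
    # as create_engine() keyword arguments (the TypeError in this thread).
}
```

airflow.cfg would then point at it with `celery_config_options = celery_config.CELERY_CONFIG`, as suggested in the May 22 message.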

It seems that between AIRFLOW-966 and AIRFLOW-1840 things got a little out of sync between the default .cfg and the celery .py: the .py looks for celery->ssl_* but the default config puts them in celery_broker_transport_options->ssl_*. Looking at the Celery config options, they aren't actually options for the other transports; it's just that those transports don't complain about the extra options. I think the fix is just to move them up to the celery section.
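The mismatch can be reproduced with plain `configparser` (Python 3). This sketch assumes an abbreviated copy of the default layout quoted in this thread; Airflow's own config loader adds defaults and type conversion on top, so it only approximates the lookup that logs `section/key [celery/ssl_active] not found in config`:

```python
import configparser

# Abbreviated v1.10 default layout: ssl_* live under the transport options.
CURRENT_LAYOUT = """
[celery]
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow

[celery_broker_transport_options]
visibility_timeout = 21600
ssl_active = False
"""

# Proposed layout: ssl_* moved up to [celery].
PROPOSED_LAYOUT = """
[celery]
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
ssl_active = False

[celery_broker_transport_options]
visibility_timeout = 21600
"""


def where_is_ssl_active(cfg_text):
    """Report, per section, whether ssl_active is defined there."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return {
        section: parser.has_option(section, "ssl_active")
        for section in ("celery", "celery_broker_transport_options")
    }
```

In the current layout the lookup under `[celery]` misses while the transport section still carries `ssl_active` to the broker; the proposed layout reverses both.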

-ash



Re: celery problem: cannot override celery_broker_transport_options

Posted by Craig Rodrigues <ro...@crodrigues.org>.
Ash,

Thanks! You put me on the right track.
Unfortunately, there is a lot of logic in
airflow/config_templates/default_celery.py that I need,
and if I were to come up with my own module to replace:

celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

then I would basically have to rewrite my own version of default_celery.py.
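One way to avoid rewriting default_celery.py, sketched here under the assumption that importing that module is acceptable in your deployment, is to derive the custom config from the default and drop only `broker_transport_options`. The helper name `without_transport_options` is hypothetical:

```python
def without_transport_options(base_config):
    """Return a copy of a Celery config dict minus broker_transport_options."""
    # A shallow copy suffices: only a top-level key is removed, and the
    # original dict (e.g. DEFAULT_CELERY_CONFIG) is left untouched.
    config = dict(base_config)
    config.pop('broker_transport_options', None)
    return config


# Hypothetical config/celery_config.py contents:
#
#   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
#   CELERY_CONFIG = without_transport_options(DEFAULT_CELERY_CONFIG)
```

This keeps the rest of the default logic while stopping the sqla transport from receiving create_engine() keyword arguments it rejects.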

Also, in airflow/config_templates/default_airflow.cfg, there is this:

[celery_broker_transport_options]
# The visibility timeout defines the number of seconds to wait for the worker
# to acknowledge the task before the message is redelivered to another worker.
# Make sure to increase the visibility timeout to match the time of the longest
# ETA you're planning to use. Especially important in case of using Redis or SQS
visibility_timeout = 21600

# In case of using SSL
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
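
Why an empty [celery_broker_transport_options] section in a user's airflow.cfg does not switch these defaults off can be sketched with the standard-library configparser. This is a simplified model that assumes a two-layer lookup in which user values are laid over the shipped defaults; it is not Airflow's own AirflowConfigParser, and unlike the getsection() output in the original report it returns every value as a string.

```python
import configparser

# Stand-in for the shipped default_airflow.cfg section quoted above.
DEFAULTS = """
[celery_broker_transport_options]
visibility_timeout = 21600
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
"""

# A user airflow.cfg in which the section exists but holds only comments.
USER = """
[celery_broker_transport_options]
#
#
"""


def effective_section(defaults_text, user_text, section):
    """Overlay user values on the defaults (simplified two-layer lookup)."""
    defaults = configparser.ConfigParser()
    defaults.read_string(defaults_text)
    user = configparser.ConfigParser()
    user.read_string(user_text)

    merged = dict(defaults[section]) if defaults.has_section(section) else {}
    if user.has_section(section):
        # User keys can override a default value, but nothing here deletes one.
        merged.update(user[section])
    return merged


print(effective_section(DEFAULTS, USER, "celery_broker_transport_options"))
```

Because the user layer can only add or override keys, all five default options survive, which is consistent with what the worker ended up receiving.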


None of those options work if using an sqla backend for celery broker_url.

I need to think about this, but it needs to be cleaned up before
Airflow 1.10 is released. In airflow/config_templates/default_airflow.cfg
there is this:

broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow


So sqla is specified as the default broker_url for celery.
A lot of people (including where I work) have used this template to set up
Airflow + Celery, and even though sqla is an "experimental" broker_url,
it actually works pretty well.

In Airflow 1.10, something that was very easy to set up has become really
complicated and unintuitive.

Would it be OK to change the airflow code so that
in airflow/config_templates/default_airflow.cfg, all the options in
[celery_broker_transport_options] are commented out?
And if someone is running Redis, they would have to add those
options in their own airflow.cfg file?
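
For comparison with that proposal, here is a hypothetical helper (transport_options_for is an invented name, not part of Airflow) that forwards visibility_timeout only to brokers that use it and drops everything else, so an sqla broker would receive no transport options at all. It assumes, from my reading of the Celery documentation, that visibility_timeout only applies to the Redis and SQS transports.

```python
from urllib.parse import urlsplit

# Assumption: per the Celery docs, only these transports use visibility_timeout.
VISIBILITY_TIMEOUT_TRANSPORTS = {"redis", "sqs"}


def transport_options_for(broker_url, configured):
    """Hypothetical helper: filter configured options down to what the broker uses.

    broker_url -- e.g. "sqla+mysql://airflow:blah@localhost:3306/mydb"
    configured -- values read from [celery_broker_transport_options]
    """
    # urlsplit keeps "sqla+mysql" as the scheme; the part before "+" names the transport.
    transport = urlsplit(broker_url).scheme.split("+", 1)[0]
    options = {}
    if transport in VISIBILITY_TIMEOUT_TRANSPORTS and "visibility_timeout" in configured:
        options["visibility_timeout"] = configured["visibility_timeout"]
    # The ssl_* keys are never forwarded: going by the traceback, kombu's SQLAlchemy
    # transport hands transport options to create_engine(), which rejects them.
    return options
```

With this approach the default could stay in default_airflow.cfg without breaking sqla users, at the cost of Airflow having to know which transports accept which options.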

Bolke, do you have any comments?

--
Craig


Re: celery problem: cannot override celery_broker_transport_options

Posted by Ash Berlin-Taylor <as...@firemirror.com>.
To use the SQLA backend with Celery, you need to override the options Airflow passes to Celery. Those come from https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py

Since you don't want most/all of those options (and there is no way in the config file to _remove_ a setting), you will have to point Airflow to a different file for the Celery config:

This line in the config is what you will need to change:

    # Import path for celery configuration options
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

If you create something like config/celery_config.py containing:

    CELERY_CONFIG = {
        # Just the options you want to set
    }

(config/ should exist alongside your dags/ folder, and I think it should already be on the Python path). You can then set this in the config:

    celery_config_options = celery_config.CELERY_CONFIG

That should give you complete control
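
Rather than listing every option by hand, such a module could start from Airflow's own defaults and clear only the transport options. This sketch assumes DEFAULT_CELERY_CONFIG is a plain dict whose broker_transport_options key carries the [celery_broker_transport_options] values (which is what the traceback in the original report suggests); the helper name without_transport_options is hypothetical.

```python
import copy


def without_transport_options(base_config):
    """Return a copy of a Celery settings dict with broker_transport_options emptied.

    The input dict is left untouched, so Airflow's defaults are not mutated.
    """
    config = copy.deepcopy(base_config)
    # Nothing is left for kombu to forward as transport options, so (going by the
    # traceback) create_engine() would receive only the broker URL.
    config["broker_transport_options"] = {}
    return config


# In config/celery_config.py the wiring would look roughly like this
# (assumption: the import path matches the default celery_config_options value):
#
#     from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
#     CELERY_CONFIG = without_transport_options(DEFAULT_CELERY_CONFIG)
```

The rest of the default settings (queues, serializers, result backend) are kept as-is, and celery_config_options = celery_config.CELERY_CONFIG stays as described above.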
