Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/01 17:43:24 UTC

[GitHub] [airflow] sudarshan2906 opened a new issue #11225: celery executer with SQS

sudarshan2906 opened a new issue #11225:
URL: https://github.com/apache/airflow/issues/11225


   Hi, I am using Airflow 1.10.12 with the Celery executor and SQS. When configuring `predefined_queues` in Celery as described [here](https://docs.celeryproject.org/en/latest/getting-started/brokers/sqs.html#predefined-queues), I get the error below:
   
   ```
   [2020-10-01 17:37:54,498: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'str' object has no attribute 'items'")
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 921, in create_channel
       return self._avail_channels.pop()
   IndexError: pop from empty list
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/celery/worker/worker.py", line 208, in start
       self.blueprint.start(self)
     File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
       step.start(parent)
     File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
       return self.obj.start()
     File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
       blueprint.start(self)
     File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
       step.start(parent)
     File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/connection.py", line 23, in start
       c.connection = c.connect()
     File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 405, in connect
       conn = self.connection_for_read(heartbeat=self.amqheartbeat)
     File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 412, in connection_for_read
       self.app.connection_for_read(heartbeat=heartbeat))
     File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 439, in ensure_connected
       callback=maybe_shutdown,
     File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 389, in ensure_connection
       self._ensure_connection(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 445, in _ensure_connection
       callback, timeout=timeout
     File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 344, in retry_over_time
       return fun(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 874, in _connection_factory
       self._connection = self._establish_connection()
     File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 809, in _establish_connection
       conn = self.transport.establish_connection()
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 941, in establish_connection
       self._avail_channels.append(self.create_channel(self))
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 923, in create_channel
       channel = self.Channel(connection)
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 134, in __init__
       self._update_queue_cache(self.queue_name_prefix)
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 140, in _update_queue_cache
       for queue_name, q in self.predefined_queues.items():
   AttributeError: 'str' object has no attribute 'items'
   
   ```
   
   airflow.cfg:
   
   ```
   [celery_broker_transport_options]
   predefined_queues = { 'my-q': {
               'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
               'access_key_id': 'xxx',
               'secret_access_key': 'xxx',
           }
       }
   ```
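The last frame of the traceback shows kombu's SQS transport calling `.items()` on `predefined_queues`, so it expects a dict, but a value written into `airflow.cfg` reaches it as the text of a dict. A minimal stand-alone reproduction of that type mismatch follows. `iterate_queues` is a hypothetical stand-in for the failing kombu line, not kombu's code, and `ast.literal_eval` is used only to show the shape kombu wants, not as an Airflow feature.

```python
import ast

# The option as it arrives from a config file: a str, not a dict.
raw_value = "{'my-q': {'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q'}}"


def iterate_queues(predefined_queues):
    # Hypothetical stand-in for kombu/transport/SQS.py:
    # `for queue_name, q in self.predefined_queues.items()`
    return [queue_name for queue_name, q in predefined_queues.items()]


try:
    iterate_queues(raw_value)
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'items'

# Evaluating the literal yields the dict-of-dicts shape kombu iterates over.
parsed = ast.literal_eval(raw_value)
print(iterate_queues(parsed))  # ['my-q']
```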


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] charliegriefer commented on issue #11225: celery executer with SQS and predefined_queues

Posted by GitBox <gi...@apache.org>.
charliegriefer commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-901995246


   @sudarshan2906 - can you show the `celery_config_options` line in your `airflow.cfg`? I'm doing something similar, but getting an error on the scheduler saying:
   
   > airflow.exceptions.AirflowConfigException: The object could not be loaded. Please check "celery_config_options" key in "celery" section. Current value: "celery_config.MY_CELERY_CONFIG".
   
   My `celery_config` is in my AIRFLOW_HOME, and currently it's pretty basic:
   
   ```
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   
   MY_CELERY_CONFIG = {
       **DEFAULT_CELERY_CONFIG
   }
   ```
   
   Thanks!





[GitHub] [airflow] chris-french commented on issue #11225: celery executer with SQS and predefined_queues

chris-french commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-704365226


   @sudarshan2906 @mik-laj  -- did you find a solution for this? 
   
   It seems you would need a small change to [`setup.py`](https://github.com/apache/airflow/blob/c803a27e44da071b5a2c7fc2ce2a7951501753c1/setup.py#L203) to install SQS support for Celery in Airflow:
   
   ```
       'celery[sqs]~=4.4.2',
   ```
   
   source: https://docs.celeryproject.org/en/4.4.2/getting-started/brokers/sqs.html?highlight=sqs





[GitHub] [airflow] nicnguyen3103 commented on issue #11225: celery executer with SQS and predefined_queues

nicnguyen3103 commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-997587302


   For anyone still struggling with the import path of the Celery config, here is how to set it up:
   
   1. Create a `celery_config_extend.py` file and put it in the AIRFLOW_HOME folder. By default, Airflow adds this path to `sys.path` (see https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html).
   2. Set up the script like @charliegriefer's or the other comments above. If you want to keep the original config and extend it with extra settings, you can write the config as below; it keeps the original DEFAULT_CELERY_CONFIG and adds or replaces key/value pairs where applicable:
   ```
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   
   CELERY_CONFIG = dict(
       DEFAULT_CELERY_CONFIG,
       **{
           # Add new key/value pairs here; a key that already exists in
           # DEFAULT_CELERY_CONFIG is replaced by the value given here.
           'key1': 'value1',
       }
   )
   ```
   3. Set the environment variable `AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config_extend.CELERY_CONFIG` so that Airflow imports your new config.
   
   Airflow should now pick up your new Celery config and apply it to the worker.
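Airflow treats the `celery_config_options` value as a dotted `module.ATTRIBUTE` import path, so the part before the last dot must be a module name importable from `sys.path`, not a file path; that is why a file outside `sys.path` gives "No module named 'celery_config'". A rough sketch of that kind of lookup using only `importlib` (an illustration, not Airflow's own loader; `load_dotted_object`, the temporary folder, and the `example_key` setting are hypothetical):

```python
import importlib
import sys
import tempfile
from pathlib import Path


def load_dotted_object(path):
    """Resolve "package.module.ATTR" by importing everything before the last dot."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"{path!r} is not a dotted module.attribute path")
    # ModuleNotFoundError here means the module is not on sys.path.
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


with tempfile.TemporaryDirectory() as fake_airflow_home:
    # Hypothetical config module containing a placeholder setting.
    Path(fake_airflow_home, "celery_config_extend.py").write_text(
        "CELERY_CONFIG = {'example_key': 'example_value'}\n"
    )
    try:
        load_dotted_object("celery_config_extend.CELERY_CONFIG")
    except ModuleNotFoundError as exc:
        print("not importable yet:", exc)

    # Once the folder is on sys.path, the same dotted path resolves.
    sys.path.insert(0, fake_airflow_home)
    importlib.invalidate_caches()
    loaded = load_dotted_object("celery_config_extend.CELERY_CONFIG")
    print("loaded:", loaded)
    sys.path.remove(fake_airflow_home)
```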
    
   
   





[GitHub] [airflow] sudarshan2906 commented on issue #11225: celery executer with SQS and predefined_queues

sudarshan2906 commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-702368780


   Thanks. I am using my celery configuration file for now. 
   
   But do you think we could add `predefined_queues` to DEFAULT_CELERY_CONFIG, so that when `predefined_queues` is passed in the Celery config it gets parsed as JSON? It's quite an important config for Celery when using SQS as the queue.
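A minimal sketch of the conversion proposed here, assuming the option would be stored as a JSON object string. `parse_predefined_queues` is a hypothetical helper, not an existing Airflow function. JSON requires double quotes, so a single-quoted value like the one in the `airflow.cfg` above would not parse as-is.

```python
import json


def parse_predefined_queues(raw):
    """Hypothetical helper: turn a JSON string from a config file into the
    dict-of-dicts that kombu's SQS transport iterates over."""
    if isinstance(raw, dict):
        return raw  # already the right type, pass it through
    queues = json.loads(raw)
    if not isinstance(queues, dict):
        raise ValueError("predefined_queues must be a JSON object")
    return queues


raw = '{"my-q": {"url": "https://ap-southeast-2.queue.amazonaws.com/123456/my-q"}}'
broker_transport_options = {"predefined_queues": parse_predefined_queues(raw)}
print(list(broker_transport_options["predefined_queues"]))  # ['my-q']
```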
   





[GitHub] [airflow] mik-laj commented on issue #11225: celery executer with SQS and predefined_queues

mik-laj commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-703618579


   @auvipy can you share some details about Celery 5? What is its status? Should we think about migrating? Why is it worth migrating?





[GitHub] [airflow] mik-laj commented on issue #11225: celery executer with SQS

mik-laj commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-702357824


   This section only accepts strings as values. You need to use "celery_config_options" to set up other types. See: https://github.com/apache/airflow/blob/master/airflow/executors/celery_executor.py#L66-L69
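To see why, here is a small sketch using Python's standard `configparser`, on which Airflow's config handling is built: an INI value is returned as text and never evaluated, so a dict written into the section stays a `str`. The section name matches the one above; the queue URL is a placeholder.

```python
import configparser

cfg = configparser.ConfigParser()
cfg.read_string(
    "[celery_broker_transport_options]\n"
    "predefined_queues = {'my-q': {'url': 'https://example.invalid/my-q'}}\n"
)

value = cfg.get("celery_broker_transport_options", "predefined_queues")
# configparser does not evaluate the text, so the dict literal stays a str.
print(type(value).__name__)  # str
print(value.startswith("{"))  # True
```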





[GitHub] [airflow] auvipy commented on issue #11225: celery executer with SQS and predefined_queues

auvipy commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-997627238


   Can anyone check whether this is helpful? https://github.com/celery/kombu/pull/1450





[GitHub] [airflow] sudarshan2906 commented on issue #11225: celery executer with SQS and predefined_queues

sudarshan2906 commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-704386220


   @chris-french Yup, I got it working with Celery 4.4.2. But Airflow's config does not support the `predefined_queues` property because its value is a dict. I used a celery_config file and pointed `celery_config_options` in `airflow.cfg` at it:
   
   celery_config.py
   ```
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   broker_transport_options = {'visibility_timeout': 21600,
                               'predefined_queues': {
                                   # Placeholders: use your queue's name and URL.
                                   'sqs_queue_name': {
                                       'url': 'sqs_queue_url'}}}
   
   DEFAULT_CELERY_CONFIG['broker_transport_options'] = broker_transport_options
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
   ```
   





[GitHub] [airflow] mik-laj edited a comment on issue #11225: celery executer with SQS

mik-laj edited a comment on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-702357824


   This section only accepts strings as values. You need to use "celery_config_options" to set up other types. See: https://github.com/apache/airflow/blob/3ca11eb9b02a2c2591292fd6b76e0e98b8f22656/airflow/executors/celery_executor.py#L66-L69





[GitHub] [airflow] sudarshan2906 commented on issue #11225: celery executer with SQS and predefined_queues

sudarshan2906 commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-703613465


   Well, `predefined_queues` is supported in Celery version `5.0.0`, but Airflow doesn't support that version, and I can't find any ticket for it. Will Celery be upgraded in Airflow `2.0.0`?





[GitHub] [airflow] mik-laj commented on issue #11225: celery executer with SQS and predefined_queues

mik-laj commented on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-703467160


   I have no experience with SQS, but it sounds like a good solution.





[GitHub] [airflow] sudarshan2906 edited a comment on issue #11225: celery executer with SQS and predefined_queues

sudarshan2906 edited a comment on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-704386220


   @chris-french Yup, I got it working with Celery 4.4.2. But Airflow's config does not support the `predefined_queues` property because its value is a dict. I used a celery_config file and pointed `celery_config_options` in `airflow.cfg` at it:
   
   celery_config.py
   ```
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   broker_transport_options = {'visibility_timeout': 21600,
                               'predefined_queues': {
                                   'sqs_queue_name': {
                                       'url': 'sqs_queue_url'}}}
   
   DEFAULT_CELERY_CONFIG['broker_transport_options'] = broker_transport_options
   CELERY_CONFIG = DEFAULT_CELERY_CONFIG
   ```
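Since the original failure came from a string where kombu expected a dict, a quick structural check of `broker_transport_options` before starting a worker can catch that early. A minimal sketch; `check_predefined_queues` is a hypothetical helper, and it assumes, as in the Celery SQS documentation example, that each queue entry is a dict with at least a `url` key.

```python
def check_predefined_queues(broker_transport_options):
    """Hypothetical sanity check for broker_transport_options.

    Returns a list of problems; an empty list means the shape looks right.
    """
    problems = []
    queues = broker_transport_options.get("predefined_queues")
    if queues is None:
        return problems  # option not used, nothing to check
    if not isinstance(queues, dict):
        problems.append(f"predefined_queues is {type(queues).__name__}, expected dict")
        return problems
    for name, settings in queues.items():
        if not isinstance(settings, dict):
            problems.append(f"{name!r}: expected a dict of settings")
        elif "url" not in settings:
            problems.append(f"{name!r}: missing 'url'")
    return problems


good = {"visibility_timeout": 21600,
        "predefined_queues": {"sqs_queue_name": {"url": "sqs_queue_url"}}}
bad = {"predefined_queues": "{'sqs_queue_name': {'url': 'sqs_queue_url'}}"}
print(check_predefined_queues(good))  # []
print(check_predefined_queues(bad))   # ['predefined_queues is str, expected dict']
```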
   





[GitHub] [airflow] charliegriefer edited a comment on issue #11225: celery executer with SQS and predefined_queues

charliegriefer edited a comment on issue #11225:
URL: https://github.com/apache/airflow/issues/11225#issuecomment-901995246


   @sudarshan2906 - can you show the `celery_config_options` line in your `airflow.cfg`? I'm doing something similar, but getting an error on the scheduler saying:
   
   > airflow.exceptions.AirflowConfigException: The object could not be loaded. Please check "celery_config_options" key in "celery" section. Current value: "celery_config.MY_CELERY_CONFIG".
   
   My `celery_config` is in my AIRFLOW_HOME, and currently it's pretty basic:
   
   ```
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   
   MY_CELERY_CONFIG = {
       **DEFAULT_CELERY_CONFIG
   }
   ```
   
   EDIT to add: there is an exception higher up in the stack trace, `[2021-08-19 09:19:06,303] {configuration.py:468} ERROR - No module named 'celery_config'`, so it's just not finding the file.
   
   The specific line in my airflow.cfg: `celery_config_options = celery_config.MY_CELERY_CONFIG`
   
   
   Thanks!

