Posted to commits@airflow.apache.org by "Kevin Reilly (JIRA)" <ji...@apache.org> on 2018/01/18 03:23:00 UTC

[jira] [Created] (AIRFLOW-2011) Airflow AMQP pool maintains dead connections

Kevin Reilly created AIRFLOW-2011:
-------------------------------------

             Summary: Airflow AMQP pool maintains dead connections
                 Key: AIRFLOW-2011
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2011
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery, scheduler
    Affects Versions: 1.9.1
         Environment: OS: Ubuntu 16.04 LTS (debian)
Python: 3.6.3
Airflow: 1.9.1rc1

            Reporter: Kevin Reilly


The Airflow scheduler deadlocks while queuing up tasks. The log shows:




[2018-01-08 07:01:09,315] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-01-08 07:01:09,315] {celery_executor.py:102} ERROR - [Errno 104] Connection reset by peer

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/executors/celery_executor.py", line 83, in
    state = async.state
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 436, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 375, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 244, in get_task_meta
    for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 278, in
    binding.declare()
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.6/dist-packages/amqp/channel.py", line 1147, in queue_declare
    nowait, arguments),
  File "/usr/local/lib/python3.6/dist-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/dist-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/dist-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer


Setting broker_pool_limit=None in the Celery settings resolves the issue.

Specifically, editing default_celery.py and adding

    "broker_pool_limit": None,

between lines 37 and 38 would solve the issue. This setting disables the broker connection pool, so Celery creates a new AMQP connection each time it needs one. That avoids the case where the RabbitMQ server drops a pooled connection without the client noticing, which leaves a broken socket in the pool to be handed out again.
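As a rough sketch of the proposed change, the snippet below adds the key to a configuration dict without touching the original. BASE_CELERY_CONFIG is a hypothetical stand-in for the dict defined in default_celery.py, and its entries are placeholders rather than Airflow's actual defaults; only the "broker_pool_limit" key comes from this report.

```python
import copy

# Hypothetical stand-in for the config dict in default_celery.py.
# The keys and values here are placeholders, not Airflow's real defaults.
BASE_CELERY_CONFIG = {
    "accept_content": ["json", "pickle"],
    "worker_prefetch_multiplier": 1,
    "task_acks_late": True,
}


def with_pool_disabled(base_config):
    """Return a copy of base_config with the broker connection pool disabled.

    Celery treats broker_pool_limit=None as "no pool": a connection is
    opened and closed for every use instead of being reused.
    """
    config = copy.deepcopy(base_config)
    config["broker_pool_limit"] = None
    return config


CELERY_CONFIG = with_pool_disabled(BASE_CELERY_CONFIG)
```

The trade-off is extra connection setup on every broker operation, in exchange for never reusing a socket that the server has already closed.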


