Posted to commits@airflow.apache.org by "Kevin Reilly (JIRA)" <ji...@apache.org> on 2018/01/18 03:23:00 UTC
[jira] [Created] (AIRFLOW-2011) Airflow ampq pool maintains dead
connections
Kevin Reilly created AIRFLOW-2011:
-------------------------------------
Summary: Airflow ampq pool maintains dead connections
Key: AIRFLOW-2011
URL: https://issues.apache.org/jira/browse/AIRFLOW-2011
Project: Apache Airflow
Issue Type: Bug
Components: celery, scheduler
Affects Versions: 1.9.1
Environment: OS: Ubuntu 16.04 LTS (debian)
Python: 3.6.3
Airflow: 1.9.1rc1
Reporter: Kevin Reilly
The Airflow scheduler deadlocks when queuing up tasks:
[2018-01-08 07:01:09,315] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-01-08 07:01:09,315] {celery_executor.py:102} ERROR - [Errno 104] Connection reset by peer
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/executors/celery_executor.py", line 83, in
    state = async.state
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 436, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.6/dist-packages/celery/result.py", line 375, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 244, in get_task_meta
    for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
  File "/usr/local/lib/python3.6/dist-packages/celery/backends/rpc.py", line 278, in
    binding.declare()
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 605, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 614, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/usr/local/lib/python3.6/dist-packages/kombu/entity.py", line 649, in queue_declare
    nowait=nowait,
  File "/usr/local/lib/python3.6/dist-packages/amqp/channel.py", line 1147, in queue_declare
    nowait, arguments),
  File "/usr/local/lib/python3.6/dist-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/usr/local/lib/python3.6/dist-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/usr/local/lib/python3.6/dist-packages/amqp/transport.py", line 258, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
Setting broker_pool_limit=None in the Celery settings file fixes this. Specifically, editing default_celery.py and adding
"broker_pool_limit": None,
between lines 37 and 38 would solve the issue. With this setting, Celery disables its broker connection pool and opens a new AMQP connection each time it needs one. This avoids the situation where the RabbitMQ server closes a pooled connection without the client noticing, which leaves broken sockets in the pool available for reuse.
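As a rough illustration of the proposed change, the sketch below builds a Celery settings dictionary with the connection pool disabled. It does not reproduce the real contents of default_celery.py: BASE_CELERY_CONFIG, its keys and values, and the helper with_pool_disabled are hypothetical stand-ins chosen for this example. Only the broker_pool_limit key comes from the report above.

```python
import copy

# Hypothetical stand-in for the settings dictionary in default_celery.py.
# The real file contains different keys and values.
BASE_CELERY_CONFIG = {
    "broker_url": "amqp://guest:guest@localhost:5672//",
    "result_backend": "rpc://",
    "worker_prefetch_multiplier": 1,
}


def with_pool_disabled(base):
    """Return a copy of `base` with the broker connection pool disabled."""
    config = copy.deepcopy(base)
    # None disables Celery's broker connection pool, so a connection is
    # opened and closed for each use instead of being taken from the pool.
    config["broker_pool_limit"] = None
    return config


CELERY_CONFIG = with_pool_disabled(BASE_CELERY_CONFIG)
```

Copying the base dictionary rather than mutating it keeps the shipped defaults untouched. The trade-off of disabling the pool is extra connection setup for each broker operation.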