Posted to users@airflow.apache.org by Toomas Aas <To...@reach-u.com> on 2022/03/11 13:57:38 UTC

High Availability of queues in RabbitMQ

Greetings!

I apologise in advance for the long-winded question, and in case this 
turns out to be more of a RabbitMQ question than an Airflow one.

I am setting up a proof-of-concept multi-node cluster of Airflow 2.2.3 
with Celery executor. The aspect that I'm currently working on is high 
availability of RabbitMQ.

I have set up a 3-node RabbitMQ cluster which is fronted by HAProxy. 
HAProxy and RabbitMQ are actually running on the same machines that are 
running Airflow. On each machine there is a RabbitMQ server and HAProxy 
which is configured to direct traffic to all 3 nodes. broker_url in 
airflow.cfg is configured to point to HAProxy port on localhost.
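
For concreteness, the relevant bits look roughly like this (the node 
hostnames and the masked password are placeholders; the port, user and 
vhost match the log excerpt further down):

# airflow.cfg (excerpt)
[core]
executor = CeleryExecutor

[celery]
broker_url = amqp://airflow_ci:***@127.0.0.1:5673/airflow_ci

# haproxy.cfg (excerpt; hostnames are placeholders)
listen rabbitmq
    bind 127.0.0.1:5673
    mode tcp
    balance roundrobin
    option tcp-check
    server rabbit1 node1:5672 check
    server rabbit2 node2:5672 check
    server rabbit3 node3:5672 check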

The RabbitMQ cluster itself looks perfectly healthy when I check it with 
'rabbitmqctl cluster_status'. Failover of the TCP connection also seems 
to be working - if I shut down RabbitMQ on one machine, I can see in the 
system logs that after a short pause the Airflow worker successfully 
reconnects over AMQP.

But there is a problem with queues. Currently, all the queues that exist 
in my RabbitMQ setup are 'classic' queues. This means that if the 
RabbitMQ node that hosts a queue goes down, that queue is not available 
and Airflow is sad. This is what happened when I shut down RabbitMQ on 
one machine:

--------------------------------------------------------------------
[2022-03-11 12:48:40,913: ERROR/MainProcess] consumer: Cannot connect to 
amqp://airflow_ci:**@127.0.0.1:5673/airflow_ci: Server unexpectedly 
closed connection.
Mar 11 12:48:40 Trying again in 4.00 seconds... (2/100)
Mar 11 12:48:44 [2022-03-11 12:48:44,940: INFO/MainProcess] Connected to 
amqp://airflow_ci:**@127.0.0.1:5673/airflow_ci
[2022-03-11 12:48:44,961: INFO/MainProcess] mingle: searching for neighbors
[2022-03-11 12:48:45,997: INFO/MainProcess] mingle: all alone
[2022-03-11 12:48:46,005: CRITICAL/MainProcess] Unrecoverable error: 
NotFound(404, "NOT_FOUND - home node 'rabbit@ci-91-col' of durable queue 
'default' in vhost 'airflow_ci' is down or inaccessible", (50, 10), 
'Queue.declare')
--------------------------------------------------------------------

What I am looking for is a way to make the queues created by Airflow 
'quorum' or 'mirrored' queues rather than 'classic' ones. I guess I 
could manually create a quorum queue in the RabbitMQ management UI and 
set it as default_queue in airflow.cfg, but is that the proper way? Or 
is there perhaps an entirely different approach to surviving a RabbitMQ 
node going down that I'm not thinking of?
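
To make the question concrete, what I have in mind is roughly the 
following (the queue name is just an example, and I don't know yet 
whether Airflow/Celery will accept it - that is exactly my question):

# declare a quorum queue in the airflow_ci vhost, e.g. with rabbitmqadmin
rabbitmqadmin -u <admin-user> -p <password> -V airflow_ci declare queue \
    name=airflow-default durable=true arguments='{"x-queue-type":"quorum"}'

# airflow.cfg (excerpt): point the default queue at it
[operators]
default_queue = airflow-default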

Thank you in advance for any pointers.
-- 
Toomas Aas

Re: High Availability of queues in RabbitMQ

Posted by Toomas Aas <To...@reach-u.com>.

Toomas Aas
support engineer | reach-u.com

On 3/14/22 17:31, Toomas Aas wrote:

> As I didn't have better ideas, I created a quorum queue in RabbitMQ and
> set it as default_queue in airflow.cfg. Then I started airflow celery
> worker on two nodes.
> 
> On one node, the worker started successfully, but on the other node it
> failed with error below.
> 
> Has anyone gotten Airflow working successfully with quorum queues in
> rabbitmq?
> 
> INFO/MainProcess] mingle: sync complete
> CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406,
> "PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue
> 'airflow-default' in vhost 'airflow_ci': received none but current is
> the value 'quorum' of type 'longstr'", (50, 10), 'Queue.declare')

For the sake of future readers: I switched from quorum queues to classic 
queue mirroring and this seems to work. I can shut down one or two 
RabbitMQ instances in my 3-node cluster and Airflow keeps working.
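
In case it helps someone replicate this: classic queue mirroring is 
enabled with a policy rather than per-queue arguments, so what I applied 
was something along these lines (the policy name and pattern are just 
what I happened to pick):

rabbitmqctl set_policy -p airflow_ci ha-all ".*" \
    '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues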

This is maybe not ideal, because the RabbitMQ website says that classic 
queue mirroring will be removed in a future version of RabbitMQ and that 
quorum queues should be used instead, but I guess that's a problem for 
the future.

Best regards,
--
Toomas Aas

Re: High Availability of queues in RabbitMQ

Posted by Toomas Aas <To...@reach-u.com>.

On 3/11/22 15:57, Toomas Aas wrote:
> 
> 
> I am setting up a proof-of-concept multi-node cluster of Airflow 2.2.3
> with Celery executor. The aspect that I'm currently working on is high
> availability of RabbitMQ.
> 
> I have set up a 3-node RabbitMQ cluster which is fronted by HAProxy.
> HAProxy and RabbitMQ are actually running on the same machines that are
> running Airflow. On each machine there is a RabbitMQ server and HAProxy
> which is configured to direct traffic to all 3 nodes. broker_url in
> airflow.cfg is configured to point to HAProxy port on localhost.
> 
> RabbitMQ cluster itself looks perfectly healthy if I check it with
> 'rabbitmqctl cluster_status'. Also the failover of tcp connection seems
> to be working - if I shut down rabbitmq on one machine, I can see in
> system logs that after a short pause the Airflow worker successfully
> reconnects with amqp.
> 
> But there is a problem with queues. Currently all the queues that exist
> in my rabbitmq setup are 'classic' queues. This means that if the
> rabbitmq node that hosts the queue goes down, the queue is not available
> and Airflow is sad. This is what happened when I shut down rabbitmq on
> one machine:
> 
> --------------------------------------------------------------------
> [2022-03-11 12:48:40,913: ERROR/MainProcess] consumer: Cannot connect to
> amqp://airflow_ci:**@127.0.0.1:5673/airflow_ci: Server unexpectedly
> closed connection.
> Mar 11 12:48:40 Trying again in 4.00 seconds... (2/100)
> Mar 11 12:48:44 [2022-03-11 12:48:44,940: INFO/MainProcess] Connected to
> amqp://airflow_ci:**@127.0.0.1:5673/airflow_ci
> [2022-03-11 12:48:44,961: INFO/MainProcess] mingle: searching for neighbors
> [2022-03-11 12:48:45,997: INFO/MainProcess] mingle: all alone
> [2022-03-11 12:48:46,005: CRITICAL/MainProcess] Unrecoverable error:
> NotFound(404, "NOT_FOUND - home node 'rabbit@ci-91-col' of durable queue
> 'default' in vhost 'airflow_ci' is down or inaccessible", (50, 10),
> 'Queue.declare')
> --------------------------------------------------------------------
> 
> What I am looking for is a way to achieve the situation where the queues
> created by Airflow are not 'classic' queues but 'quorum' or 'mirrored'
> queues. I guess I could manually create a quorum queue in RabbitMQ
> management UI and set this as default_queue in airflow.cfg, but is this
> a proper way? Or is there perhaps an entirely different way for
> surviving a rabbitmq node going down that I'm not thinking of?
> 

As I didn't have better ideas, I created a quorum queue in RabbitMQ and 
set it as default_queue in airflow.cfg. Then I started airflow celery 
worker on two nodes.

On one node, the worker started successfully, but on the other node it 
failed with the error below.

Has anyone gotten Airflow working successfully with quorum queues in 
RabbitMQ?

INFO/MainProcess] mingle: sync complete
CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 
"PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 
'airflow-default' in vhost 'airflow_ci': received none but current is 
the value 'quorum' of type 'longstr'", (50, 10), 'Queue.declare')
Traceback (most recent call last):
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/worker/consumer/tasks.py", line 38, in start
    c.task_consumer = c.app.amqp.TaskConsumer(
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/celery/app/amqp.py", line 274, in TaskConsumer
    return self.Consumer(
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/kombu/messaging.py", line 387, in __init__
    self.revive(self.channel)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/kombu/messaging.py", line 409, in revive
    self.declare()
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/kombu/messaging.py", line 422, in declare
    queue.declare()
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/kombu/entity.py", line 606, in declare
    self._create_queue(nowait=nowait, channel=channel)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/kombu/entity.py", line 615, in _create_queue
    self.queue_declare(nowait=nowait, passive=False, channel=channel)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/kombu/entity.py", line 643, in queue_declare
    ret = channel.queue_declare(
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/channel.py", line 1146, in queue_declare
    return queue_declare_ok_t(*self.wait(
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/abstract_channel.py", line 86, in wait
    self.connection.drain_events(timeout=timeout)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/connection.py", line 519, in drain_events
    while not self.blocking_read(timeout):
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/connection.py", line 525, in blocking_read
    return self.on_inbound_frame(frame)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/connection.py", line 531, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/opt/virtualenv/dg_etl/lib/python3.9/site-packages/amqp/channel.py", line 277, in _on_close
    raise error_for_code(
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'airflow-default' in vhost 'airflow_ci': received none but current is the value 'quorum' of type 'longstr'
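
If I read the error correctly, the second worker tries to (re)declare 
'airflow-default' without any x-queue-type argument, while the broker 
already holds it as a quorum queue, so RabbitMQ rejects the mismatched 
Queue.declare. One idea I'm wondering about (untested, just a sketch; 
module and queue names are examples) is to feed Celery a custom config 
via [celery] celery_config_options so the queue gets declared with a 
matching x-queue-type, roughly like this:

# quorum_celery_config.py - a sketch only; I have NOT verified that
# Celery/Airflow actually works with quorum queues.
from kombu import Exchange, Queue

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    # declare the default queue explicitly, with the same x-queue-type
    # the broker already has, so Queue.declare no longer conflicts
    "task_default_queue": "airflow-default",
    "task_queues": [
        Queue(
            "airflow-default",
            Exchange("airflow-default"),
            routing_key="airflow-default",
            queue_arguments={"x-queue-type": "quorum"},
        )
    ],
}

The module would then be referenced from airflow.cfg:

[celery]
celery_config_options = quorum_celery_config.CELERY_CONFIG

But I have no idea whether Celery copes well with quorum queues in 
general, so any experience would be welcome.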


Thanks in advance,
--
Toomas Aas