You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/26 14:03:55 UTC

[camel] branch camel-2.x updated: CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly - Add QueueRecoveryListener to update replyTo and rebind the new temporary queue - Add integration test using RabbitMQ Management HTTP API

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new d5a6d97  CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly - Add QueueRecoveryListener to update replyTo and rebind the new temporary queue - Add integration test using RabbitMQ Management HTTP API
     new eddcb07  Merge pull request #3685 from rszczesiak/camel-2.x
d5a6d97 is described below

commit d5a6d9744a57df3264669eb6f7ae2e90cce55b2f
Author: Robert Szczesiak <ro...@gmail.com>
AuthorDate: Thu Mar 26 14:35:23 2020 +0100

    CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly
    - Add QueueRecoveryListener to update replyTo and rebind the new temporary queue
    - Add integration test using RabbitMQ Management HTTP API
---
 components/camel-rabbitmq/pom.xml                  |   5 +
 components/camel-rabbitmq/readme.txt               |   2 +-
 .../rabbitmq/reply/TemporaryQueueReplyManager.java |  17 ++
 .../RabbitMQTemporaryQueueAutoRecoveryIntTest.java | 179 +++++++++++++++++++++
 4 files changed, 202 insertions(+), 1 deletion(-)

diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index dd16492..1ee5939 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -104,6 +104,11 @@
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-http4</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/components/camel-rabbitmq/readme.txt b/components/camel-rabbitmq/readme.txt
index ef005d5..d812391 100644
--- a/components/camel-rabbitmq/readme.txt
+++ b/components/camel-rabbitmq/readme.txt
@@ -7,7 +7,7 @@ The integration tests requires a running RabbitMQ broker.
 
 The broker can be run via Docker:
 
-    docker run -it -p 5672:5672 -e RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit rabbitmq:3
+    docker run -it -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit rabbitmq:3-management
 
 Or to install RabbitMQ as standalone and then configure it:
 
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index b375e22..a54b4f2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -24,6 +24,7 @@ import com.rabbitmq.client.AMQP.Queue.DeclareOk;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -94,6 +95,22 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
         //TODO check for the RabbitMQConstants.EXCHANGE_NAME header
         channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo());
 
+        //Add QueueRecoveryListener to notify when temporary queue name changes due to recovery
+        if (conn instanceof AutorecoveringConnection) {
+            ((AutorecoveringConnection) conn).addQueueRecoveryListener((oldName, newName) -> {
+                log.debug("Temporary queue name {} was changed to {}. Updating replyTo.", oldName, newName);
+                setReplyTo(newName);
+
+                log.debug("Trying to rebind the new temporary queue to update routingKey");
+                try {
+                    channel.queueBind(newName, endpoint.getExchangeName(), newName);
+                    channel.queueUnbind(newName, endpoint.getExchangeName(), oldName);
+                } catch (IOException e) {
+                    log.warn("Failed to bind or unbind a queue. This exception is ignored.", e);
+                }
+            });
+        }
+
         consumer = new RabbitConsumer(this, channel);
         consumer.start();
 
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
new file mode 100644
index 0000000..a62a85a
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test to check if temporary queue's name change is properly handled after auto recovering
+ * caused by connection failure.
+ * This test takes advantage of RabbitMQ Management HTTP API provided by RabbitMQ Management Plugin.
+ */
+public class RabbitMQTemporaryQueueAutoRecoveryIntTest extends AbstractRabbitMQIntTest {
+
+    private static final String EXCHANGE = "ex_temp-queue-test";
+    private static final String QUEUE = "q_temp-queue-test";
+    private static final String ROUTING_KEY = "k_temp-queue-test";
+    private static final String TEMP_QUEUE_NAME = "tempQueueName";
+    private static final String TEMP_QUEUE_CONN_NAME = "tempQueueConnName";
+    private static final String REQUEST = "Foo request";
+    private static final String REPLY = "Bar reply";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directRabbitMQProducer;
+
+    @Produce(uri = "direct:rabbitMQApi-forceCloseConnection")
+    protected ProducerTemplate forceCloseConnectionProducer;
+
+    @Produce(uri = "direct:rabbitMQApi-getExchangeBindings")
+    protected ProducerTemplate getExchangeBindingsProducer;
+
+    @EndpointInject(uri = "rabbitmq:" + EXCHANGE + "?addresses=localhost:5672&username=cameltest&password=cameltest"
+            + "&autoAck=false&queue=" + QUEUE + "&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "http4:localhost:15672/api?authMethod=Basic&authUsername=cameltest&authPassword=cameltest")
+    private Endpoint rabbitMQApiEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+
+                log.info("Building routes...");
+
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message to RabbitMQ broker")
+                        .to(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message from RabbitMQ broker")
+                        .to(consumingMockEndpoint)
+                        .setBody(simple(REPLY));
+
+                from("direct:rabbitMQApi-forceCloseConnection")
+                        .id("forceCloseConnectionRoute")
+                        .log("Getting temporary queue's connection name")
+                        .setHeader(Exchange.HTTP_PATH, simple("/queues/%2F/${header." + TEMP_QUEUE_NAME + "}"))
+                        .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+                        .to(rabbitMQApiEndpoint)
+                        .process(exchange -> {
+                            String responseJsonString = exchange.getMessage().getBody(String.class);
+                            ObjectNode node = new ObjectMapper().readValue(responseJsonString, ObjectNode.class);
+                            String connectionName = node.at("/owner_pid_details/name").asText();
+                            exchange.getMessage().setHeader(TEMP_QUEUE_CONN_NAME, connectionName);
+                        })
+                        .log("Force closing temporary queue's connection")
+                        .setHeader(Exchange.HTTP_PATH, simple("/connections/${header." + TEMP_QUEUE_CONN_NAME + "}"))
+                        .setHeader(Exchange.HTTP_METHOD, simple("DELETE"))
+                        .to(rabbitMQApiEndpoint);
+
+                from("direct:rabbitMQApi-getExchangeBindings")
+                        .id("getExchangeBindingsRoute")
+                        .log("Getting temporary queue's routing key to verify rebinding was successful")
+                        .setHeader(Exchange.HTTP_PATH, simple("/exchanges/%2F/" + EXCHANGE + "/bindings/source"))
+                        .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+                        .to(rabbitMQApiEndpoint)
+                        .process(exchange -> {
+                            String responseJsonString = exchange.getMessage().getBody(String.class);
+                            String tempQueueName = exchange.getMessage().getHeader(TEMP_QUEUE_NAME, String.class);
+                            ArrayNode node = new ObjectMapper().readValue(responseJsonString, ArrayNode.class);
+                            String tempQueueRoutingKey = StreamSupport.stream(node.spliterator(), false)
+                                    .filter(binding -> tempQueueName.equals(binding.get("destination").textValue()))
+                                    .findFirst()
+                                    .map(binding -> binding.get("routing_key").textValue())
+                                    .orElse(null);
+                            exchange.getMessage().setBody(tempQueueRoutingKey);
+                        });
+            }
+        };
+    }
+
+    /**
+     * <p><b>NOTE:</b> Make sure RabbitMQ Management Plugin is enabled
+     * and ConnectionFactory#automaticRecovery is set to <code>true</code> (default)</p>
+     * <ul>
+     * <li>Send first RPC request that automatically creates server-named temporary reply queue</li>
+     * <li>Send another RPC request to verify reply-to property stays the same
+     * if no connection failure occurred</li>
+     * <li>Wait a few seconds to ensure all necessary bindings are created
+     * and seen by the RabbitMQ Management HTTP API</li>
+     * <li>Forcibly close temporary reply queue's connection and wait another few seconds
+     * to let it recover automatically</li>
+     * <li>Send one last RPC request and verify reply-to property is changed
+     * (assuming the new server-generated name will not be exactly the same)</li>
+     * <li>Get new temporary queue's bindings and verify routing key matches queue name</li>
+     * </ul>
+     *
+     * @throws InterruptedException when Thread#sleep is interrupted
+     */
+    @Test
+    public void testReplyToAndBindingsUpdated() throws InterruptedException {
+
+        consumingMockEndpoint.expectedMessageCount(3);
+        producingMockEndpoint.expectedMessageCount(3);
+
+        directRabbitMQProducer.requestBody(REQUEST);
+        String replyToOriginal = consumingMockEndpoint.getExchanges().get(0).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+        directRabbitMQProducer.requestBody(REQUEST);
+        String replyToVerify = consumingMockEndpoint.getExchanges().get(1).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+        Thread.sleep(7000);
+
+        forceCloseConnectionProducer.sendBodyAndHeader(null, TEMP_QUEUE_NAME, replyToOriginal);
+        Thread.sleep(7000);
+
+        directRabbitMQProducer.requestBody(REQUEST);
+        String replyToRecovered = consumingMockEndpoint.getExchanges().get(2).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+        String tempQueueRoutingKey = (String) getExchangeBindingsProducer.requestBodyAndHeader(null, TEMP_QUEUE_NAME, replyToRecovered);
+
+        assertEquals(replyToVerify, replyToOriginal);
+        assertNotEquals(replyToRecovered, replyToOriginal);
+        assertEquals(tempQueueRoutingKey, replyToRecovered);
+        consumingMockEndpoint.assertIsSatisfied();
+        producingMockEndpoint.assertIsSatisfied();
+    }
+}