You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/26 12:08:49 UTC
[camel] branch camel-3.1.x updated: CAMEL-14789: camel-rabbitmq -
Automatic recovery of temporary reply queue is not handled correctly - Add
QueueRecoveryListener to update replyTo and rebind the new temporary queue
- Add integration test using RabbitMQ Management HTTP API
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-3.1.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.1.x by this push:
new a4fde14 CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly - Add QueueRecoveryListener to update replyTo and rebind the new temporary queue - Add integration test using RabbitMQ Management HTTP API
a4fde14 is described below
commit a4fde14670eb82cea77c1137ef5b6bfb3c9ca8f3
Author: Robert Szczesiak <ro...@gmail.com>
AuthorDate: Thu Mar 26 11:06:40 2020 +0100
CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly
- Add QueueRecoveryListener to update replyTo and rebind the new temporary queue
- Add integration test using RabbitMQ Management HTTP API
---
components/camel-rabbitmq/pom.xml | 5 +
components/camel-rabbitmq/readme.txt | 2 +-
.../rabbitmq/reply/TemporaryQueueReplyManager.java | 17 ++
.../rabbitmq/integration/DockerTestUtils.java | 27 ++--
.../RabbitMQTemporaryQueueAutoRecoveryIntTest.java | 180 +++++++++++++++++++++
5 files changed, 215 insertions(+), 16 deletions(-)
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 9caf555..fc89c53 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -110,6 +110,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-http</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/components/camel-rabbitmq/readme.txt b/components/camel-rabbitmq/readme.txt
index 3628428..a470537 100644
--- a/components/camel-rabbitmq/readme.txt
+++ b/components/camel-rabbitmq/readme.txt
@@ -11,7 +11,7 @@ The integration tests with Qpid could be run via Maven (disabled by default):
The broker can be run via Docker:
- docker run -it -p 5672:5672 -e RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit rabbitmq:3
+ docker run -it -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit rabbitmq:3-management
Or to install RabbitMQ as standalone and then configure it:
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index a9d959b..fe0e4bc 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -24,6 +24,7 @@ import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -98,6 +99,22 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
//TODO check for the RabbitMQConstants.EXCHANGE_NAME header
channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo());
+ //Add QueueRecoveryListener to notify when temporary queue name changes due to recovery
+ if (conn instanceof AutorecoveringConnection) {
+ ((AutorecoveringConnection) conn).addQueueRecoveryListener((oldName, newName) -> {
+ LOG.debug("Temporary queue name {} was changed to {}. Updating replyTo.", oldName, newName);
+ setReplyTo(newName);
+
+ LOG.debug("Trying to rebind the new temporary queue to update routingKey");
+ try {
+ channel.queueBind(newName, endpoint.getExchangeName(), newName);
+ channel.queueUnbind(newName, endpoint.getExchangeName(), oldName);
+ } catch (IOException e) {
+ LOG.warn("Failed to bind or unbind a queue. This exception is ignored.", e);
+ }
+ });
+ }
+
consumer = new RabbitConsumer(this, channel);
consumer.start();
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java
index 2876d70..229f85c 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/DockerTestUtils.java
@@ -22,27 +22,24 @@ import org.apache.camel.test.testcontainers.Wait;
import org.testcontainers.containers.GenericContainer;
public class DockerTestUtils {
- public static final String CONTAINER_IMAGE = "rabbitmq:3";
+ public static final String CONTAINER_IMAGE = "rabbitmq:3-management";
public static final String CONTAINER_NAME = "some-rabbit";
- public static final int EXPOSE_PORT = 5672;
+ public static final int EXPOSE_PORT_BROKER = 5672;
+ public static final int EXPOSE_PORT_MANAGEMENT = 15672;
protected DockerTestUtils() {
}
public static GenericContainer rabbitMQContainer() {
- // docker run -it -p 5672:5672
- // -e RABBITMQ_DEFAULT_USER=cameltest
- // -e RABBITMQ_DEFAULT_PASS=cameltest
- // --hostname my-rabbit
- // --name some-rabbit rabbitmq:3
- GenericContainer container = new GenericContainer<>(CONTAINER_IMAGE)
- .withNetworkAliases(CONTAINER_NAME)
- .withExposedPorts(EXPOSE_PORT)
- .withEnv("RABBITMQ_DEFAULT_USER", "cameltest")
- .withEnv("RABBITMQ_DEFAULT_PASS", "cameltest")
- .withCreateContainerCmdModifier(cmd -> cmd.withHostName("my-rabbit"))
- .waitingFor(Wait.forLogMessage(".*Server startup complete.*\n", 1));
- container.setPortBindings(Arrays.asList(String.format("%d:%d", EXPOSE_PORT, EXPOSE_PORT)));
+ // docker run -it -p 5672:5672 -p 15672:15672
+ // -e RABBITMQ_DEFAULT_USER=cameltest
+ // -e RABBITMQ_DEFAULT_PASS=cameltest
+ // --hostname my-rabbit
+ // --name some-rabbit rabbitmq:3-management
+ GenericContainer container = new GenericContainer<>(CONTAINER_IMAGE).withNetworkAliases(CONTAINER_NAME).withExposedPorts(EXPOSE_PORT_BROKER, EXPOSE_PORT_MANAGEMENT)
+ .withEnv("RABBITMQ_DEFAULT_USER", "cameltest").withEnv("RABBITMQ_DEFAULT_PASS", "cameltest").withCreateContainerCmdModifier(cmd -> cmd.withHostName("my-rabbit"))
+ .waitingFor(Wait.forLogMessage(".*Server startup complete.*\n", 1));
+ container.setPortBindings(Arrays.asList(String.format("%d:%d", EXPOSE_PORT_BROKER, EXPOSE_PORT_BROKER), String.format("%d:%d", EXPOSE_PORT_MANAGEMENT, EXPOSE_PORT_MANAGEMENT)));
return container;
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQTemporaryQueueAutoRecoveryIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
new file mode 100644
index 0000000..c1ca5b8
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq.integration;
+
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.rabbitmq.RabbitMQConstants;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test to check if temporary queue's name change is properly handled after auto recovering
+ * caused by connection failure.
+ * This test takes advantage of RabbitMQ Management HTTP API provided by RabbitMQ Management Plugin.
+ */
+public class RabbitMQTemporaryQueueAutoRecoveryIntTest extends AbstractRabbitMQIntTest {
+
+ private static final String EXCHANGE = "ex_temp-queue-test";
+ private static final String QUEUE = "q_temp-queue-test";
+ private static final String ROUTING_KEY = "k_temp-queue-test";
+ private static final String TEMP_QUEUE_NAME = "tempQueueName";
+ private static final String TEMP_QUEUE_CONN_NAME = "tempQueueConnName";
+ private static final String REQUEST = "Foo request";
+ private static final String REPLY = "Bar reply";
+
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate directRabbitMQProducer;
+
+ @Produce(uri = "direct:rabbitMQApi-forceCloseConnection")
+ protected ProducerTemplate forceCloseConnectionProducer;
+
+ @Produce(uri = "direct:rabbitMQApi-getExchangeBindings")
+ protected ProducerTemplate getExchangeBindingsProducer;
+
+ @EndpointInject(uri = "rabbitmq:" + EXCHANGE + "?addresses=localhost:5672&username=cameltest&password=cameltest"
+ + "&autoAck=false&queue=" + QUEUE + "&routingKey=" + ROUTING_KEY)
+ private Endpoint rabbitMQEndpoint;
+
+ @EndpointInject(uri = "http:localhost:15672/api?authMethod=Basic&authUsername=cameltest&authPassword=cameltest")
+ private Endpoint rabbitMQApiEndpoint;
+
+ @EndpointInject(uri = "mock:consuming")
+ private MockEndpoint consumingMockEndpoint;
+
+ @EndpointInject(uri = "mock:producing")
+ private MockEndpoint producingMockEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+
+ log.info("Building routes...");
+
+ from("direct:rabbitMQ")
+ .id("producingRoute")
+ .log("Sending message to RabbitMQ broker")
+ .to(rabbitMQEndpoint)
+ .to(producingMockEndpoint);
+
+ from(rabbitMQEndpoint)
+ .id("consumingRoute")
+ .log("Receiving message from RabbitMQ broker")
+ .to(consumingMockEndpoint)
+ .setBody(simple(REPLY));
+
+ from("direct:rabbitMQApi-forceCloseConnection")
+ .id("forceCloseConnectionRoute")
+ .log("Getting temporary queue's connection name")
+ .setHeader(Exchange.HTTP_PATH, simple("/queues/%2F/${header." + TEMP_QUEUE_NAME + "}"))
+ .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+ .to(rabbitMQApiEndpoint)
+ .process(exchange -> {
+ String responseJsonString = exchange.getMessage().getBody(String.class);
+ ObjectNode node = new ObjectMapper().readValue(responseJsonString, ObjectNode.class);
+ String connectionName = node.at("/owner_pid_details/name").asText();
+ exchange.getMessage().setHeader(TEMP_QUEUE_CONN_NAME, connectionName);
+ })
+ .log("Force closing temporary queue's connection")
+ .setHeader(Exchange.HTTP_PATH, simple("/connections/${header." + TEMP_QUEUE_CONN_NAME + "}"))
+ .setHeader(Exchange.HTTP_METHOD, simple("DELETE"))
+ .to(rabbitMQApiEndpoint);
+
+ from("direct:rabbitMQApi-getExchangeBindings")
+ .id("getExchangeBindingsRoute")
+ .log("Getting temporary queue's routing key to verify rebinding was successful")
+ .setHeader(Exchange.HTTP_PATH, simple("/exchanges/%2F/" + EXCHANGE + "/bindings/source"))
+ .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+ .to(rabbitMQApiEndpoint)
+ .process(exchange -> {
+ String responseJsonString = exchange.getMessage().getBody(String.class);
+ String tempQueueName = exchange.getMessage().getHeader(TEMP_QUEUE_NAME, String.class);
+ ArrayNode node = new ObjectMapper().readValue(responseJsonString, ArrayNode.class);
+ String tempQueueRoutingKey = StreamSupport.stream(node.spliterator(), false)
+ .filter(binding -> tempQueueName.equals(binding.get("destination").textValue()))
+ .findFirst()
+ .map(binding -> binding.get("routing_key").textValue())
+ .orElse(null);
+ exchange.getMessage().setBody(tempQueueRoutingKey);
+ });
+ }
+ };
+ }
+
+ /**
+ * <p><b>NOTE:</b>Make sure RabbitMQ Management Plugin is enabled
+ * and ConnectionFactory#automaticRecovery is set to <code>true</code> (default)</p>
+ * <ul>
+ * <li>Send first PRC request that automatically creates server-named temporary reply queue</li>
+ * <li>Send another PRC request to verify reply-to property stays the same
+ * if no connection failure occurred</li>
+ * <li>Wait a few seconds to ensure all necessary bindings are created
+ * and seen by the RabbitMQ Management HTTP API</li>
+ * <li>Forcibly close temporary reply queue's connection and wait another few seconds
+ * to let it recover automatically</li>
+ * <li>Send one last RPC request and verify reply-to property is changed
+ * (assuming the new server-generated name will not be exactly the same)</li>
+ * <li>Get new temporary queue's bindings and verify routing key matches queue name</li>
+ * </ul>
+ *
+ * @throws InterruptedException when Thread#sleep is interrupted
+ */
+ @Test
+ public void testReplyToAndBindingsUpdated() throws InterruptedException {
+
+ consumingMockEndpoint.expectedMessageCount(3);
+ producingMockEndpoint.expectedMessageCount(3);
+
+ directRabbitMQProducer.requestBody(REQUEST);
+ String replyToOriginal = consumingMockEndpoint.getExchanges().get(0).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+ directRabbitMQProducer.requestBody(REQUEST);
+ String replyToVerify = consumingMockEndpoint.getExchanges().get(1).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+ Thread.sleep(7000);
+
+ forceCloseConnectionProducer.sendBodyAndHeader(null, TEMP_QUEUE_NAME, replyToOriginal);
+ Thread.sleep(7000);
+
+ directRabbitMQProducer.requestBody(REQUEST);
+ String replyToRecovered = consumingMockEndpoint.getExchanges().get(2).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+ String tempQueueRoutingKey = (String) getExchangeBindingsProducer.requestBodyAndHeader(null, TEMP_QUEUE_NAME, replyToRecovered);
+
+ assertEquals(replyToVerify, replyToOriginal);
+ assertNotEquals(replyToRecovered, replyToOriginal);
+ assertEquals(tempQueueRoutingKey, replyToRecovered);
+ consumingMockEndpoint.assertIsSatisfied();
+ producingMockEndpoint.assertIsSatisfied();
+ }
+}
\ No newline at end of file