You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/26 14:47:18 UTC
[camel] branch camel-2.25.x updated: CAMEL-14789: camel-rabbitmq -
Automatic recovery of temporary reply queue is not handled correctly - Add
QueueRecoveryListener to update replyTo and rebind the new temporary queue
- Add integration test using RabbitMQ Management HTTP API
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push:
new edc9dce CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly - Add QueueRecoveryListener to update replyTo and rebind the new temporary queue - Add integration test using RabbitMQ Management HTTP API
edc9dce is described below
commit edc9dce8db5e72ffc5097b19fe093655c563b89d
Author: Robert Szczesiak <ro...@gmail.com>
AuthorDate: Thu Mar 26 14:35:23 2020 +0100
CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue is not handled correctly
- Add QueueRecoveryListener to update replyTo and rebind the new temporary queue
- Add integration test using RabbitMQ Management HTTP API
---
components/camel-rabbitmq/pom.xml | 5 +
components/camel-rabbitmq/readme.txt | 2 +-
.../rabbitmq/reply/TemporaryQueueReplyManager.java | 17 ++
.../RabbitMQTemporaryQueueAutoRecoveryIntTest.java | 179 +++++++++++++++++++++
4 files changed, 202 insertions(+), 1 deletion(-)
diff --git a/components/camel-rabbitmq/pom.xml b/components/camel-rabbitmq/pom.xml
index 1749cc7..0c45aee 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -104,6 +104,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-http4</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/components/camel-rabbitmq/readme.txt b/components/camel-rabbitmq/readme.txt
index ef005d5..d812391 100644
--- a/components/camel-rabbitmq/readme.txt
+++ b/components/camel-rabbitmq/readme.txt
@@ -7,7 +7,7 @@ The integration tests requires a running RabbitMQ broker.
The broker can be run via Docker:
- docker run -it -p 5672:5672 -e RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit rabbitmq:3
+ docker run -it -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit rabbitmq:3-management
Or to install RabbitMQ as standalone and then configure it:
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index b375e22..a54b4f2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -24,6 +24,7 @@ import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -94,6 +95,22 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
//TODO check for the RabbitMQConstants.EXCHANGE_NAME header
channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo());
+ //Add QueueRecoveryListener to notify when temporary queue name changes due to recovery
+ if (conn instanceof AutorecoveringConnection) {
+ ((AutorecoveringConnection) conn).addQueueRecoveryListener((oldName, newName) -> {
+ log.debug("Temporary queue name {} was changed to {}. Updating replyTo.", oldName, newName);
+ setReplyTo(newName);
+
+ log.debug("Trying to rebind the new temporary queue to update routingKey");
+ try {
+ channel.queueBind(newName, endpoint.getExchangeName(), newName);
+ channel.queueUnbind(newName, endpoint.getExchangeName(), oldName);
+ } catch (IOException e) {
+ log.warn("Failed to bind or unbind a queue. This exception is ignored.", e);
+ }
+ });
+ }
+
consumer = new RabbitConsumer(this, channel);
consumer.start();
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
new file mode 100644
index 0000000..a62a85a
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test to check that a temporary queue's name change is properly handled after the automatic recovery
+ * triggered by a connection failure.
+ * This test takes advantage of RabbitMQ Management HTTP API provided by RabbitMQ Management Plugin.
+ */
+public class RabbitMQTemporaryQueueAutoRecoveryIntTest extends AbstractRabbitMQIntTest {
+
+    private static final String EXCHANGE = "ex_temp-queue-test";
+    private static final String QUEUE = "q_temp-queue-test";
+    private static final String ROUTING_KEY = "k_temp-queue-test";
+    private static final String TEMP_QUEUE_NAME = "tempQueueName";
+    private static final String TEMP_QUEUE_CONN_NAME = "tempQueueConnName";
+    private static final String REQUEST = "Foo request";
+    private static final String REPLY = "Bar reply";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directRabbitMQProducer;
+
+    @Produce(uri = "direct:rabbitMQApi-forceCloseConnection")
+    protected ProducerTemplate forceCloseConnectionProducer;
+
+    @Produce(uri = "direct:rabbitMQApi-getExchangeBindings")
+    protected ProducerTemplate getExchangeBindingsProducer;
+
+    @EndpointInject(uri = "rabbitmq:" + EXCHANGE + "?addresses=localhost:5672&username=cameltest&password=cameltest"
+            + "&autoAck=false&queue=" + QUEUE + "&routingKey=" + ROUTING_KEY)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "http4:localhost:15672/api?authMethod=Basic&authUsername=cameltest&authPassword=cameltest")
+    private Endpoint rabbitMQApiEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        // Four routes: request producer, request consumer, and two helpers that call the management HTTP API.
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+
+                log.info("Building routes...");
+                // Sends the request to the RabbitMQ endpoint, then records the outcome in mock:producing.
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message to RabbitMQ broker")
+                        .to(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+                // Consumes from the same endpoint, records the message in mock:consuming and sets the reply body.
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message from RabbitMQ broker")
+                        .to(consumingMockEndpoint)
+                        .setBody(simple(REPLY));
+                // GETs the queue named by the TEMP_QUEUE_NAME header, then DELETEs the connection reported for it.
+                from("direct:rabbitMQApi-forceCloseConnection")
+                        .id("forceCloseConnectionRoute")
+                        .log("Getting temporary queue's connection name")
+                        .setHeader(Exchange.HTTP_PATH, simple("/queues/%2F/${header." + TEMP_QUEUE_NAME + "}"))
+                        .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+                        .to(rabbitMQApiEndpoint)
+                        .process(exchange -> {
+                            String responseJsonString = exchange.getMessage().getBody(String.class);
+                            ObjectNode node = new ObjectMapper().readValue(responseJsonString, ObjectNode.class);
+                            String connectionName = node.at("/owner_pid_details/name").asText();
+                            exchange.getMessage().setHeader(TEMP_QUEUE_CONN_NAME, connectionName);
+                        })
+                        .log("Force closing temporary queue's connection")
+                        .setHeader(Exchange.HTTP_PATH, simple("/connections/${header." + TEMP_QUEUE_CONN_NAME + "}"))
+                        .setHeader(Exchange.HTTP_METHOD, simple("DELETE"))
+                        .to(rabbitMQApiEndpoint);
+                // Lists bindings whose source is EXCHANGE; the body becomes the routing key of the header-named queue.
+                from("direct:rabbitMQApi-getExchangeBindings")
+                        .id("getExchangeBindingsRoute")
+                        .log("Getting temporary queue's routing key to verify rebinding was successful")
+                        .setHeader(Exchange.HTTP_PATH, simple("/exchanges/%2F/" + EXCHANGE + "/bindings/source"))
+                        .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+                        .to(rabbitMQApiEndpoint)
+                        .process(exchange -> {
+                            String responseJsonString = exchange.getMessage().getBody(String.class);
+                            String tempQueueName = exchange.getMessage().getHeader(TEMP_QUEUE_NAME, String.class);
+                            ArrayNode node = new ObjectMapper().readValue(responseJsonString, ArrayNode.class);
+                            String tempQueueRoutingKey = StreamSupport.stream(node.spliterator(), false)
+                                    .filter(binding -> tempQueueName.equals(binding.get("destination").textValue()))
+                                    .findFirst()
+                                    .map(binding -> binding.get("routing_key").textValue())
+                                    .orElse(null);
+                            exchange.getMessage().setBody(tempQueueRoutingKey);
+                        });
+            }
+        };
+    }
+
+    /**
+     * <p><b>NOTE:</b> Make sure RabbitMQ Management Plugin is enabled
+     * and ConnectionFactory#automaticRecovery is set to <code>true</code> (default)</p>
+     * <ul>
+     * <li>Send first RPC request that automatically creates server-named temporary reply queue</li>
+     * <li>Send another RPC request to verify reply-to property stays the same
+     * if no connection failure occurred</li>
+     * <li>Wait a few seconds to ensure all necessary bindings are created
+     * and seen by the RabbitMQ Management HTTP API</li>
+     * <li>Forcibly close temporary reply queue's connection and wait another few seconds
+     * to let it recover automatically</li>
+     * <li>Send one last RPC request and verify reply-to property is changed
+     * (assuming the new server-generated name will not be exactly the same)</li>
+     * <li>Get new temporary queue's bindings and verify routing key matches queue name</li>
+     * </ul>
+     *
+     * @throws InterruptedException when the test thread is interrupted while sleeping or waiting on the mock endpoints
+     */
+    @Test
+    public void testReplyToAndBindingsUpdated() throws InterruptedException {
+        // Three round trips in total: two before the forced disconnect and one after recovery.
+        consumingMockEndpoint.expectedMessageCount(3);
+        producingMockEndpoint.expectedMessageCount(3);
+
+        directRabbitMQProducer.requestBody(REQUEST);
+        String replyToOriginal = consumingMockEndpoint.getExchanges().get(0).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+        directRabbitMQProducer.requestBody(REQUEST);
+        String replyToVerify = consumingMockEndpoint.getExchanges().get(1).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+        // Fixed pause so the management HTTP API can observe the queue and its bindings.
+        Thread.sleep(7000);
+        // Close the connection that owns the reply queue, then pause again to let automatic recovery finish.
+        forceCloseConnectionProducer.sendBodyAndHeader(null, TEMP_QUEUE_NAME, replyToOriginal);
+        Thread.sleep(7000);
+
+        directRabbitMQProducer.requestBody(REQUEST);
+        String replyToRecovered = consumingMockEndpoint.getExchanges().get(2).getMessage().getHeader(RabbitMQConstants.REPLY_TO, String.class);
+
+        String tempQueueRoutingKey = (String) getExchangeBindingsProducer.requestBodyAndHeader(null, TEMP_QUEUE_NAME, replyToRecovered);
+        // JUnit takes (expected, actual); this order keeps failure messages pointing at the right value.
+        assertEquals(replyToOriginal, replyToVerify);
+        assertNotEquals(replyToOriginal, replyToRecovered);
+        assertEquals(replyToRecovered, tempQueueRoutingKey);
+        consumingMockEndpoint.assertIsSatisfied();
+        producingMockEndpoint.assertIsSatisfied();
+    }
+}