You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/07/30 13:18:58 UTC

[camel] branch camel-3.4.x updated: CAMEL-15355: camel-rabbitmq - Fix arg.queue.x-single-active-consumer so that it is configured as a boolean type, which RabbitMQ needs for it to work. Thanks to Devansh Arora for reporting and suggesting the fix.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.4.x by this push:
     new b96cc3b  CAMEL-15355: camel-rabbitmq - Fix arg.queue.x-single-active-consumer so that it is configured as a boolean type, which RabbitMQ needs for it to work. Thanks to Devansh Arora for reporting and suggesting the fix.
b96cc3b is described below

commit b96cc3bf0dd884c46ee70a8f80e5cdd7522af6a3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jul 30 15:16:50 2020 +0200

    CAMEL-15355: camel-rabbitmq - Fix arg.queue.x-single-active-consumer so that it is configured as a boolean type, which RabbitMQ needs for it to work. Thanks to Devansh Arora for reporting and suggesting the fix.
---
 .../camel/component/rabbitmq/RabbitMQConstants.java      |  1 +
 .../camel/component/rabbitmq/RabbitMQDeclareSupport.java |  5 +++++
 .../camel/component/rabbitmq/RabbitMQEndpointTest.java   | 16 ++++++++++++++++
 .../rabbitmq/integration/RabbitMQConsumerIntTest.java    |  2 +-
 4 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index 1d26f9a..493c2d4 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -49,6 +49,7 @@ public final class RabbitMQConstants {
     public static final String RABBITMQ_QUEUE_MAX_PRIORITY_KEY = "x-max-priority";
     public static final String RABBITMQ_QUEUE_MESSAGE_TTL_KEY = "x-message-ttl";
     public static final String RABBITMQ_QUEUE_TTL_KEY = "x-expires";
+    public static final String RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY = "x-single-active-consumer";
 
     private RabbitMQConstants() {
         // Constants class
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
index 23324a2..d04f6c3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -85,6 +85,11 @@ public class RabbitMQDeclareSupport {
         if (queueExpiration instanceof String) {
             queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_TTL_KEY, Long.parseLong((String)queueExpiration));
         }
+
+        Object singleConsumer = queueArgs.get(RabbitMQConstants.RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY);
+        if (singleConsumer instanceof String) {
+            queueArgs.put(RabbitMQConstants.RABBITMQ_QUEUE_SINGLE_ACTIVE_CONSUMER_KEY, Boolean.parseBoolean((String)singleConsumer));
+        }
     }
 
     private void populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 3fdb624..45d86f5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -38,6 +38,7 @@ import org.apache.camel.spi.Registry;
 import org.apache.camel.support.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
+import org.junit.Assert;
 import org.mockito.Mockito;
 
 public class RabbitMQEndpointTest extends CamelTestSupport {
@@ -286,4 +287,19 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?passive=true", RabbitMQEndpoint.class);
         assertTrue(endpoint.isPassive());
     }
+
+    @Test
+    public void testEndpointArgsIssue() throws Exception {
+        RabbitMQEndpoint endpoint1 = context.getEndpoint("rabbitmq://localhost:5672/mydirectdelayed?queue=testQ4"
+                + "&routingKey=testKey&username=me&password=mypwd&threadPoolSize=1&concurrentConsumers=1&autoDelete=false"
+                + "&vhost=myvhost&arg.queue.x-single-active-consumer=true&arg.exchange.x-delayed-type=direct&exchangeType=x-delayed-message", RabbitMQEndpoint.class);
+
+        Assert.assertNotNull(endpoint1.getArgs());
+        Assert.assertEquals(2, endpoint1.getArgs().size());
+        Assert.assertNotNull(endpoint1.getExchangeArgs());
+        Assert.assertEquals(1, endpoint1.getExchangeArgs().size());
+        Assert.assertNotNull(endpoint1.getQueueArgs());
+        Assert.assertEquals(1, endpoint1.getQueueArgs().size());
+    }
+
 }
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java
index 14957ac..c5edfb5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQConsumerIntTest.java
@@ -43,7 +43,7 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
     private static final String QUEUE = "q1";
     private static final String MSG = "hello world";
 
-    @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest")
+    @EndpointInject("rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&arg.queue.x-single-active-consumer=true")
     private Endpoint from;
 
     @EndpointInject("mock:result")