You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/02 08:33:08 UTC

[camel] 01/02: CAMEL-12612 - Added support for Rabbitmq exclusive consumer

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 509348a3230d7e7b69a2a37255a69f41dadd7fa3
Author: Darrell King <ki...@gmail.com>
AuthorDate: Sun Jul 1 08:58:15 2018 +0100

    CAMEL-12612 - Added support for Rabbitmq exclusive consumer
---
 .../apache/camel/component/rabbitmq/RabbitConsumer.java   |  2 +-
 .../camel/component/rabbitmq/RabbitMQComponent.java       | 15 +++++++++++++++
 .../apache/camel/component/rabbitmq/RabbitMQEndpoint.java | 14 ++++++++++++++
 .../camel/component/rabbitmq/RabbitMQComponentTest.java   |  3 +++
 .../camel/component/rabbitmq/RabbitMQEndpointTest.java    |  6 ++++++
 5 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 8727673..b30a051 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -174,7 +174,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         if (channel == null) {
             throw new IOException("The RabbitMQ channel is not open");
         }
-        tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), this);
+        tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), "", false, consumer.getEndpoint().isExclusiveConsumer(), null, this);
     }
 
     /**
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index c258a5c..96c1ac8 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -59,6 +59,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
     private boolean autoDelete = true;
     @Metadata(label = "common", defaultValue = "true")
     private boolean durable = true;
+    @Metadata(label = "consumer")
+    private boolean exclusiveConsumer;
     @Metadata(label = "common")
     private boolean exclusive;
     @Metadata(label = "common")
@@ -229,6 +231,7 @@ public class RabbitMQComponent extends UriEndpointComponent {
         endpoint.setAutoDelete(isAutoDelete());
         endpoint.setDurable(isDurable());
         endpoint.setExclusive(isExclusive());
+        endpoint.setExclusiveConsumer(isExclusiveConsumer());
         endpoint.setPassive(isPassive());
         endpoint.setSkipExchangeDeclare(isSkipExchangeDeclare());
         endpoint.setSkipQueueBind(isSkipQueueBind());
@@ -742,6 +745,18 @@ public class RabbitMQComponent extends UriEndpointComponent {
         this.exclusive = exclusive;
     }
 
+    public boolean isExclusiveConsumer() {
+        return exclusiveConsumer;
+    }
+
+    /**
+     * Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
+     * when you want a long-lived shared queue to be temporarily accessible by just one consumer.
+     */
+    public void setExclusiveConsumer(boolean exclusiveConsumer) {
+        this.exclusiveConsumer = exclusiveConsumer;
+    }
+
     public boolean isPassive() {
         return passive;
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 39de2e3..837ca59 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -76,6 +76,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private boolean autoDelete = true;
     @UriParam(label = "common", defaultValue = "true")
     private boolean durable = true;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean exclusiveConsumer;
     @UriParam(label = "common")
     private boolean exclusive;
     @UriParam(label = "common")
@@ -985,6 +987,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         this.exclusive = exclusive;
     }
 
+    public boolean isExclusiveConsumer() {
+        return exclusiveConsumer;
+    }
+
+    /**
+     * Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
+     * when you want a long-lived shared queue to be temporarily accessible by just one consumer.
+     */
+    public void setExclusiveConsumer(boolean exclusiveConsumer) {
+        this.exclusiveConsumer = exclusiveConsumer;
+    }
+
     public boolean isPassive() {
         return passive;
     }
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index d734917..4b59443 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -44,6 +44,7 @@ public class RabbitMQComponentTest {
         assertEquals(true, endpoint.isAutoAck());
         assertEquals(true, endpoint.isAutoDelete());
         assertEquals(true, endpoint.isDurable());
+        assertEquals(false, endpoint.isExclusiveConsumer());
         assertEquals("direct", endpoint.getExchangeType());
         assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout());
         assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax());
@@ -68,6 +69,7 @@ public class RabbitMQComponentTest {
         params.put("requestedChannelMax", 456);
         params.put("requestedFrameMax", 789);
         params.put("requestedHeartbeat", 321);
+        params.put("exclusiveConsumer", true);
 
         RabbitMQEndpoint endpoint = createEndpoint(params);
 
@@ -86,6 +88,7 @@ public class RabbitMQComponentTest {
         assertEquals(456, endpoint.getRequestedChannelMax());
         assertEquals(789, endpoint.getRequestedFrameMax());
         assertEquals(321, endpoint.getRequestedHeartbeat());
+        assertEquals(true, endpoint.isExclusiveConsumer());
     }
 
     private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 518e9e6..914173a 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -345,6 +345,12 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     }
 
     @Test
+    public void createEndpointWithExclusiveConsumerEnabled() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?exclusiveConsumer=true", RabbitMQEndpoint.class);
+        assertTrue(endpoint.isExclusiveConsumer());
+    }
+
+    @Test
     public void createEndpointWithPassiveEnabled() throws Exception {
         RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?passive=true", RabbitMQEndpoint.class);
         assertTrue(endpoint.isPassive());