You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/02 08:33:08 UTC
[camel] 01/02: CAMEL-12612 - Added support for Rabbitmq exclusive consumer
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 509348a3230d7e7b69a2a37255a69f41dadd7fa3
Author: Darrell King <ki...@gmail.com>
AuthorDate: Sun Jul 1 08:58:15 2018 +0100
CAMEL-12612 - Added support for Rabbitmq exclusive consumer
---
.../apache/camel/component/rabbitmq/RabbitConsumer.java | 2 +-
.../camel/component/rabbitmq/RabbitMQComponent.java | 15 +++++++++++++++
.../apache/camel/component/rabbitmq/RabbitMQEndpoint.java | 14 ++++++++++++++
.../camel/component/rabbitmq/RabbitMQComponentTest.java | 3 +++
.../camel/component/rabbitmq/RabbitMQEndpointTest.java | 6 ++++++
5 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 8727673..b30a051 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -174,7 +174,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (channel == null) {
throw new IOException("The RabbitMQ channel is not open");
}
- tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), this);
+ tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), "", false, consumer.getEndpoint().isExclusiveConsumer(), null, this);
}
/**
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index c258a5c..96c1ac8 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -59,6 +59,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
private boolean autoDelete = true;
@Metadata(label = "common", defaultValue = "true")
private boolean durable = true;
+ @Metadata(label = "consumer")
+ private boolean exclusiveConsumer;
@Metadata(label = "common")
private boolean exclusive;
@Metadata(label = "common")
@@ -229,6 +231,7 @@ public class RabbitMQComponent extends UriEndpointComponent {
endpoint.setAutoDelete(isAutoDelete());
endpoint.setDurable(isDurable());
endpoint.setExclusive(isExclusive());
+ endpoint.setExclusiveConsumer(isExclusiveConsumer());
endpoint.setPassive(isPassive());
endpoint.setSkipExchangeDeclare(isSkipExchangeDeclare());
endpoint.setSkipQueueBind(isSkipQueueBind());
@@ -742,6 +745,18 @@ public class RabbitMQComponent extends UriEndpointComponent {
this.exclusive = exclusive;
}
+ public boolean isExclusiveConsumer() {
+ return exclusiveConsumer;
+ }
+
+ /**
+ * Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
+ * when you want a long-lived shared queue to be temporarily accessible by just one consumer.
+ */
+ public void setExclusiveConsumer(boolean exclusiveConsumer) {
+ this.exclusiveConsumer = exclusiveConsumer;
+ }
+
public boolean isPassive() {
return passive;
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 39de2e3..837ca59 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -76,6 +76,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private boolean autoDelete = true;
@UriParam(label = "common", defaultValue = "true")
private boolean durable = true;
+ @UriParam(label = "consumer", defaultValue = "false")
+ private boolean exclusiveConsumer;
@UriParam(label = "common")
private boolean exclusive;
@UriParam(label = "common")
@@ -985,6 +987,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
this.exclusive = exclusive;
}
+ public boolean isExclusiveConsumer() {
+ return exclusiveConsumer;
+ }
+
+ /**
+ * Request exclusive access to the queue (meaning only this consumer can access the queue). This is useful
+ * when you want a long-lived shared queue to be temporarily accessible by just one consumer.
+ */
+ public void setExclusiveConsumer(boolean exclusiveConsumer) {
+ this.exclusiveConsumer = exclusiveConsumer;
+ }
+
public boolean isPassive() {
return passive;
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index d734917..4b59443 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -44,6 +44,7 @@ public class RabbitMQComponentTest {
assertEquals(true, endpoint.isAutoAck());
assertEquals(true, endpoint.isAutoDelete());
assertEquals(true, endpoint.isDurable());
+ assertEquals(false, endpoint.isExclusiveConsumer());
assertEquals("direct", endpoint.getExchangeType());
assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout());
assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax());
@@ -68,6 +69,7 @@ public class RabbitMQComponentTest {
params.put("requestedChannelMax", 456);
params.put("requestedFrameMax", 789);
params.put("requestedHeartbeat", 321);
+ params.put("exclusiveConsumer", true);
RabbitMQEndpoint endpoint = createEndpoint(params);
@@ -86,6 +88,7 @@ public class RabbitMQComponentTest {
assertEquals(456, endpoint.getRequestedChannelMax());
assertEquals(789, endpoint.getRequestedFrameMax());
assertEquals(321, endpoint.getRequestedHeartbeat());
+ assertEquals(true, endpoint.isExclusiveConsumer());
}
private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 518e9e6..914173a 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -345,6 +345,12 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
}
@Test
+ public void createEndpointWithExclusiveConsumerEnabled() throws Exception {
+ RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?exclusiveConsumer=true", RabbitMQEndpoint.class);
+ assertTrue(endpoint.isExclusiveConsumer());
+ }
+
+ @Test
public void createEndpointWithPassiveEnabled() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?passive=true", RabbitMQEndpoint.class);
assertTrue(endpoint.isPassive());