You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/03/16 08:59:31 UTC
camel git commit: CAMEL-8487 Support to configure the custom
arguments on the RabbitMQ queues and exchange
Repository: camel
Updated Branches:
refs/heads/master b9fe0513f -> 9f893d83e
CAMEL-8487 Support to configure the custom arguments on the RabbitMQ queues and exchange
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f893d83
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f893d83
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f893d83
Branch: refs/heads/master
Commit: 9f893d83e50d25b4a8571705f13c34f8981c0af6
Parents: b9fe0513
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Mar 16 15:58:31 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Mar 16 15:59:12 2015 +0800
----------------------------------------------------------------------
.../component/rabbitmq/ArgsConfigurer.java | 29 +++++++++++
.../component/rabbitmq/RabbitMQEndpoint.java | 54 ++++++++++++++++++--
.../rabbitmq/RabbitMQEndpointTest.java | 21 +++++++-
3 files changed, 99 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9f893d83/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java
new file mode 100644
index 0000000..71fd212
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.Map;
+
+public interface ArgsConfigurer {
+
+ /**
+ * Configure the args map for RabbitMQ to use
+ * @param args the map that needs to be configured
+ */
+ void configurArgs(Map<String, Object> args);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9f893d83/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index f6e4198..ea311d7 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -72,6 +72,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
private boolean durable = true;
@UriParam(defaultValue = "false")
private boolean bridgeEndpoint;
+ @UriParam
private String queue = String.valueOf(UUID.randomUUID().toString().hashCode());
@UriParam(defaultValue = "direct")
private String exchangeType = "direct";
@@ -138,6 +139,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
//Maximum time (in milliseconds) waiting for channel
@UriParam(defaultValue = "1000")
private long channelPoolMaxWait = 1000;
+ @UriParam
+ private ArgsConfigurer queueArgsConfigurer;
+ @UriParam
+ private ArgsConfigurer exchangeArgsConfigurer;
public RabbitMQEndpoint() {
}
@@ -197,12 +202,13 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
* If needed, declare Exchange, declare Queue and bind them with Routing Key
*/
public void declareExchangeAndQueue(Channel channel) throws IOException {
- HashMap<String, Object> queueArgs = null;
+ Map<String, Object> queueArgs = new HashMap<String, Object>();
+ Map<String, Object> exchangeArgs = new HashMap<String, Object>();
+
if (deadLetterExchange != null) {
- queueArgs = new HashMap<String, Object>();
queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, getDeadLetterExchange());
queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, getDeadLetterRoutingKey());
-
+ // TODO Do we need to set up the args for the dead letter exchange?
channel.exchangeDeclare(getDeadLetterExchange(),
getDeadLetterExchangeType(),
isDurable(),
@@ -215,10 +221,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
getDeadLetterExchange(),
getDeadLetterRoutingKey() == null ? "" : getDeadLetterRoutingKey());
}
+
+ if (getQueueArgsConfigurer() != null) {
+ getQueueArgsConfigurer().configurArgs(queueArgs);
+ }
+ if (getExchangeArgsConfigurer() != null) {
+ getExchangeArgsConfigurer().configurArgs(exchangeArgs);
+ }
+
channel.exchangeDeclare(getExchangeName(),
getExchangeType(),
isDurable(),
- isAutoDelete(), new HashMap<String, Object>());
+ isAutoDelete(), exchangeArgs);
if (getQueue() != null) {
// need to make sure the queueDeclare is same with the exchange declare
channel.queueDeclare(getQueue(), isDurable(), false,
@@ -618,4 +632,36 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
public void setChannelPoolMaxWait(long channelPoolMaxWait) {
this.channelPoolMaxWait = channelPoolMaxWait;
}
+
+ /**
+ * Get the configurer for setting the queue args in Channel.queueDeclare
+ * @return the queue args configurer, or null if none has been set
+ */
+ public ArgsConfigurer getQueueArgsConfigurer() {
+ return queueArgsConfigurer;
+ }
+
+ /**
+ * Set the configurer for setting the queue args in Channel.queueDeclare
+ * @param queueArgsConfigurer the queue args configurer
+ */
+ public void setQueueArgsConfigurer(ArgsConfigurer queueArgsConfigurer) {
+ this.queueArgsConfigurer = queueArgsConfigurer;
+ }
+
+ /**
+ * Get the configurer for setting the exchange args in Channel.exchangeDeclare
+ * @return the exchange args configurer, or null if none has been set
+ */
+ public ArgsConfigurer getExchangeArgsConfigurer() {
+ return exchangeArgsConfigurer;
+ }
+
+ /**
+ * Set the configurer for setting the exchange args in Channel.exchangeDeclare
+ * @param exchangeArgsConfigurer the exchange args configurer
+ */
+ public void setExchangeArgsConfigurer(ArgsConfigurer exchangeArgsConfigurer) {
+ this.exchangeArgsConfigurer = exchangeArgsConfigurer;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9f893d83/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index afae40d..df12e65 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -30,8 +30,8 @@ import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.LongStringHelper;
-
import org.apache.camel.Exchange;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.mockito.Mockito;
@@ -40,6 +40,18 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
private Envelope envelope = Mockito.mock(Envelope.class);
private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class);
+
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ registry.bind("argsConfigurer", new ArgsConfigurer() {
+ @Override
+ public void configurArgs(Map<String, Object> args) {
+ // do nothing here
+ }
+
+ });
+ return registry;
+ }
@Test
public void testCreatingRabbitExchangeSetsStandardHeaders() throws Exception {
@@ -122,6 +134,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
assertTrue(endpoint.isSingleton());
}
+
+ @Test
+ public void testArgConfigurer() throws Exception {
+ RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?queueArgsConfigurer=#argsConfigurer", RabbitMQEndpoint.class);
+ assertNotNull("We should get the queueArgsConfigurer here.", endpoint.getQueueArgsConfigurer());
+ assertNull("We should not get the exchangeArgsConfigurer here.", endpoint.getExchangeArgsConfigurer());
+ }
@Test
public void brokerEndpointAddressesSettings() throws Exception {