You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2015/03/16 08:59:31 UTC

camel git commit: CAMEL-8487 Support to configure the custom arguments on the RabbitMQ queues and exchange

Repository: camel
Updated Branches:
  refs/heads/master b9fe0513f -> 9f893d83e


CAMEL-8487 Support to configure the custom arguments on the RabbitMQ queues and exchange


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f893d83
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f893d83
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f893d83

Branch: refs/heads/master
Commit: 9f893d83e50d25b4a8571705f13c34f8981c0af6
Parents: b9fe0513
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Mar 16 15:58:31 2015 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Mar 16 15:59:12 2015 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/ArgsConfigurer.java      | 29 +++++++++++
 .../component/rabbitmq/RabbitMQEndpoint.java    | 54 ++++++++++++++++++--
 .../rabbitmq/RabbitMQEndpointTest.java          | 21 +++++++-
 3 files changed, 99 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9f893d83/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java
new file mode 100644
index 0000000..71fd212
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ArgsConfigurer.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.Map;
+
+public interface ArgsConfigurer {
+    
+    /**
+     * Configure the args maps for RabbitMQ to use
+     * @param args the map need to be configured
+     */
+    void configurArgs(Map<String, Object> args);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9f893d83/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index f6e4198..ea311d7 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -72,6 +72,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private boolean durable = true;
     @UriParam(defaultValue = "false")
     private boolean bridgeEndpoint;
+    @UriParam
     private String queue = String.valueOf(UUID.randomUUID().toString().hashCode());
     @UriParam(defaultValue = "direct")
     private String exchangeType = "direct";
@@ -138,6 +139,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     //Maximum time (in milliseconds) waiting for channel
     @UriParam(defaultValue = "1000")
     private long channelPoolMaxWait = 1000;
+    @UriParam
+    private ArgsConfigurer queueArgsConfigurer;
+    @UriParam
+    private ArgsConfigurer exchangeArgsConfigurer;
 
     public RabbitMQEndpoint() {
     }
@@ -197,12 +202,13 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
      * If needed, declare Exchange, declare Queue and bind them with Routing Key
      */
     public void declareExchangeAndQueue(Channel channel) throws IOException {
-        HashMap<String, Object> queueArgs = null;
+        Map<String, Object> queueArgs = new HashMap<String, Object>();
+        Map<String, Object> exchangeArgs = new HashMap<String, Object>();
+        
         if (deadLetterExchange != null) {
-            queueArgs = new HashMap<String, Object>();
             queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, getDeadLetterExchange());
             queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, getDeadLetterRoutingKey());
-            
+            // TODO Do we need to setup the args for the DeadLetter?
             channel.exchangeDeclare(getDeadLetterExchange(),
                     getDeadLetterExchangeType(),
                     isDurable(),
@@ -215,10 +221,18 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
                     getDeadLetterExchange(),
                     getDeadLetterRoutingKey() == null ? "" : getDeadLetterRoutingKey());
         }
+        
+        if (getQueueArgsConfigurer() != null) {
+            getQueueArgsConfigurer().configurArgs(queueArgs);
+        }
+        if (getExchangeArgsConfigurer() != null) {
+            getExchangeArgsConfigurer().configurArgs(exchangeArgs);
+        }
+        
         channel.exchangeDeclare(getExchangeName(),
                 getExchangeType(),
                 isDurable(),
-                isAutoDelete(), new HashMap<String, Object>());
+                isAutoDelete(), exchangeArgs);
         if (getQueue() != null) {
             // need to make sure the queueDeclare is same with the exchange declare
             channel.queueDeclare(getQueue(), isDurable(), false,
@@ -618,4 +632,36 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public void setChannelPoolMaxWait(long channelPoolMaxWait) {
         this.channelPoolMaxWait = channelPoolMaxWait;
     }
+
+    /**
+     * Get the configurer for setting the queue args in Channel.queueDeclare
+     * @return
+     */
+    public ArgsConfigurer getQueueArgsConfigurer() {
+        return queueArgsConfigurer;
+    }
+    
+    /**
+     * Set the configurer for setting the queue args in Channel.queueDeclare
+     * @param queueArgsConfigurer the queue args configurer
+     */
+    public void setQueueArgsConfigurer(ArgsConfigurer queueArgsConfigurer) {
+        this.queueArgsConfigurer = queueArgsConfigurer;
+    }
+    
+    /**
+     * Get the configurer for setting the exchange args in Channel.exchangeDeclare
+     * @return
+     */
+    public ArgsConfigurer getExchangeArgsConfigurer() {
+        return exchangeArgsConfigurer;
+    }
+    
+    /**
+     * Set the configurer for setting the exchange args in Channel.exchangeDeclare
+     * @param queueArgsConfigurer the queue args configurer
+     */
+    public void setExchangeArgsConfigurer(ArgsConfigurer exchangeArgsConfigurer) {
+        this.exchangeArgsConfigurer = exchangeArgsConfigurer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9f893d83/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index afae40d..df12e65 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -30,8 +30,8 @@ import com.rabbitmq.client.Address;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.impl.LongStringHelper;
-
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -40,6 +40,18 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
 
     private Envelope envelope = Mockito.mock(Envelope.class);
     private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class);
+    
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        registry.bind("argsConfigurer", new ArgsConfigurer() {
+            @Override
+            public void configurArgs(Map<String, Object> args) {
+                // do nothing here
+            }
+            
+        });
+        return registry;
+    }
 
     @Test
     public void testCreatingRabbitExchangeSetsStandardHeaders() throws Exception {
@@ -122,6 +134,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
 
         assertTrue(endpoint.isSingleton());
     }
+    
+    @Test
+    public void testArgConfigurer() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?queueArgsConfigurer=#argsConfigurer", RabbitMQEndpoint.class);
+        assertNotNull("We should get the queueArgsConfigurer here.", endpoint.getQueueArgsConfigurer());
+        assertNull("We should not get the exchangeArgsConfigurer here.", endpoint.getExchangeArgsConfigurer());
+    }
 
     @Test
     public void brokerEndpointAddressesSettings() throws Exception {