You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/02/13 10:02:55 UTC

[3/3] camel git commit: CAMEL-10759 Inline multi-value rabbitmq args in URI

CAMEL-10759 Inline multi-value rabbitmq args in URI

- New inline multi-value map property 'args' on rabbitmq component for specifying rabbitmq args
- Combines the inline definitions with the properties set individually via setExchangeArgs, setQueueArgs & setBindingArgs
- Property args just delegates via its three map prefixes: arg.exchange., arg.queue. & arg.binding.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/82452aab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/82452aab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/82452aab

Branch: refs/heads/master
Commit: 82452aab34f39cac2ee77f7ae881e61087648e51
Parents: d0a6c0d
Author: Paul Watson <pa...@pdwtech.com>
Authored: Sun Feb 12 20:31:03 2017 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 13 11:00:59 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/rabbitmq-component.adoc       |  5 +-
 .../component/rabbitmq/RabbitMQComponent.java   | 19 +++++++-
 .../component/rabbitmq/RabbitMQEndpoint.java    | 31 ++++++++++++-
 .../rabbitmq/RabbitMQConsumerIntTest.java       | 39 ++++++++++------
 .../rabbitmq/RabbitMQEndpointTest.java          | 48 ++++++++++++++++++++
 5 files changed, 124 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index e716693..019d877 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -45,7 +45,7 @@ The RabbitMQ component has no options.
 
 
 // endpoint options: START
-The RabbitMQ component supports 59 endpoint options which are listed below:
+The RabbitMQ component supports 60 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -89,6 +89,7 @@ The RabbitMQ component supports 59 endpoint options which are listed below:
 | publisherAcknowledgements | producer | false | boolean | When true the message will be published with publisher acknowledgements turned on
 | publisherAcknowledgementsTimeout | producer |  | long | The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server
 | addresses | advanced |  | Address[] | If this option is set camel-rabbitmq will try to create connection based on the setting of option addresses. The addresses value is a string which looks like server1:12345 server2:12345
+| args | advanced |  | Map | Specify arguments for configuring the different RabbitMQ concepts; a different prefix is required for each: Queue: arg.queue. Exchange: arg.exchange. Binding: arg.binding. For example to declare a queue with message ttl argument: rabbitmq:localhost:5672/exchange?queue=q1&arg.queue.x-message-ttl=60000
 | automaticRecoveryEnabled | advanced |  | Boolean | Enables connection automatic recovery (uses connection implementation that performs automatic recovery when connection shutdown is not initiated by the application)
 | bindingArgs | advanced |  | Map | Key/value args for configuring the queue binding parameters when declare=true
 | clientProperties | advanced |  | Map | Connection client properties (client info used in negotiating with the server)
@@ -263,4 +264,4 @@ public Map<String, Object> bindArgsBuilder() {
 * link:configuring-camel.html[Configuring Camel]
 * link:component.html[Component]
 * link:endpoint.html[Endpoint]
-* link:getting-started.html[Getting Started]
\ No newline at end of file
+* link:getting-started.html[Getting Started]

http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 3fe86c5..237ba1a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -17,19 +17,25 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.net.URI;
+import java.util.HashMap;
 import java.util.Map;
-
 import javax.net.ssl.TrustManager;
 
 import com.rabbitmq.client.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RabbitMQComponent extends UriEndpointComponent {
 
+    public static final String ARG_PREFIX = "arg.";
+    public static final String EXCHANGE_ARG_PREFIX = "exchange.";
+    public static final String QUEUE_ARG_PREFIX = "queue.";
+    public static final String BINDING_ARG_PREFIX = "binding.";
+
     private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class);
 
     public RabbitMQComponent() {
@@ -75,6 +81,17 @@ public class RabbitMQComponent extends UriEndpointComponent {
                     new Object[]{endpoint.getHostname(), endpoint.getPortNumber(), endpoint.getExchangeName()});
         }
 
+        HashMap<String, Object> args = new HashMap<>();
+        args.putAll(IntrospectionSupport.extractProperties(params, ARG_PREFIX));
+        endpoint.setArgs(args);
+
+        HashMap<String, Object> argsCopy = new HashMap<>(args);
+        
+        // Combine the three types of rabbit arguments with their individual endpoint properties
+        endpoint.getExchangeArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, EXCHANGE_ARG_PREFIX));
+        endpoint.getQueueArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, QUEUE_ARG_PREFIX));
+        endpoint.getBindingArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, BINDING_ARG_PREFIX));
+
         return endpoint;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 60dc9f7..2292612 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -49,7 +49,8 @@ import org.apache.camel.spi.UriPath;
 /**
  * The rabbitmq component allows you produce and consume messages from <a href="http://www.rabbitmq.com/">RabbitMQ</a> instances.
  */
-@UriEndpoint(firstVersion = "2.12.0", scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
+@UriEndpoint(firstVersion = "2.12.0", scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName",
+        consumerClass = RabbitMQConsumer.class, label = "messaging", lenientProperties = true)
 public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     // header to indicate that the message body needs to be de-serialized
     public static final String SERIALIZE_HEADER = "CamelSerialize";
@@ -148,6 +149,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private Map<String, Object> queueArgs = new HashMap<>();
     @UriParam(label = "advanced")
     private Map<String, Object> bindingArgs = new HashMap<>();
+    @UriParam(label = "advanced", prefix = "arg.", multiValue = true)
+    private Map<String, Object> args;
     @UriParam(label = "advanced")
     private ArgsConfigurer queueArgsConfigurer;
     @UriParam(label = "advanced")
@@ -794,6 +797,27 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         return bindingArgs;
     }
 
+    /**
+     * Specify arguments for configuring the different RabbitMQ concepts, a different prefix is
+     * required for each:
+     * <ul>
+     *     <li>Queue: arg.queue.</li>
+     *     <li>Exchange: arg.exchange.</li>
+     *     <li>Binding: arg.binding.</li>
+     * </ul>
+     * For example to declare a queue with message ttl argument:
+     *
+     * rabbitmq:localhost:5672/exchange?queue=q1&amp;arg.queue.x-message-ttl=60000
+     *
+     */
+    public void setArgs(Map<String, Object> args) {
+        this.args = args;
+    }
+
+    public Map<String, Object> getArgs() {
+        return args;
+    }
+
     public ArgsConfigurer getQueueArgsConfigurer() {
         return queueArgsConfigurer;
     }
@@ -919,4 +943,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         this.exclusive = exclusive;
     }
 
+    public boolean isLenientProperties() {
+        // true to allow dynamic URI options to be configured
+        return true;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
index 842f0d0..8ea7ac0 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
@@ -26,24 +26,18 @@ import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
 
     private static final String EXCHANGE = "ex1";
-    private static final String HEADERS_EXCHANGE = "ex2";
+    private static final String HEADERS_EXCHANGE = "ex8";
     private static final String QUEUE = "q1";
-    private static final String HEADER_KEY = "foo";
-    private static final String HEADER_VALUE = "bar";
     private static final String MSG = "hello world";
 
     @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest")
@@ -55,6 +49,9 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
     @EndpointInject(uri = "rabbitmq:localhost:5672/" + HEADERS_EXCHANGE + "?username=cameltest&password=cameltest&exchangeType=headers&queue=" + QUEUE + "&bindingArgs=#bindArgs")
     private Endpoint headersExchangeWithQueue;
 
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + "ex7" + "?username=cameltest&password=cameltest&exchangeType=headers&autoDelete=false&durable=true&queue=q7&arg.binding.fizz=buzz")
+    private Endpoint headersExchangeWithQueueDefiniedInline;
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -63,6 +60,7 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
             public void configure() throws Exception {
                 from(from).to(to);
                 from(headersExchangeWithQueue).to(to);
+                from(headersExchangeWithQueueDefiniedInline).to(to);
             }
         };
     }
@@ -119,20 +117,33 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
         //one has the correct header set
         to.expectedMessageCount(1);
 
-        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
-        properties.headers(Collections.singletonMap(HEADER_KEY, HEADER_VALUE));
+        Channel channel = connection().createChannel();
+        channel.basicPublish(HEADERS_EXCHANGE, "", propertiesWithHeader("foo", "bar"), MSG.getBytes());
+        channel.basicPublish(HEADERS_EXCHANGE, "", null, MSG.getBytes());
+        channel.basicPublish(HEADERS_EXCHANGE, "", propertiesWithHeader("foo", "bra"), MSG.getBytes());
 
-        AMQP.BasicProperties.Builder nonMatchingProperties = new AMQP.BasicProperties.Builder();
-        nonMatchingProperties.headers(Collections.singletonMap(HEADER_KEY, "wrong-value"));
+        to.assertIsSatisfied();
+    }
+
+    @Test
+    public void sentMessageIsReceivedWithHeadersRoutingMultiValueMapBindings() throws Exception {
+        to.expectedMessageCount(3);
 
         Channel channel = connection().createChannel();
-        channel.basicPublish(HEADERS_EXCHANGE, "", properties.build(), MSG.getBytes());
-        channel.basicPublish(HEADERS_EXCHANGE, "", null, MSG.getBytes());
-        channel.basicPublish(HEADERS_EXCHANGE, "", nonMatchingProperties.build(), MSG.getBytes());
+        channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "buzz"), MSG.getBytes());
+        channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "buzz"), MSG.getBytes());
+        channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "buzz"), MSG.getBytes());
+        channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "nope"), MSG.getBytes());
 
         to.assertIsSatisfied();
     }
 
+    private AMQP.BasicProperties propertiesWithHeader(String headerName, String headerValue) {
+        AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
+        properties.headers(Collections.singletonMap(headerName, headerValue));
+        return properties.build();
+    }
+
     private Date currentTimestampWithoutMillis() {
         Calendar calendar = Calendar.getInstance();
         calendar.set(Calendar.MILLISECOND, 0);

http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 648019f..6b7d737 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -58,6 +58,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         args.put("foo", "bar");
         registry.bind("args", args);
 
+        HashMap<String, Object> moreArgs = new HashMap<>();
+        moreArgs.put("fizz", "buzz");
+        registry.bind("moreArgs", moreArgs);
+
+        HashMap<String, Object> evenMoreArgs = new HashMap<>();
+        evenMoreArgs.put("ping", "pong");
+        registry.bind("evenMoreArgs", evenMoreArgs);
 
         return registry;
     }
@@ -201,6 +208,47 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     }
 
     @Test
+    public void testMultiArgsPopulateCorrectEndpointProperties() throws Exception {
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?arg.exchange.e1=v1&arg.exchange.e2=v2&arg.queue.q1=v3&arg.binding.b1=v4", RabbitMQEndpoint.class);
+        assertEquals("Wrong number of args", 4, endpoint.getArgs().size());
+        assertEquals("Wrong number of args", 1, endpoint.getBindingArgs().size());
+        assertEquals("Wrong number of args", 2, endpoint.getExchangeArgs().size());
+        assertEquals("Wrong number of args", 1, endpoint.getQueueArgs().size());
+    }
+
+    @Test
+    public void testMultiArgsCombinedWithIndividuallySpecified() throws Exception {
+        // Set up two arguments for each RabbitMQ concept (exchange, queue, binding):
+        // one configured inline and one via a named map in the Camel registry
+        RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange"
+                + "?arg.exchange.e1=v1&exchangeArgs=#args"
+                + "&arg.queue.q1=v2&queueArgs=#moreArgs"
+                + "&arg.binding.b1=v3&bindingArgs=#evenMoreArgs", RabbitMQEndpoint.class);
+
+        // The multi-value inline has 3
+        Map<String, Object> inlineArgs = endpoint.getArgs();
+        assertEquals("Wrong number of args", 3, inlineArgs.size());
+        assertTrue(inlineArgs.containsKey("exchange.e1"));
+        assertTrue(inlineArgs.containsKey("queue.q1"));
+        assertTrue(inlineArgs.containsKey("binding.b1"));
+
+        Map<String, Object> exchangeArgs = endpoint.getExchangeArgs();
+        assertEquals("Wrong number of exchange args", 2, exchangeArgs.size());
+        assertTrue("Should contain the individually specified exchange args", exchangeArgs.containsKey("foo"));
+        assertTrue("Should contain the args in the multi-value map", exchangeArgs.containsKey("e1"));
+
+        Map<String, Object> queueArgs = endpoint.getQueueArgs();
+        assertEquals("Wrong number of queue args", 2, queueArgs.size());
+        assertTrue("Should contain the individually specified queue args", queueArgs.containsKey("fizz"));
+        assertTrue("Should contain the args in the multi-value map", queueArgs.containsKey("q1"));
+
+        Map<String, Object> bindingArgs = endpoint.getBindingArgs();
+        assertEquals("Wrong number of binding args", 2, bindingArgs.size());
+        assertTrue("Should contain the individually specified binding args", bindingArgs.containsKey("ping"));
+        assertTrue("Should contain the args in the multi-value map", bindingArgs.containsKey("b1"));
+    }
+
+    @Test
     public void brokerEndpointAddressesSettings() throws Exception {
         RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?addresses=server1:12345,server2:12345", RabbitMQEndpoint.class);
         assertEquals("Wrong size of endpoint addresses.", 2, endpoint.getAddresses().length);