You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/02/13 10:02:55 UTC
[3/3] camel git commit: CAMEL-10759 Inline multi-value rabbitmq args
in URI
CAMEL-10759 Inline multi-value rabbitmq args in URI
- New inline multi-value map property 'args' on rabbitmq component for specifying rabbitmq args
- Combines all definitions with the properties individually specified via setExchangeArgs, setQueueArgs & setBindingArgs
- Property args just delegates via its three map prefixes: arg.exchange., arg.queue. & arg.binding.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/82452aab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/82452aab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/82452aab
Branch: refs/heads/master
Commit: 82452aab34f39cac2ee77f7ae881e61087648e51
Parents: d0a6c0d
Author: Paul Watson <pa...@pdwtech.com>
Authored: Sun Feb 12 20:31:03 2017 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Feb 13 11:00:59 2017 +0100
----------------------------------------------------------------------
.../src/main/docs/rabbitmq-component.adoc | 5 +-
.../component/rabbitmq/RabbitMQComponent.java | 19 +++++++-
.../component/rabbitmq/RabbitMQEndpoint.java | 31 ++++++++++++-
.../rabbitmq/RabbitMQConsumerIntTest.java | 39 ++++++++++------
.../rabbitmq/RabbitMQEndpointTest.java | 48 ++++++++++++++++++++
5 files changed, 124 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index e716693..019d877 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -45,7 +45,7 @@ The RabbitMQ component has no options.
// endpoint options: START
-The RabbitMQ component supports 59 endpoint options which are listed below:
+The RabbitMQ component supports 60 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -89,6 +89,7 @@ The RabbitMQ component supports 59 endpoint options which are listed below:
| publisherAcknowledgements | producer | false | boolean | When true the message will be published with publisher acknowledgements turned on
| publisherAcknowledgementsTimeout | producer | | long | The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server
| addresses | advanced | | Address[] | If this option is set camel-rabbitmq will try to create connection based on the setting of option addresses. The addresses value is a string which looks like server1:12345 server2:12345
+| args | advanced | | Map | Specify arguments for configuring the different RabbitMQ concepts; a different prefix is required for each: Queue: arg.queue. Exchange: arg.exchange. Binding: arg.binding. For example to declare a queue with message ttl argument: http://localhost:5672/exchange/queue?args=arg.queue.x-message-ttl=60000
| automaticRecoveryEnabled | advanced | | Boolean | Enables connection automatic recovery (uses connection implementation that performs automatic recovery when connection shutdown is not initiated by the application)
| bindingArgs | advanced | | Map | Key/value args for configuring the queue binding parameters when declare=true
| clientProperties | advanced | | Map | Connection client properties (client info used in negotiating with the server)
@@ -263,4 +264,4 @@ public Map<String, Object> bindArgsBuilder() {
* link:configuring-camel.html[Configuring Camel]
* link:component.html[Component]
* link:endpoint.html[Endpoint]
-* link:getting-started.html[Getting Started]
\ No newline at end of file
+* link:getting-started.html[Getting Started]
http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 3fe86c5..237ba1a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -17,19 +17,25 @@
package org.apache.camel.component.rabbitmq;
import java.net.URI;
+import java.util.HashMap;
import java.util.Map;
-
import javax.net.ssl.TrustManager;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RabbitMQComponent extends UriEndpointComponent {
+ public static final String ARG_PREFIX = "arg.";
+ public static final String EXCHANGE_ARG_PREFIX = "exchange.";
+ public static final String QUEUE_ARG_PREFIX = "queue.";
+ public static final String BINDING_ARG_PREFIX = "binding.";
+
private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class);
public RabbitMQComponent() {
@@ -75,6 +81,17 @@ public class RabbitMQComponent extends UriEndpointComponent {
new Object[]{endpoint.getHostname(), endpoint.getPortNumber(), endpoint.getExchangeName()});
}
+ HashMap<String, Object> args = new HashMap<>();
+ args.putAll(IntrospectionSupport.extractProperties(params, ARG_PREFIX));
+ endpoint.setArgs(args);
+
+ HashMap<String, Object> argsCopy = new HashMap<>(args);
+
+ // Combine the three types of rabbit arguments with their individual endpoint properties
+ endpoint.getExchangeArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, EXCHANGE_ARG_PREFIX));
+ endpoint.getQueueArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, QUEUE_ARG_PREFIX));
+ endpoint.getBindingArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, BINDING_ARG_PREFIX));
+
return endpoint;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 60dc9f7..2292612 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -49,7 +49,8 @@ import org.apache.camel.spi.UriPath;
/**
* The rabbitmq component allows you produce and consume messages from <a href="http://www.rabbitmq.com/">RabbitMQ</a> instances.
*/
-@UriEndpoint(firstVersion = "2.12.0", scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
+@UriEndpoint(firstVersion = "2.12.0", scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName",
+ consumerClass = RabbitMQConsumer.class, label = "messaging", lenientProperties = true)
public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
// header to indicate that the message body needs to be de-serialized
public static final String SERIALIZE_HEADER = "CamelSerialize";
@@ -148,6 +149,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private Map<String, Object> queueArgs = new HashMap<>();
@UriParam(label = "advanced")
private Map<String, Object> bindingArgs = new HashMap<>();
+ @UriParam(label = "advanced", prefix = "arg.", multiValue = true)
+ private Map<String, Object> args;
@UriParam(label = "advanced")
private ArgsConfigurer queueArgsConfigurer;
@UriParam(label = "advanced")
@@ -794,6 +797,27 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
return bindingArgs;
}
+ /**
+ * Specify arguments for configuring the different RabbitMQ concepts, a different prefix is
+ * required for each:
+ * <ul>
+ * <li>Queue: arg.queue.</li>
+ * <li>Exchange: arg.exchange.</li>
+ * <li>Binding: arg.binding.</li>
+ * </ul>
+ * For example to declare a queue with message ttl argument:
+ *
+ * http://localhost:5672/exchange/queue?args=arg.queue.x-message-ttl=60000
+ *
+ */
+ public void setArgs(Map<String, Object> args) {
+ this.args = args;
+ }
+
+ public Map<String, Object> getArgs() {
+ return args;
+ }
+
public ArgsConfigurer getQueueArgsConfigurer() {
return queueArgsConfigurer;
}
@@ -919,4 +943,9 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
this.exclusive = exclusive;
}
+ public boolean isLenientProperties() {
+ // true to allow dynamic URI options to be configured
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
index 842f0d0..8ea7ac0 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerIntTest.java
@@ -26,24 +26,18 @@ import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
private static final String EXCHANGE = "ex1";
- private static final String HEADERS_EXCHANGE = "ex2";
+ private static final String HEADERS_EXCHANGE = "ex8";
private static final String QUEUE = "q1";
- private static final String HEADER_KEY = "foo";
- private static final String HEADER_VALUE = "bar";
private static final String MSG = "hello world";
@EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest")
@@ -55,6 +49,9 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
@EndpointInject(uri = "rabbitmq:localhost:5672/" + HEADERS_EXCHANGE + "?username=cameltest&password=cameltest&exchangeType=headers&queue=" + QUEUE + "&bindingArgs=#bindArgs")
private Endpoint headersExchangeWithQueue;
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + "ex7" + "?username=cameltest&password=cameltest&exchangeType=headers&autoDelete=false&durable=true&queue=q7&arg.binding.fizz=buzz")
+ private Endpoint headersExchangeWithQueueDefiniedInline;
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -63,6 +60,7 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
public void configure() throws Exception {
from(from).to(to);
from(headersExchangeWithQueue).to(to);
+ from(headersExchangeWithQueueDefiniedInline).to(to);
}
};
}
@@ -119,20 +117,33 @@ public class RabbitMQConsumerIntTest extends AbstractRabbitMQIntTest {
//one has the correct header set
to.expectedMessageCount(1);
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
- properties.headers(Collections.singletonMap(HEADER_KEY, HEADER_VALUE));
+ Channel channel = connection().createChannel();
+ channel.basicPublish(HEADERS_EXCHANGE, "", propertiesWithHeader("foo", "bar"), MSG.getBytes());
+ channel.basicPublish(HEADERS_EXCHANGE, "", null, MSG.getBytes());
+ channel.basicPublish(HEADERS_EXCHANGE, "", propertiesWithHeader("foo", "bra"), MSG.getBytes());
- AMQP.BasicProperties.Builder nonMatchingProperties = new AMQP.BasicProperties.Builder();
- nonMatchingProperties.headers(Collections.singletonMap(HEADER_KEY, "wrong-value"));
+ to.assertIsSatisfied();
+ }
+
+ @Test
+ public void sentMessageIsReceivedWithHeadersRoutingMultiValueMapBindings() throws Exception {
+ to.expectedMessageCount(3);
Channel channel = connection().createChannel();
- channel.basicPublish(HEADERS_EXCHANGE, "", properties.build(), MSG.getBytes());
- channel.basicPublish(HEADERS_EXCHANGE, "", null, MSG.getBytes());
- channel.basicPublish(HEADERS_EXCHANGE, "", nonMatchingProperties.build(), MSG.getBytes());
+ channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "buzz"), MSG.getBytes());
+ channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "buzz"), MSG.getBytes());
+ channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "buzz"), MSG.getBytes());
+ channel.basicPublish("ex7", "", propertiesWithHeader("fizz", "nope"), MSG.getBytes());
to.assertIsSatisfied();
}
+ private AMQP.BasicProperties propertiesWithHeader(String headerName, String headerValue) {
+ AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
+ properties.headers(Collections.singletonMap(headerName, headerValue));
+ return properties.build();
+ }
+
private Date currentTimestampWithoutMillis() {
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.MILLISECOND, 0);
http://git-wip-us.apache.org/repos/asf/camel/blob/82452aab/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 648019f..6b7d737 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -58,6 +58,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
args.put("foo", "bar");
registry.bind("args", args);
+ HashMap<String, Object> moreArgs = new HashMap<>();
+ moreArgs.put("fizz", "buzz");
+ registry.bind("moreArgs", moreArgs);
+
+ HashMap<String, Object> evenMoreArgs = new HashMap<>();
+ evenMoreArgs.put("ping", "pong");
+ registry.bind("evenMoreArgs", evenMoreArgs);
return registry;
}
@@ -201,6 +208,47 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
}
@Test
+ public void testMultiArgsPopulateCorrectEndpointProperties() throws Exception {
+ RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?arg.exchange.e1=v1&arg.exchange.e2=v2&arg.queue.q1=v3&arg.binding.b1=v4", RabbitMQEndpoint.class);
+ assertEquals("Wrong number of args", 4, endpoint.getArgs().size());
+ assertEquals("Wrong number of args", 1, endpoint.getBindingArgs().size());
+ assertEquals("Wrong number of args", 2, endpoint.getExchangeArgs().size());
+ assertEquals("Wrong number of args", 1, endpoint.getQueueArgs().size());
+ }
+
+ @Test
+ public void testMultiArgsCombinedWithIndividuallySpecified() throws Exception {
+ // setup two arguments for each rabbit fundamental.
+ // Configured inline and via named map in the camel registry
+ RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange"
+ + "?arg.exchange.e1=v1&exchangeArgs=#args"
+ + "&arg.queue.q1=v2&queueArgs=#moreArgs"
+ + "&arg.binding.b1=v3&bindingArgs=#evenMoreArgs", RabbitMQEndpoint.class);
+
+ // The multi-value inline has 3
+ Map<String, Object> inlineArgs = endpoint.getArgs();
+ assertEquals("Wrong number of args", 3, inlineArgs.size());
+ assertTrue(inlineArgs.containsKey("exchange.e1"));
+ assertTrue(inlineArgs.containsKey("queue.q1"));
+ assertTrue(inlineArgs.containsKey("binding.b1"));
+
+ Map<String, Object> exchangeArgs = endpoint.getExchangeArgs();
+ assertEquals("Wrong number of exchange args", 2, exchangeArgs.size());
+ assertTrue("Should contain the individually specified exchange args", exchangeArgs.containsKey("foo"));
+ assertTrue("Should contain the args in the multi-value map", exchangeArgs.containsKey("e1"));
+
+ Map<String, Object> queueArgs = endpoint.getQueueArgs();
+ assertEquals("Wrong number of queue args", 2, queueArgs.size());
+ assertTrue("Should contain the individually specified queue args", queueArgs.containsKey("fizz"));
+ assertTrue("Should contain the args in the multi-value map", queueArgs.containsKey("q1"));
+
+ Map<String, Object> bindingArgs = endpoint.getBindingArgs();
+ assertEquals("Wrong number of binding args", 2, bindingArgs.size());
+ assertTrue("Should contain the individually specified binding args", bindingArgs.containsKey("ping"));
+ assertTrue("Should contain the args in the multi-value map", bindingArgs.containsKey("b1"));
+ }
+
+ @Test
public void brokerEndpointAddressesSettings() throws Exception {
RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?addresses=server1:12345,server2:12345", RabbitMQEndpoint.class);
assertEquals("Wrong size of endpoint addresses.", 2, endpoint.getAddresses().length);