You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/03/31 08:21:36 UTC
[2/8] git commit: Add unit test to support scenario
concurrentConsumers
Add unit test to support scenario concurrentConsumers
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/85833902
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/85833902
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/85833902
Branch: refs/heads/camel-2.13.x
Commit: 85833902ef13959ea07345f3b92696c1593c521e
Parents: 6919c57
Author: Charles Moulliard <ch...@gmail.com>
Authored: Mon Mar 17 11:21:08 2014 +0100
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Mar 31 14:19:08 2014 +0800
----------------------------------------------------------------------
.../component/sjms/CamelJmsTestHelper.java | 83 ++++++++++++++++++++
.../consumer/InOutConcurrentConsumerTest.java | 80 +++++++++++++++++++
2 files changed, 163 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/85833902/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
new file mode 100644
index 0000000..00e3555
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
@@ -0,0 +1,83 @@
+package org.apache.camel.component.sjms;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.util.FileUtil;
+
+import javax.jms.ConnectionFactory;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A helper for unit testing with Apache ActiveMQ as embedded JMS broker.
+ * <p>
+ * Every factory created here targets its own uniquely named in-VM broker and is wrapped in a pool.
+ */
+public final class CamelJmsTestHelper {
+
+    // shared by all factory methods so every broker name (and data directory) is used only once
+    private static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+    private CamelJmsTestHelper() {
+    }
+
+    /** Same as {@link #createConnectionFactory(String)} without extra broker URL options. */
+    public static ConnectionFactory createConnectionFactory() {
+        return createConnectionFactory(null);
+    }
+
+    /**
+     * Creates a pooled connection factory for a new, non-persistent embedded broker.
+     *
+     * @param options extra options appended to the broker URL after an {@code &}, or {@code null} for none
+     * @return the pooled connection factory
+     */
+    public static ConnectionFactory createConnectionFactory(String options) {
+        // using a unique broker name improves testing when running the entire test suite in the same JVM
+        int id = COUNTER.incrementAndGet();
+        String url = "vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false";
+        // When using asyncSend, producers will not be guaranteed to send in the order we
+        // have in the tests (which may be confusing for queues) so we need this set to false.
+        // Another way of guaranteeing order is to use persistent messages or transactions.
+        return createPooledFactory(url, options, false);
+    }
+
+    /** Same as {@link #createPersistentConnectionFactory(String)} without extra broker URL options. */
+    public static ConnectionFactory createPersistentConnectionFactory() {
+        return createPersistentConnectionFactory(null);
+    }
+
+    /**
+     * Creates a pooled connection factory for a new, persistent embedded broker.
+     *
+     * @param options extra options appended to the broker URL after an {@code &}, or {@code null} for none
+     * @return the pooled connection factory
+     */
+    public static ConnectionFactory createPersistentConnectionFactory(String options) {
+        // using a unique broker name improves testing when running the entire test suite in the same JVM
+        int id = COUNTER.incrementAndGet();
+        // use a unique data directory in target, removed up front so it is empty on startup
+        String dir = "target/activemq-data-" + id;
+        FileUtil.removeDir(new File(dir));
+
+        String url = "vm://test-broker-" + id + "?broker.persistent=true&broker.useJmx=false&broker.dataDirectory=" + dir;
+        return createPooledFactory(url, options, true);
+    }
+
+    /** Applies the settings shared by both broker flavours; only the URL and async-send mode differ. */
+    private static ConnectionFactory createPooledFactory(String url, String options, boolean asyncSend) {
+        String brokerUrl = options == null ? url : url + "&" + options;
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        // optimize AMQ to be as fast as possible so unit testing is quicker
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setOptimizeAcknowledge(true);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setUseAsyncSend(asyncSend);
+        connectionFactory.setAlwaysSessionAsync(false);
+
+        // use a pooled connection factory
+        PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory);
+        pooled.setMaxConnections(8);
+        return pooled;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/85833902/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
new file mode 100644
index 0000000..fc18efc
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
@@ -0,0 +1,80 @@
+package org.apache.camel.component.sjms.consumer;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Concurrent consumer with JMSReply test: requests are sent from several threads through an
+ * InOut route and every reply must match the request that produced it.
+ */
+public class InOutConcurrentConsumerTest extends JmsTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint result;
+
+    @Test
+    public void testConcurrent() throws Exception {
+        doSendMessages(10, 5);
+    }
+
+    /**
+     * Sends {@code messages} requests from a pool of {@code poolSize} threads and asserts each reply.
+     */
+    private void doSendMessages(int messages, int poolSize) throws Exception {
+        result.expectedMessageCount(messages);
+        result.expectsNoDuplicates(body());
+
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        try {
+            List<Future<String>> futures = new ArrayList<Future<String>>();
+            for (int i = 0; i < messages; i++) {
+                final int index = i;
+                futures.add(executor.submit(new Callable<String>() {
+                    public String call() throws Exception {
+                        return template.requestBody("direct:start", "Message " + index, String.class);
+                    }
+                }));
+            }
+
+            assertMockEndpointsSatisfied();
+
+            for (int i = 0; i < futures.size(); i++) {
+                assertEquals("Bye Message " + i, futures.get(i).get());
+            }
+        } finally {
+            // stop the worker threads even when one of the assertions above fails
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("sjms:a?consumerCount=5&exchangePattern=InOut&namedReplyTo=myResponse")
+                    .to("mock:result");
+
+                from("sjms:a?consumerCount=5&exchangePattern=InOut&namedReplyTo=myResponse")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            String body = exchange.getIn().getBody(String.class);
+                            // sleep a little to simulate heavy work and force concurrency processing
+                            Thread.sleep(1000);
+                            exchange.getOut().setBody("Bye " + body);
+                            exchange.getOut().setHeader("threadName", Thread.currentThread().getName());
+                            System.out.println("Thread ID : " + Thread.currentThread().getName());
+                        }
+                    });
+            }
+        };
+    }
+}
+