You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/03/31 08:21:36 UTC

[2/8] git commit: Add unit test to support scenario concurrentConsumers

Add unit test to support scenario concurrentConsumers


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/85833902
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/85833902
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/85833902

Branch: refs/heads/camel-2.13.x
Commit: 85833902ef13959ea07345f3b92696c1593c521e
Parents: 6919c57
Author: Charles Moulliard <ch...@gmail.com>
Authored: Mon Mar 17 11:21:08 2014 +0100
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Mar 31 14:19:08 2014 +0800

----------------------------------------------------------------------
 .../component/sjms/CamelJmsTestHelper.java      | 83 ++++++++++++++++++++
 .../consumer/InOutConcurrentConsumerTest.java   | 80 +++++++++++++++++++
 2 files changed, 163 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/85833902/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
new file mode 100644
index 0000000..00e3555
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/CamelJmsTestHelper.java
@@ -0,0 +1,83 @@
+package org.apache.camel.component.sjms;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.camel.util.FileUtil;
+
+import javax.jms.ConnectionFactory;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A helper for unit testing with Apache ActiveMQ as embedded JMS broker.
+ *
+ * @version
+ */
+public final class CamelJmsTestHelper {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+
+    private CamelJmsTestHelper() {
+    }
+
+    public static ConnectionFactory createConnectionFactory() {
+        return createConnectionFactory(null);
+    }
+
+    public static ConnectionFactory createConnectionFactory(String options) {
+        // using a unique broker name improves testing when running the entire test suite in the same JVM
+        int id = counter.incrementAndGet();
+        String url = "vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false";
+        if (options != null) {
+            url = url + "&" + options;
+        }
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+        // optimize AMQ to be as fast as possible so unit testing is quicker
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setOptimizeAcknowledge(true);
+        connectionFactory.setOptimizedMessageDispatch(true);
+
+        // When using asyncSend, producers will not be guaranteed to send in the order we
+        // have in the tests (which may be confusing for queues) so we need this set to false.
+        // Another way of guaranteeing order is to use persistent messages or transactions.
+        connectionFactory.setUseAsyncSend(false);
+
+        connectionFactory.setAlwaysSessionAsync(false);
+        // use a pooled connection factory
+        PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory);
+        pooled.setMaxConnections(8);
+        return pooled;
+    }
+
+    public static ConnectionFactory createPersistentConnectionFactory() {
+        return createPersistentConnectionFactory(null);
+    }
+
+    public static ConnectionFactory createPersistentConnectionFactory(String options) {
+        // using a unique broker name improves testing when running the entire test suite in the same JVM
+        int id = counter.incrementAndGet();
+
+        // use an unique data directory in target
+        String dir = "target/activemq-data-" + id;
+
+        // remove dir so its empty on startup
+        FileUtil.removeDir(new File(dir));
+
+        String url = "vm://test-broker-" + id + "?broker.persistent=true&broker.useJmx=false&broker.dataDirectory=" + dir;
+        if (options != null) {
+            url = url + "&" + options;
+        }
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+        // optimize AMQ to be as fast as possible so unit testing is quicker
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setOptimizeAcknowledge(true);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setUseAsyncSend(true);
+        connectionFactory.setAlwaysSessionAsync(false);
+
+        // use a pooled connection factory
+        PooledConnectionFactory pooled = new PooledConnectionFactory(connectionFactory);
+        pooled.setMaxConnections(8);
+        return pooled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/85833902/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
new file mode 100644
index 0000000..fc18efc
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConcurrentConsumerTest.java
@@ -0,0 +1,80 @@
+package org.apache.camel.component.sjms.consumer;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Concurrent consumer with JMSReply test.
+ */
+public class InOutConcurrentConsumerTest extends JmsTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    MockEndpoint result;
+
+    @Test
+    public void testConcurrent() throws Exception {
+        doSendMessages(10, 5);
+    }
+
+    private void doSendMessages(int messages, int poolSize) throws Exception {
+
+        result.expectedMessageCount(messages);
+        result.expectsNoDuplicates(body());
+
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        final List<Future<String>> futures = new ArrayList<Future<String>>();
+        for (int i = 0; i < messages; i++) {
+            final int index = i;
+            Future<String> out = executor.submit(new Callable<String>() {
+                public String call() throws Exception {
+                    return template.requestBody("direct:start", "Message " + index, String.class);
+                }
+            });
+            futures.add(out);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        for (int i = 0; i < futures.size(); i++) {
+            Object out = futures.get(i).get();
+            assertEquals("Bye Message " + i, out);
+        }
+        executor.shutdownNow();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        return camelContext;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("sjms:a?consumerCount=5&exchangePattern=InOut&namedReplyTo=myResponse")
+                    .to("mock:result");
+
+                from("sjms:a?consumerCount=5&exchangePattern=InOut&namedReplyTo=myResponse")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            String body = exchange.getIn().getBody(String.class);
+                            // sleep a little to simulate heavy work and force concurrency processing
+                            Thread.sleep(1000);
+                            exchange.getOut().setBody("Bye " + body);
+                            exchange.getOut().setHeader("threadName", Thread.currentThread().getName());
+                            System.out.println("Thread ID : " + Thread.currentThread().getName());
+                        }
+                    });
+            }
+        };
+    }
+
+}
+