You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/17 15:22:00 UTC

[1/3] camel git commit: Refactored logic around parsing destination names from the URI. Rejecting topics from batch consumption, as does not make sense conceptually.

Repository: camel
Updated Branches:
  refs/heads/master 65f9a3ab3 -> 832a99c54


Refactored logic around parsing destination names from the URI. Rejecting topics from batch consumption, as does not make sense conceptually.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2ba152d3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2ba152d3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2ba152d3

Branch: refs/heads/master
Commit: 2ba152d387d23d2f0f4dc2297984d4173b147a5e
Parents: ab1d1dd
Author: jkorab <ja...@gmail.com>
Authored: Fri Jul 17 11:42:47 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsComponent.java     |  2 +-
 .../camel/component/sjms/SjmsEndpoint.java      | 22 ++------
 .../component/sjms/batch/SjmsBatchConsumer.java | 15 +++---
 .../component/sjms/batch/SjmsBatchEndpoint.java | 19 +++++--
 .../sjms/jms/DestinationNameParser.java         | 36 +++++++++++++
 .../sjms/batch/SjmsBatchConsumerTest.java       | 49 ++++++++++++++---
 .../sjms/batch/SjmsBatchEndpointTest.java       | 23 ++++++--
 .../sjms/jms/DestinationNameParserTest.java     | 56 ++++++++++++++++++++
 .../sjms/producer/QueueProducerQoSTest.java     | 23 ++++----
 9 files changed, 192 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index f2eebc6..2503162 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -61,7 +61,7 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         validateMepAndReplyTo(parameters);
         uri = normalizeUri(uri);
-        SjmsEndpoint endpoint = new SjmsEndpoint(uri, this);
+        SjmsEndpoint endpoint = new SjmsEndpoint(uri, this, remaining);
         setProperties(endpoint, parameters);
         if (endpoint.isTransacted()) {
             endpoint.setSynchronous(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 67774d4..0e8d68a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -22,11 +22,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.sjms.jms.ConnectionResource;
-import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
-import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+import org.apache.camel.component.sjms.jms.*;
 import org.apache.camel.component.sjms.producer.InOnlyProducer;
 import org.apache.camel.component.sjms.producer.InOutProducer;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -95,19 +91,11 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     public SjmsEndpoint() {
     }
 
-    public SjmsEndpoint(String uri, Component component) {
+    public SjmsEndpoint(String uri, Component component, String remaining) {
         super(uri, component);
-        if (getEndpointUri().contains("://queue:")) {
-            topic = false;
-        } else if (getEndpointUri().contains("://topic:")) {
-            topic = true;
-        } else {
-            throw new IllegalArgumentException("Endpoint URI unsupported: " + uri);
-        }
-        destinationName = getEndpointUri().substring(getEndpointUri().lastIndexOf(":") + 1);
-        if (destinationName.contains("?")) {
-            destinationName = destinationName.substring(0, destinationName.lastIndexOf("?"));
-        }
+        DestinationNameParser parser = new DestinationNameParser();
+        topic = parser.isTopic(remaining);
+        this.destinationName = parser.getShortName(remaining);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 613d471..ca47c7c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,13 +16,11 @@
  */
 package org.apache.camel.component.sjms.batch;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +88,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     }
 
     @Override
-    public Endpoint getEndpoint() {
+    public SjmsBatchEndpoint getEndpoint() {
         return sjmsBatchEndpoint;
     }
 
@@ -164,10 +162,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 // a batch corresponds to a single session that will be committed or rolled back by a background thread
                 final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                 try {
-                    // destinationName only creates queues; there is no additional value to be gained
-                    // by transactionally consuming topic messages in batches
+                    // only batch consumption from queues is supported - it makes no sense to transactionally consume
+                    // from a topic as you don't car about message loss, users can just use a regular aggregator instead
                     Queue queue = session.createQueue(destinationName);
                     MessageConsumer consumer = session.createConsumer(queue);
+
                     try {
                         consumeBatchesOnLoop(session, consumer);
                     } finally {
@@ -289,11 +288,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             int id = batchCount.getAndIncrement();
             int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize));
+                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + messagesReceived.addAndGet(batchSize));
             }
 
-            Synchronization committing = new SessionCompletion(session);
-            exchange.addOnCompletion(committing);
+            SessionCompletion sessionCompletion = new SessionCompletion(session);
+            exchange.addOnCompletion(sessionCompletion);
             try {
                 processor.process(exchange);
                 LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index afd5cbe..5d307a7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -20,6 +20,7 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.Metadata;
@@ -30,7 +31,10 @@ import org.apache.camel.spi.UriPath;
 /**
  * @author jkorab
  */
-@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging")
+@UriEndpoint(scheme = "sjmsBatch",
+        title = "Simple JMS Batch Component",
+        syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy",
+        consumerClass = SjmsBatchComponent.class, label = "messaging")
 public class SjmsBatchEndpoint extends DefaultEndpoint {
 
     public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
@@ -38,7 +42,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
 
     @UriPath
-    @Metadata(required = "true")
+    @Metadata(required = "true",
+            description = "The destination name. Only queues are supported, names may be prefixed by 'queue:'.")
     private String destinationName;
 
     @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
@@ -61,11 +66,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
     private AggregationStrategy aggregationStrategy;
 
+    private boolean topic;
+
     public SjmsBatchEndpoint() {}
 
     public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
         super(endpointUri, component);
-        this.destinationName = remaining;
+        DestinationNameParser parser = new DestinationNameParser();
+        if (parser.isTopic(remaining)) {
+            throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " +
+                    "should use a regular JMS consumer with an aggregator.");
+        }
+        this.destinationName = parser.getShortName(remaining);
     }
 
     @Override
@@ -130,4 +142,5 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     public void setPollDuration(Integer pollDuration) {
         this.pollDuration = pollDuration;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
new file mode 100644
index 0000000..d248350
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * @author jkorab
+ */
+public class DestinationNameParser {
+    public boolean isTopic(String destinationName) {
+        if (destinationName == null) {
+            throw new IllegalArgumentException("destinationName is null");
+        }
+        return destinationName.startsWith("topic:");
+    }
+
+    public String getShortName(String destinationName) {
+        if (destinationName == null) {
+            throw new IllegalArgumentException("destinationName is null");
+        }
+        return destinationName.substring(destinationName.lastIndexOf(":") + 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 58fc717..642a38f 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -17,9 +17,11 @@
 package org.apache.camel.component.sjms.batch;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
@@ -60,7 +62,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
         CamelContext context = new DefaultCamelContext(registry);
         context.addComponent("sjms", sjmsComponent);
-        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        context.addComponent("sjms-batch", sjmsBatchComponent);
         return context;
     }
 
@@ -100,7 +102,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
                 int completionTimeout = 1000;
                 int completionSize = 200;
 
-                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" +
                         "&consumerCount=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, completionSize, consumerCount)
                         .routeId("batchConsumer").startupOrder(10).autoStartup(false)
@@ -138,7 +140,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         context.addRoutes(new TransactedSendHarness(queueName));
         context.addRoutes(new RouteBuilder() {
             public void configure() throws Exception {
-                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
                         .log(LoggingLevel.DEBUG, "${body.size}")
                         .to("mock:batches");
@@ -163,7 +165,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         context.addRoutes(new TransactedSendHarness(queueName));
         context.addRoutes(new RouteBuilder() {
             public void configure() throws Exception {
-                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
                         .to("mock:batches");
             }
@@ -199,11 +201,11 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
                         .toF("sjms:%s", queueName + "B")
                     .end();
 
-                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
                         .to("mock:outA");
 
-                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB")
                         .to("mock:outB");
 
@@ -226,6 +228,39 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         assertFirstMessageBodyOfLength(mockOutB, messageCount);
     }
 
+    @Test
+    public void testConsumption_rollback() throws Exception {
+        final int completionTimeout = 2000;
+        final int completionSize = 5;
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 5;
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        // the first time around, the batch should throw an exception
+        mockBatches.whenExchangeReceived(1, new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                throw new RuntimeException("Boom!");
+            }
+        });
+        // so the batch should be processed twice due to redelivery
+        mockBatches.expectedMessageCount(2);
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        mockBatches.assertIsSatisfied();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());
@@ -233,7 +268,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
     private String getQueueName() {
         SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
-        return "sjmsbatch-" + sdf.format(new Date());
+        return "sjms-batch-" + sdf.format(new Date());
     }
 
     private String[] generateStrings(int messageCount) {

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
index 7a75e76..c97d0dd 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.sjms.batch;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.FailedToCreateRouteException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.sjms.SjmsComponent;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -68,7 +70,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
         sjmsBatchComponent.setConnectionFactory(connectionFactory);
 
         CamelContext context = new DefaultCamelContext(registry);
-        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        context.addComponent("sjms-batch", sjmsBatchComponent);
         context.addComponent("sjms", sjmsComponent);
 
         return context;
@@ -79,11 +81,11 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
         return true;
     }
 
-    @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+    @Test(expected = FailedToCreateProducerException.class)
     public void testProducerFailure() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() throws Exception {
-                from("direct:in").to("sjmsbatch:testQueue");
+                from("direct:in").to("sjms-batch:testQueue");
             }
         });
         context.start();
@@ -94,7 +96,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
+                from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
                     .to("mock:out");
             }
         });
@@ -106,11 +108,22 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
+                from("sjms-batch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
                         .to("mock:out");
             }
         });
         context.start();
     }
 
+    @Test(expected = FailedToCreateRouteException.class)
+    public void testConsumer_topic() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjms-batch:topic:in?aggregationStrategy=#aggStrategy")
+                        .to("mock:out");
+            }
+        });
+        context.start();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
new file mode 100644
index 0000000..3fb4968
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author jkorab
+ */
+public class DestinationNameParserTest {
+
+    @Test
+    public void testIsTopic() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        assertTrue(parser.isTopic("topic:foo"));
+        assertFalse(parser.isTopic("queue:bar"));
+        assertFalse(parser.isTopic("bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIsTopic_nullDestinationName() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        parser.isTopic(null);
+    }
+
+    @Test
+    public void testGetShortName() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        assertEquals("foo", parser.getShortName("topic:foo"));
+        assertFalse("bar", parser.isTopic("queue:bar"));
+        assertFalse("bar", parser.isTopic("bar"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetShortName_nullDestinationName() throws Exception {
+        DestinationNameParser parser = new DestinationNameParser();
+        parser.getShortName(null);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2ba152d3/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
index c82842a..beef0a9 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.component.sjms.producer;
 
-import java.util.concurrent.TimeUnit;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.support.JmsTestSupport;
 import org.junit.Test;
 
@@ -33,12 +33,14 @@ public class QueueProducerQoSTest extends JmsTestSupport {
     private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout";
 
     private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute";
+    public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
+
+    @EndpointInject(uri = MOCK_EXPIRED_ADVISORY)
+    MockEndpoint mockExpiredAdvisory;
 
     @Test
     public void testInOutQueueProducerTTL() throws Exception {
-
-        NotifyBuilder expireMatcher = new NotifyBuilder(context)
-                .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create();
+        mockExpiredAdvisory.expectedMessageCount(1);
 
         String endpoint = String.format("sjms:queue:%s?ttl=1000&exchangePattern=InOut&responseTimeOut=500", TEST_INOUT_DESTINATION_NAME);
 
@@ -51,8 +53,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
             // we're just interested in the message becoming expired
         }
 
-        // we should delay a bit so broker can run its expiration processes...
-        assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS));
+        assertMockEndpointsSatisfied();
 
         DestinationViewMBean queue = getQueueMBean(TEST_INOUT_DESTINATION_NAME);
         assertEquals("There were unexpected messages left in the queue: " + TEST_INOUT_DESTINATION_NAME,
@@ -61,14 +62,12 @@ public class QueueProducerQoSTest extends JmsTestSupport {
 
     @Test
     public void testInOnlyQueueProducerTTL() throws Exception {
-        NotifyBuilder expireMatcher = new NotifyBuilder(context)
-                .fromRoute(EXPIRED_MESSAGE_ROUTE_ID).whenCompleted(1).create();
+        mockExpiredAdvisory.expectedMessageCount(1);
 
         String endpoint = String.format("sjms:queue:%s?ttl=1000", TEST_INONLY_DESTINATION_NAME);
         template.sendBody(endpoint, "test message");
 
-        // we should delay a bit so broker can run its expiration processes...
-        assertFalse(expireMatcher.matches(2, TimeUnit.SECONDS));
+        assertMockEndpointsSatisfied();
 
         DestinationViewMBean queue = getQueueMBean(TEST_INONLY_DESTINATION_NAME);
         assertEquals("There were unexpected messages left in the queue: " + TEST_INONLY_DESTINATION_NAME,
@@ -102,7 +101,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
             public void configure() throws Exception {
                 from("sjms:topic:ActiveMQ.Advisory.Expired.Queue.>")
                         .routeId(EXPIRED_MESSAGE_ROUTE_ID)
-                        .to("mock:expiredAdvisory");
+                        .to(MOCK_EXPIRED_ADVISORY);
             }
         };
     }


[2/3] camel git commit: Merged in sjms-batch component.

Posted by da...@apache.org.
Merged in sjms-batch component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab1d1dd7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab1d1dd7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab1d1dd7

Branch: refs/heads/master
Commit: ab1d1dd78fe53edb50c4ede447e4ac5a55ee2ac9
Parents: 65f9a3a
Author: jkorab <ja...@gmail.com>
Authored: Thu Jul 16 11:32:29 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200

----------------------------------------------------------------------
 components/camel-sjms/pom.xml                   |   6 +-
 .../component/sjms/batch/SessionCompletion.java |  61 ++++
 .../sjms/batch/SjmsBatchComponent.java          |  53 ++++
 .../component/sjms/batch/SjmsBatchConsumer.java | 306 +++++++++++++++++++
 .../component/sjms/batch/SjmsBatchEndpoint.java | 133 ++++++++
 .../sjms/consumer/AbstractMessageHandler.java   |   3 +-
 .../component/sjms/jms/JmsMessageHelper.java    |  21 +-
 .../org/apache/camel/component/sjms-batch       |  18 ++
 .../sjms/batch/EmbeddedActiveMQBroker.java      |  74 +++++
 .../sjms/batch/ListAggregationStrategy.java     |  43 +++
 .../sjms/batch/SjmsBatchConsumerTest.java       | 247 +++++++++++++++
 .../sjms/batch/SjmsBatchEndpointTest.java       | 116 +++++++
 12 files changed, 1075 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 9778517..6977796 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -32,7 +32,8 @@
     <properties>
         <camel.osgi.export.pkg>
             org.apache.camel.component.sjms,
-            org.apache.camel.component.sjms.jms
+            org.apache.camel.component.sjms.jms,
+            org.apache.camel.component.sjms.batch
         </camel.osgi.export.pkg>
         <camel.osgi.private.pkg>
             org.apache.camel.component.sjms.consumer,
@@ -41,7 +42,8 @@
             org.apache.camel.component.sjms.tx
         </camel.osgi.private.pkg>
         <camel.osgi.export.service>
-            org.apache.camel.spi.ComponentResolver;component=sjms
+            org.apache.camel.spi.ComponentResolver;component=sjms,
+            org.apache.camel.spi.ComponentResolver;component=sjms-batch
         </camel.osgi.export.service>
     </properties>
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
new file mode 100644
index 0000000..27f06e6
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * @author jkorab
+ */
+class SessionCompletion implements Synchronization {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private final Session session;
+
+    public SessionCompletion(Session session) {
+        assert (session != null);
+        this.session = session;
+    }
+
+    @Override
+    public void onComplete(Exchange exchange) {
+        try {
+            log.debug("Committing");
+            session.commit();
+        } catch (JMSException ex) {
+            log.error("Exception caught while committing: {}", ex.getMessage());
+            exchange.setException(ex);
+        }
+    }
+
+    @Override
+    public void onFailure(Exchange exchange) {
+        try {
+            log.debug("Rolling back");
+            session.rollback();
+        } catch (JMSException ex) {
+            log.error("Exception caught while rolling back: {}", ex.getMessage());
+            exchange.setException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
new file mode 100644
index 0000000..04b875d
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.ObjectHelper;
+
+import javax.jms.ConnectionFactory;
+import java.util.Map;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchComponent extends UriEndpointComponent {
+
+    private ConnectionFactory connectionFactory;
+
+    public SjmsBatchComponent() {
+        super(SjmsBatchEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ObjectHelper.notNull(connectionFactory, "connectionFactory is null");
+        SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
+        setProperties(sjmsBatchEndpoint, parameters);
+        return sjmsBatchEndpoint;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
new file mode 100644
index 0000000..613d471
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumer extends DefaultConsumer {
+    private static final boolean TRANSACTED = true;
+    private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+
+    private final SjmsBatchEndpoint sjmsBatchEndpoint;
+    private final AggregationStrategy aggregationStrategy;
+
+    private final int completionSize;
+    private final int completionTimeout;
+    private final int consumerCount;
+    private final int pollDuration;
+
+    private final ConnectionFactory connectionFactory;
+    private final String destinationName;
+    private final Processor processor;
+
+    private static AtomicInteger batchCount = new AtomicInteger();
+    private static AtomicLong messagesReceived = new AtomicLong();
+    private static AtomicLong messagesProcessed = new AtomicLong();
+    private ExecutorService jmsConsumerExecutors;
+
+    public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
+        super(sjmsBatchEndpoint, processor);
+
+        this.sjmsBatchEndpoint = ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint");
+        this.processor = ObjectHelper.notNull(processor, "processor");
+
+        destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
+
+        completionSize = sjmsBatchEndpoint.getCompletionSize();
+        completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
+        pollDuration = sjmsBatchEndpoint.getPollDuration();
+        if (pollDuration < 0) {
+            throw new IllegalArgumentException("pollDuration must be 0 or greater");
+        }
+
+        this.aggregationStrategy = ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy");
+
+        consumerCount = sjmsBatchEndpoint.getConsumerCount();
+        if (consumerCount <= 0) {
+            throw new IllegalArgumentException("consumerCount must be greater than 0");
+        }
+
+        SjmsBatchComponent sjmsBatchComponent = (SjmsBatchComponent) sjmsBatchEndpoint.getComponent();
+        connectionFactory = ObjectHelper.notNull(sjmsBatchComponent.getConnectionFactory(), "jmsBatchComponent.connectionFactory");
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        return sjmsBatchEndpoint;
+    }
+
+    private final AtomicBoolean running = new AtomicBoolean(true);
+    private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
+
+    private Connection connection;
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // start up a shared connection
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+        } catch (JMSException ex) {
+            LOG.error("Exception caught closing connection: {}", getStackTrace(ex));
+            return;
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Starting " + consumerCount + " consumer(s) for " + destinationName + ":" + completionSize);
+        }
+        consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
+
+        jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+        for (int i = 0; i < consumerCount; i++) {
+            jmsConsumerExecutors.execute(new BatchConsumptionLoop());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        running.set(false);
+        CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+        if (consumersShutdownLatch != null) {
+            LOG.info("Stop signalled, waiting on consumers to shut down");
+            if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) {
+                LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
+            } else {
+                LOG.info("All consumers have shut down");
+            }
+        } else {
+            LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
+        }
+
+        try {
+            LOG.debug("Shutting down JMS connection");
+            connection.close();
+        } catch (JMSException jex) {
+            LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
+        }
+
+        getEndpoint().getCamelContext().getExecutorServiceManager()
+                .shutdown(jmsConsumerExecutors);
+    }
+
+    private String getStackTrace(Exception ex) {
+        StringWriter writer = new StringWriter();
+        ex.printStackTrace(new PrintWriter(writer));
+        return writer.toString();
+    }
+
+    private class BatchConsumptionLoop implements Runnable {
+        @Override
+        public void run() {
+            try {
+                // a batch corresponds to a single session that will be committed or rolled back by a background thread
+                final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
+                try {
+                    // destinationName only creates queues; there is no additional value to be gained
+                    // by transactionally consuming topic messages in batches
+                    Queue queue = session.createQueue(destinationName);
+                    MessageConsumer consumer = session.createConsumer(queue);
+                    try {
+                        consumeBatchesOnLoop(session, consumer);
+                    } finally {
+                        try {
+                            consumer.close();
+                        } catch (JMSException ex2) {
+                            log.error("Exception caught closing consumer: {}", ex2.getMessage());
+
+                        }
+                    }
+                } finally {
+                    try {
+                        session.close();
+                    } catch (JMSException ex1) {
+                        log.error("Exception caught closing session: {}", ex1.getMessage());
+                    }
+                }
+            } catch (JMSException ex) {
+                // from loop
+                LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex));
+            } finally {
+                // indicate that we have shut down
+                CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+                consumersShutdownLatch.countDown();
+            }
+        }
+
+        private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
+            final boolean usingTimeout = completionTimeout > 0;
+
+            batchConsumption:
+            while (running.get()) {
+                int messageCount = 0;
+
+                // reset the clock counters
+                long timeElapsed = 0;
+                long startTime = 0;
+                Exchange aggregatedExchange = null;
+
+                batch:
+                while ((completionSize <= 0) || (messageCount < completionSize)) {
+                    // check periodically to see whether we should be shutting down
+                    long waitTime = (usingTimeout && (timeElapsed > 0))
+                            ? getReceiveWaitTime(timeElapsed)
+                            : pollDuration;
+                    Message message = consumer.receive(waitTime);
+
+                    if (running.get()) { // no interruptions received
+                        if (message == null) {
+                            // timed out, no message received
+                            LOG.trace("No message received");
+                        } else {
+                            if ((usingTimeout) && (messageCount == 0)) { // this is the first message
+                                startTime = new Date().getTime(); // start counting down the period for this batch
+                            }
+                            messageCount++;
+                            LOG.debug("Message received: {}", messageCount);
+                            if ((message instanceof ObjectMessage)
+                                    || (message instanceof TextMessage)) {
+                                Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint());
+                                aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
+                                aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
+                            } else {
+                                throw new IllegalArgumentException("Unexpected message type: "
+                                        + message.getClass().toString());
+                            }
+                        }
+
+                        if ((usingTimeout) && (startTime > 0)) {
+                            // a batch has been started, check whether it should be timed out
+                            long currentTime = new Date().getTime();
+                            timeElapsed = currentTime - startTime;
+
+                            if (timeElapsed > completionTimeout) {
+                                // batch finished by timeout
+                                break batch;
+                            }
+                        }
+
+                    } else {
+                        LOG.info("Shutdown signal received - rolling batch back");
+                        session.rollback();
+                        break batchConsumption;
+                    }
+                } // batch
+                assert (aggregatedExchange != null);
+                process(aggregatedExchange, session);
+            }
+        }
+
+        /**
+         * Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch.
+         * @param timeElapsed The time that has elapsed.
+         * @return The shorter of the time remaining or poll duration.
+         */
+        private long getReceiveWaitTime(long timeElapsed) {
+            long timeRemaining = getTimeRemaining(timeElapsed);
+
+            // wait for the shorter of the time remaining or the poll duration
+            if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely
+                timeRemaining = 1;
+            }
+            final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining;
+
+            LOG.debug("waiting for {}", waitTime);
+            return waitTime;
+        }
+
+        private long getTimeRemaining(long timeElapsed) {
+            long timeRemaining = completionTimeout - timeElapsed;
+            if (LOG.isDebugEnabled() && (timeElapsed > 0)) {
+                LOG.debug("Time remaining this batch: {}", timeRemaining);
+            }
+            return timeRemaining;
+        }
+
+        private void process(Exchange exchange, Session session) {
+            assert (exchange != null);
+            int id = batchCount.getAndIncrement();
+            int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing batch:size={}:total={}", batchSize, messagesReceived.addAndGet(batchSize));
+            }
+
+            Synchronization committing = new SessionCompletion(session);
+            exchange.addOnCompletion(committing);
+            try {
+                processor.process(exchange);
+                LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
+            } catch (Exception e) {
+                LOG.error("Error processing exchange: {}", e.getMessage());
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
new file mode 100644
index 0000000..afd5cbe
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+/**
+ * @author jkorab
+ */
+@UriEndpoint(scheme = "sjmsBatch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchComponent.class, label = "messaging")
+public class SjmsBatchEndpoint extends DefaultEndpoint {
+
+    public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
+    public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
+    public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
+
+    @UriPath
+    @Metadata(required = "true")
+    private String destinationName;
+
+    @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
+    private Integer consumerCount = 1;
+
+    @UriParam(label = "consumer", defaultValue = "200",
+            description = "The number of messages consumed at which the batch will be completed")
+    private Integer completionSize = DEFAULT_COMPLETION_SIZE;
+
+    @UriParam(label = "consumer", defaultValue = "500",
+            description = "The timeout from receipt of the first first message when the batch will be completed")
+    private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
+
+    @UriParam(label = "consumer", defaultValue = "1000",
+            description = "The duration in milliseconds of each poll for messages. " +
+                    "completionTimeOut will be used if it is shorter and a batch has started.")
+    private Integer pollDuration = 1000;
+
+    @Metadata(required = "true")
+    @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
+    private AggregationStrategy aggregationStrategy;
+
+    public SjmsBatchEndpoint() {}
+
+    public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
+        super(endpointUri, component);
+        this.destinationName = remaining;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new SjmsBatchConsumer(this, processor);
+    }
+
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
+    }
+
+    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
+    public Integer getCompletionSize() {
+        return completionSize;
+    }
+
+    public void setCompletionSize(Integer completionSize) {
+        this.completionSize = completionSize;
+    }
+
+    public Integer getCompletionTimeout() {
+        return completionTimeout;
+    }
+
+    public void setCompletionTimeout(Integer completionTimeout) {
+        this.completionTimeout = completionTimeout;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public Integer getConsumerCount() {
+        return consumerCount;
+    }
+
+    public void setConsumerCount(Integer consumerCount) {
+        this.consumerCount = consumerCount;
+    }
+
+    public Integer getPollDuration() {
+        return pollDuration;
+    }
+
+    public void setPollDuration(Integer pollDuration) {
+        this.pollDuration = pollDuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index c3c5e55..1598a43 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -70,7 +70,8 @@ public abstract class AbstractMessageHandler implements MessageListener {
     public void onMessage(Message message) {
         RuntimeCamelException rce = null;
         try {
-            final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, getEndpoint());
+            SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint();
+            final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy());
 
             log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index dcccd9b..79787c9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -62,8 +62,23 @@ public final class JmsMessageHelper implements JmsConstants {
     }
 
     public static Exchange createExchange(Message message, Endpoint endpoint) {
+        return createExchange(message, endpoint, null);
+    }
+
+    /**
+     * Creates an Exchange from a JMS Message.
+     * @param message The JMS message.
+     * @param endpoint The Endpoint to use to create the Exchange object.
+     * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
+     *                          format keys in a JMS 1.1 compliant manner. If null the
+     *                          {@link DefaultJmsKeyFormatStrategy} will be used.
+     * @return Populated Exchange.
+     */
+    public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) {
         Exchange exchange = endpoint.createExchange();
-        return populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy());
+        KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null)
+                ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy;
+        return populateExchange(message, exchange, false, initialisedKeyFormatStrategy);
     }
 
     @SuppressWarnings("unchecked")
@@ -222,11 +237,11 @@ public final class JmsMessageHelper implements JmsConstants {
      * @param jmsMessage        the {@link Message} to add or update the headers on
      * @param messageHeaders    a {@link Map} of String/Object pairs
      * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
-     *                          format keys in a JMS 1.1 compliant manner. If null the
-     *                          {@link DefaultJmsKeyFormatStrategy} will be used.
+     *                          format keys in a JMS 1.1 compliant manner.
      * @return {@link Message}
      */
     private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
+
         Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
         for (final Map.Entry<String, Object> entry : headers.entrySet()) {
             String headerName = entry.getKey();

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
new file mode 100644
index 0000000..9ee9e4c
--- /dev/null
+++ b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.sjms.batch.SjmsBatchComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
new file mode 100644
index 0000000..fd1ed27
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JUnit Test aspect that creates an embedded ActiveMQ broker at the beginning of each test and shuts it down after.
+ */
+public class EmbeddedActiveMQBroker extends ExternalResource {
+
+    private final Logger log = LoggerFactory.getLogger(EmbeddedActiveMQBroker.class);
+    private final String brokerId;
+    private BrokerService brokerService;
+    private final String tcpConnectorUri;
+
+    public EmbeddedActiveMQBroker(String brokerId) {
+        if ((brokerId == null) || (brokerId.isEmpty())) {
+            throw new IllegalArgumentException("brokerId is empty");
+        }
+        this.brokerId = brokerId;
+        tcpConnectorUri = "tcp://localhost:" + AvailablePortFinder.getNextAvailable();
+
+        brokerService = new BrokerService();
+        brokerService.setBrokerId(brokerId);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        try {
+            brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
+            brokerService.addConnector(tcpConnectorUri);
+        } catch (Exception e) {
+            throw new RuntimeException("Problem creating brokerService", e);
+        }
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        log.info("Starting embedded broker[{}] on {}", brokerId, tcpConnectorUri);
+        brokerService.start();
+    }
+
+    @Override
+    protected void after() {
+        try {
+            log.info("Stopping embedded broker[{}]", brokerId);
+            brokerService.stop();
+        } catch (Exception e) {
+            throw new RuntimeException("Exception shutting down broker service", e);
+        }
+    }
+
+    public String getTcpConnectorUri() {
+        return tcpConnectorUri;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
new file mode 100644
index 0000000..39774b2
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class ListAggregationStrategy implements AggregationStrategy {
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        String body = newExchange.getIn().getBody(String.class);
+        if (oldExchange == null) {
+            List<String> list = new ArrayList<String>();
+            list.add(body);
+            newExchange.getIn().setBody(list);
+            return newExchange;
+        }  else {
+            List<String> list = oldExchange.getIn().getBody(List.class);
+            list.add(body);
+            return oldExchange;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
new file mode 100644
index 0000000..58fc717
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.StopWatch;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumerTest extends CamelTestSupport {
+    private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
+
+    @Rule
+    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost");
+
+    @Override
+    public CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("testStrategy", new ListAggregationStrategy());
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+
+        SjmsComponent sjmsComponent = new SjmsComponent();
+        sjmsComponent.setConnectionFactory(connectionFactory);
+
+        SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+        sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+        CamelContext context = new DefaultCamelContext(registry);
+        context.addComponent("sjms", sjmsComponent);
+        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        return context;
+    }
+
+    private static class TransactedSendHarness extends RouteBuilder {
+        private final String queueName;
+
+        public TransactedSendHarness(String queueName) {
+            this.queueName = queueName;
+        }
+
+        @Override
+        public void configure() throws Exception {
+            from("direct:in").routeId("harness").startupOrder(20)
+                .split(body())
+                    .toF("sjms:queue:%s?transacted=true", queueName)
+                    .to("mock:before")
+                .end();
+        }
+    }
+
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    @Test
+    public void testConsumption() throws Exception {
+
+        final int messageCount = 10000;
+        final int consumerCount = 5;
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                int completionTimeout = 1000;
+                int completionSize = 200;
+
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+                        "&consumerCount=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize, consumerCount)
+                        .routeId("batchConsumer").startupOrder(10).autoStartup(false)
+                    .split(body())
+                    .to("mock:split");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBefore = getMockEndpoint("mock:before");
+        mockBefore.setExpectedMessageCount(messageCount);
+
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(messageCount);
+
+        LOG.info("Sending messages");
+        template.sendBody("direct:in", generateStrings(messageCount));
+        LOG.info("Send complete");
+
+        StopWatch stopWatch = new StopWatch();
+        context.startRoute("batchConsumer");
+        assertMockEndpointsSatisfied();
+        long time = stopWatch.stop();
+
+        LOG.info("Processed {} messages in {} ms", messageCount, time);
+        LOG.info("Average throughput {} msg/s", (long) (messageCount / (time / 1000d)));
+    }
+
+    @Test
+    public void testConsumption_completionSize() throws Exception {
+        final int completionSize = 5;
+        final int completionTimeout = -1; // size-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 100;
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(messageCount / completionSize);
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        mockBatches.assertIsSatisfied();
+    }
+
+    @Test
+    public void testConsumption_completionTimeout() throws Exception {
+        final int completionTimeout = 2000;
+        final int completionSize = -1; // timeout-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 50;
+        assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(1);  // everything batched together
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        mockBatches.assertIsSatisfied();
+        assertFirstMessageBodyOfLength(mockBatches, messageCount);
+    }
+
+    /**
+     * Checks whether multiple consumer endpoints can operate in parallel.
+     */
+    @Test
+    public void testConsumption_multipleConsumerEndpoints() throws Exception {
+        final int completionTimeout = 2000;
+        final int completionSize = 5;
+
+        final String queueName = getQueueName();
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                from("direct:in")
+                    .split().body()
+                    .multicast()
+                        .toF("sjms:%s", queueName + "A")
+                        .toF("sjms:%s", queueName + "B")
+                    .end();
+
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
+                        .to("mock:outA");
+
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB")
+                        .to("mock:outB");
+
+            }
+        });
+        context.start();
+
+        int messageCount = 5;
+
+        assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+        MockEndpoint mockOutA = getMockEndpoint("mock:outA");
+        mockOutA.expectedMessageCount(1);  // everything batched together
+        MockEndpoint mockOutB = getMockEndpoint("mock:outB");
+        mockOutB.expectedMessageCount(1);  // everything batched together
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        assertMockEndpointsSatisfied();
+
+        assertFirstMessageBodyOfLength(mockOutA, messageCount);
+        assertFirstMessageBodyOfLength(mockOutB, messageCount);
+    }
+
+    private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
+        Exchange exchange = mockEndpoint.getExchanges().get(0);
+        assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());
+    }
+
+    private String getQueueName() {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
+        return "sjmsbatch-" + sdf.format(new Date());
+    }
+
+    private String[] generateStrings(int messageCount) {
+        String[] strings = new String[messageCount];
+        for (int i = 0; i < messageCount; i++) {
+            strings[i] = "message:" + i;
+        }
+        return strings;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
new file mode 100644
index 0000000..7a75e76
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.toolbox.AggregationStrategies;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchEndpointTest extends CamelTestSupport {
+
+    // Create one embedded broker instance for the entire test, as we aren't actually
+    // going to send any messages to it; we just need it so that the ConnectionFactory
+    // has something local to connect to.
+    public static EmbeddedActiveMQBroker broker;
+
+    @BeforeClass
+    public static void setupBroker() {
+        broker = new EmbeddedActiveMQBroker("localhost");
+        try {
+            broker.before();
+        } catch (Throwable t) {
+            throw new RuntimeException(t);
+        }
+    }
+
+    @AfterClass
+    public static void shutDownBroker() {
+        broker.after();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("aggStrategy", AggregationStrategies.groupedExchange());
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
+        connectionFactory.setBrokerURL(broker.getTcpConnectorUri());
+
+        SjmsComponent sjmsComponent = new SjmsComponent();
+        sjmsComponent.setConnectionFactory(connectionFactory);
+
+        SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+        sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+        CamelContext context = new DefaultCamelContext(registry);
+        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        context.addComponent("sjms", sjmsComponent);
+
+        return context;
+    }
+
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+    public void testProducerFailure() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:in").to("sjmsbatch:testQueue");
+            }
+        });
+        context.start();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumer_negativePollDuration() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjmsbatch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
+                    .to("mock:out");
+            }
+        });
+        context.start();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumer_negativeConsumerCount() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjmsbatch:in?aggregationStrategy=#aggStrategy&consumerCount=-1")
+                        .to("mock:out");
+            }
+        });
+        context.start();
+    }
+
+}


[3/3] camel git commit: Polished and fixed CS and a few other bits

Posted by da...@apache.org.
Polished and fixed CS and a few other bits


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/832a99c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/832a99c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/832a99c5

Branch: refs/heads/master
Commit: 832a99c546d4fca5510a88136de8fb2002c54d6f
Parents: 2ba152d
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Jul 17 15:25:15 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 15:25:15 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsEndpoint.java      |   7 +-
 .../component/sjms/batch/SessionCompletion.java |  22 ++--
 .../sjms/batch/SjmsBatchComponent.java          |  14 +--
 .../component/sjms/batch/SjmsBatchConsumer.java |  79 +++++++-------
 .../component/sjms/batch/SjmsBatchEndpoint.java | 104 +++++++++----------
 .../sjms/jms/DestinationNameParser.java         |   1 +
 .../sjms/batch/ListAggregationStrategy.java     |  12 +--
 .../sjms/batch/SjmsBatchConsumerTest.java       |  43 ++++----
 .../sjms/batch/SjmsBatchEndpointTest.java       |  11 +-
 .../sjms/jms/DestinationNameParserTest.java     |   4 +-
 .../sjms/producer/QueueProducerQoSTest.java     |   2 +-
 11 files changed, 145 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 0e8d68a..6ffa513 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -22,7 +22,12 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.sjms.jms.*;
+import org.apache.camel.component.sjms.jms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
+import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
+import org.apache.camel.component.sjms.jms.DestinationNameParser;
+import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
 import org.apache.camel.component.sjms.producer.InOnlyProducer;
 import org.apache.camel.component.sjms.producer.InOutProducer;
 import org.apache.camel.impl.DefaultEndpoint;

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
index 27f06e6..cae90cb 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -16,34 +16,32 @@
  */
 package org.apache.camel.component.sjms.batch;
 
+import javax.jms.JMSException;
+import javax.jms.Session;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.Synchronization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-/**
- * @author jkorab
- */
 class SessionCompletion implements Synchronization {
-    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOG = LoggerFactory.getLogger(SessionCompletion.class);
 
     private final Session session;
 
+    // TODO: add more details in the commit/rollback eg such as message id
+
     public SessionCompletion(Session session) {
-        assert (session != null);
         this.session = session;
     }
 
     @Override
     public void onComplete(Exchange exchange) {
         try {
-            log.debug("Committing");
+            LOG.debug("Committing");
             session.commit();
         } catch (JMSException ex) {
-            log.error("Exception caught while committing: {}", ex.getMessage());
+            LOG.error("Exception caught while committing: {}", ex.getMessage());
             exchange.setException(ex);
         }
     }
@@ -51,10 +49,10 @@ class SessionCompletion implements Synchronization {
     @Override
     public void onFailure(Exchange exchange) {
         try {
-            log.debug("Rolling back");
+            LOG.debug("Rolling back");
             session.rollback();
         } catch (JMSException ex) {
-            log.error("Exception caught while rolling back: {}", ex.getMessage());
+            LOG.error("Exception caught while rolling back: {}", ex.getMessage());
             exchange.setException(ex);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 04b875d..421fd8a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.component.sjms.batch;
 
+import java.util.Map;
+import javax.jms.ConnectionFactory;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.util.ObjectHelper;
 
-import javax.jms.ConnectionFactory;
-import java.util.Map;
-
-/**
- * @author jkorab
- */
 public class SjmsBatchComponent extends UriEndpointComponent {
 
     private ConnectionFactory connectionFactory;
@@ -36,7 +33,7 @@ public class SjmsBatchComponent extends UriEndpointComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        ObjectHelper.notNull(connectionFactory, "connectionFactory is null");
+        ObjectHelper.notNull(connectionFactory, "connectionFactory");
         SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
         setProperties(sjmsBatchEndpoint, parameters);
         return sjmsBatchEndpoint;
@@ -46,6 +43,9 @@ public class SjmsBatchComponent extends UriEndpointComponent {
         return connectionFactory;
     }
 
+    /**
+     * A ConnectionFactory is required to enable the SjmsBatchComponent.
+     */
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index ca47c7c..ee2b250 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,16 +16,6 @@
  */
 package org.apache.camel.component.sjms.batch;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Date;
@@ -36,30 +26,47 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/**
- * @author jkorab
- */
 public class SjmsBatchConsumer extends DefaultConsumer {
     private static final boolean TRANSACTED = true;
-    private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+
+    // global counters, maybe they should be on component instead?
+    private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+    private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong();
+    private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong();
 
     private final SjmsBatchEndpoint sjmsBatchEndpoint;
     private final AggregationStrategy aggregationStrategy;
-
     private final int completionSize;
     private final int completionTimeout;
     private final int consumerCount;
     private final int pollDuration;
-
     private final ConnectionFactory connectionFactory;
     private final String destinationName;
     private final Processor processor;
-
-    private static AtomicInteger batchCount = new AtomicInteger();
-    private static AtomicLong messagesReceived = new AtomicLong();
-    private static AtomicLong messagesProcessed = new AtomicLong();
     private ExecutorService jmsConsumerExecutors;
+    private final AtomicBoolean running = new AtomicBoolean(true);
+    private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
+    private Connection connection;
 
     public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
         super(sjmsBatchEndpoint, processor);
@@ -92,11 +99,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         return sjmsBatchEndpoint;
     }
 
-    private final AtomicBoolean running = new AtomicBoolean(true);
-    private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
-
-    private Connection connection;
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -145,8 +147,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
         }
 
-        getEndpoint().getCamelContext().getExecutorServiceManager()
-                .shutdown(jmsConsumerExecutors);
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(jmsConsumerExecutors);
     }
 
     private String getStackTrace(Exception ex) {
@@ -174,7 +175,6 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                             consumer.close();
                         } catch (JMSException ex2) {
                             log.error("Exception caught closing consumer: {}", ex2.getMessage());
-
                         }
                     }
                 } finally {
@@ -197,7 +197,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
             final boolean usingTimeout = completionTimeout > 0;
 
-            batchConsumption:
+        batchConsumption:
             while (running.get()) {
                 int messageCount = 0;
 
@@ -206,7 +206,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 long startTime = 0;
                 Exchange aggregatedExchange = null;
 
-                batch:
+            batch:
                 while ((completionSize <= 0) || (messageCount < completionSize)) {
                     // check periodically to see whether we should be shutting down
                     long waitTime = (usingTimeout && (timeElapsed > 0))
@@ -219,7 +219,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                             // timed out, no message received
                             LOG.trace("No message received");
                         } else {
-                            if ((usingTimeout) && (messageCount == 0)) { // this is the first message
+                            if (usingTimeout && messageCount == 0) { // this is the first message
                                 startTime = new Date().getTime(); // start counting down the period for this batch
                             }
                             messageCount++;
@@ -230,12 +230,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                 aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
                                 aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
                             } else {
-                                throw new IllegalArgumentException("Unexpected message type: "
-                                        + message.getClass().toString());
+                                throw new IllegalArgumentException("Unexpected message type: " + message.getClass().toString());
                             }
                         }
 
-                        if ((usingTimeout) && (startTime > 0)) {
+                        if (usingTimeout && startTime > 0) {
                             // a batch has been started, check whether it should be timed out
                             long currentTime = new Date().getTime();
                             timeElapsed = currentTime - startTime;
@@ -252,13 +251,13 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         break batchConsumption;
                     }
                 } // batch
-                assert (aggregatedExchange != null);
                 process(aggregatedExchange, session);
             }
         }
 
         /**
          * Determine the time that a call to {@link MessageConsumer#receive()} should wait given the time that has elapsed for this batch.
+         *
          * @param timeElapsed The time that has elapsed.
          * @return The shorter of the time remaining or poll duration.
          */
@@ -277,25 +276,25 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
         private long getTimeRemaining(long timeElapsed) {
             long timeRemaining = completionTimeout - timeElapsed;
-            if (LOG.isDebugEnabled() && (timeElapsed > 0)) {
+            if (LOG.isDebugEnabled() && timeElapsed > 0) {
                 LOG.debug("Time remaining this batch: {}", timeRemaining);
             }
             return timeRemaining;
         }
 
         private void process(Exchange exchange, Session session) {
-            assert (exchange != null);
-            int id = batchCount.getAndIncrement();
+            int id = BATCH_COUNT.getAndIncrement();
             int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + messagesReceived.addAndGet(batchSize));
+                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + MESSAGE_RECEIVED.addAndGet(batchSize));
             }
 
             SessionCompletion sessionCompletion = new SessionCompletion(session);
             exchange.addOnCompletion(sessionCompletion);
             try {
                 processor.process(exchange);
-                LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
+                long total = MESSAGE_PROCESSED.addAndGet(batchSize);
+                LOG.debug("Completed processing[{}]:total={}", id, total);
             } catch (Exception e) {
                 LOG.error("Error processing exchange: {}", e.getMessage());
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 5d307a7..b4c052f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -28,12 +28,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
-/**
- * @author jkorab
- */
-@UriEndpoint(scheme = "sjmsBatch",
-        title = "Simple JMS Batch Component",
-        syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy",
+@UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName",
         consumerClass = SjmsBatchComponent.class, label = "messaging")
 public class SjmsBatchEndpoint extends DefaultEndpoint {
 
@@ -41,41 +36,29 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
     public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
     public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
 
-    @UriPath
-    @Metadata(required = "true",
-            description = "The destination name. Only queues are supported, names may be prefixed by 'queue:'.")
+    @UriPath(label = "consumer") @Metadata(required = "true")
     private String destinationName;
-
-    @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
-    private Integer consumerCount = 1;
-
-    @UriParam(label = "consumer", defaultValue = "200",
-            description = "The number of messages consumed at which the batch will be completed")
-    private Integer completionSize = DEFAULT_COMPLETION_SIZE;
-
-    @UriParam(label = "consumer", defaultValue = "500",
-            description = "The timeout from receipt of the first first message when the batch will be completed")
-    private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
-
-    @UriParam(label = "consumer", defaultValue = "1000",
-            description = "The duration in milliseconds of each poll for messages. " +
-                    "completionTimeOut will be used if it is shorter and a batch has started.")
-    private Integer pollDuration = 1000;
-
-    @Metadata(required = "true")
-    @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
+    @UriParam(label = "consumer", defaultValue = "1")
+    private int consumerCount = 1;
+    @UriParam(label = "consumer", defaultValue = "200")
+    private int completionSize = DEFAULT_COMPLETION_SIZE;
+    @UriParam(label = "consumer", defaultValue = "500")
+    private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
+    @UriParam(label = "consumer", defaultValue = "1000")
+    private int pollDuration = 1000;
+    @UriParam(label = "consumer") @Metadata(required = "true")
     private AggregationStrategy aggregationStrategy;
 
-    private boolean topic;
-
-    public SjmsBatchEndpoint() {}
+    public SjmsBatchEndpoint() {
+    }
 
     public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
         super(endpointUri, component);
+
         DestinationNameParser parser = new DestinationNameParser();
         if (parser.isTopic(remaining)) {
-            throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you " +
-                    "should use a regular JMS consumer with an aggregator.");
+            throw new IllegalArgumentException("Only batch consumption from queues is supported. For topics you "
+                    + "should use a regular JMS consumer with an aggregator.");
         }
         this.destinationName = parser.getShortName(remaining);
     }
@@ -99,47 +82,62 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
         return aggregationStrategy;
     }
 
+    /**
+     * The aggregation strategy to use, which merges all the batched messages into a single message
+     */
     public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
 
-    public Integer getCompletionSize() {
-        return completionSize;
-    }
-
-    public void setCompletionSize(Integer completionSize) {
-        this.completionSize = completionSize;
+    /**
+     * The destination name. Only queues are supported, names may be prefixed by 'queue:'.
+     */
+    public String getDestinationName() {
+        return destinationName;
     }
 
-    public Integer getCompletionTimeout() {
-        return completionTimeout;
+    public int getConsumerCount() {
+        return consumerCount;
     }
 
-    public void setCompletionTimeout(Integer completionTimeout) {
-        this.completionTimeout = completionTimeout;
+    /**
+     * The number of JMS sessions to consume from
+     */
+    public void setConsumerCount(int consumerCount) {
+        this.consumerCount = consumerCount;
     }
 
-    public String getDestinationName() {
-        return destinationName;
+    public int getCompletionSize() {
+        return completionSize;
     }
 
-    public void setDestinationName(String destinationName) {
-        this.destinationName = destinationName;
+    /**
+     * The number of messages consumed at which the batch will be completed
+     */
+    public void setCompletionSize(int completionSize) {
+        this.completionSize = completionSize;
     }
 
-    public Integer getConsumerCount() {
-        return consumerCount;
+    public int getCompletionTimeout() {
+        return completionTimeout;
     }
 
-    public void setConsumerCount(Integer consumerCount) {
-        this.consumerCount = consumerCount;
+    /**
+     * The timeout from receipt of the first first message when the batch will be completed
+     */
+    public void setCompletionTimeout(int completionTimeout) {
+        this.completionTimeout = completionTimeout;
     }
 
-    public Integer getPollDuration() {
+    public int getPollDuration() {
         return pollDuration;
     }
 
-    public void setPollDuration(Integer pollDuration) {
+    /**
+     * The duration in milliseconds of each poll for messages.
+     * completionTimeOut will be used if it is shorter and a batch has started.
+     */
+    public void setPollDuration(int pollDuration) {
         this.pollDuration = pollDuration;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
index d248350..ddc213d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -20,6 +20,7 @@ package org.apache.camel.component.sjms.jms;
  * @author jkorab
  */
 public class DestinationNameParser {
+
     public boolean isTopic(String destinationName) {
         if (destinationName == null) {
             throw new IllegalArgumentException("destinationName is null");

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
index 39774b2..d1eb6e5 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
@@ -16,16 +16,14 @@
  */
 package org.apache.camel.component.sjms.batch;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-
 import java.util.ArrayList;
 import java.util.List;
 
-/**
- * @author jkorab
- */
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
 public class ListAggregationStrategy implements AggregationStrategy {
+
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         String body = newExchange.getIn().getBody(String.class);
@@ -34,7 +32,7 @@ public class ListAggregationStrategy implements AggregationStrategy {
             list.add(body);
             newExchange.getIn().setBody(list);
             return newExchange;
-        }  else {
+        } else {
             List<String> list = oldExchange.getIn().getBody(List.class);
             list.add(body);
             return oldExchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 642a38f..76c739b 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.component.sjms.batch;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import javax.jms.ConnectionFactory;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -34,16 +38,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.ConnectionFactory;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-
-/**
- * @author jkorab
- */
 public class SjmsBatchConsumerTest extends CamelTestSupport {
-    private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
 
     @Rule
     public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost");
@@ -76,10 +72,10 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
         @Override
         public void configure() throws Exception {
             from("direct:in").routeId("harness").startupOrder(20)
-                .split(body())
+                    .split(body())
                     .toF("sjms:queue:%s?transacted=true", queueName)
                     .to("mock:before")
-                .end();
+                    .end();
         }
     }
 
@@ -102,12 +98,11 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
                 int completionTimeout = 1000;
                 int completionSize = 200;
 
-                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s" +
-                        "&consumerCount=%s&aggregationStrategy=#testStrategy",
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy",
                         queueName, completionTimeout, completionSize, consumerCount)
                         .routeId("batchConsumer").startupOrder(10).autoStartup(false)
-                    .split(body())
-                    .to("mock:split");
+                        .split(body())
+                        .to("mock:split");
             }
         });
         context.start();
@@ -132,7 +127,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     }
 
     @Test
-    public void testConsumption_completionSize() throws Exception {
+    public void testConsumptionCompletionSize() throws Exception {
         final int completionSize = 5;
         final int completionTimeout = -1; // size-based only
 
@@ -157,7 +152,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     }
 
     @Test
-    public void testConsumption_completionTimeout() throws Exception {
+    public void testConsumptionCompletionTimeout() throws Exception {
         final int completionTimeout = 2000;
         final int completionSize = -1; // timeout-based only
 
@@ -186,7 +181,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
      * Checks whether multiple consumer endpoints can operate in parallel.
      */
     @Test
-    public void testConsumption_multipleConsumerEndpoints() throws Exception {
+    public void testConsumptionMultipleConsumerEndpoints() throws Exception {
         final int completionTimeout = 2000;
         final int completionSize = 5;
 
@@ -196,10 +191,10 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
                 from("direct:in")
                     .split().body()
-                    .multicast()
-                        .toF("sjms:%s", queueName + "A")
-                        .toF("sjms:%s", queueName + "B")
-                    .end();
+                        .multicast()
+                            .toF("sjms:%s", queueName + "A")
+                            .toF("sjms:%s", queueName + "B")
+                        .end();
 
                 fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
                         queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
@@ -229,7 +224,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     }
 
     @Test
-    public void testConsumption_rollback() throws Exception {
+    public void testConsumptionRollback() throws Exception {
         final int completionTimeout = 2000;
         final int completionSize = 5;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
index c97d0dd..45fe324 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -30,9 +30,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-/**
- * @author jkorab
- */
 public class SjmsBatchEndpointTest extends CamelTestSupport {
 
     // Create one embedded broker instance for the entire test, as we aren't actually
@@ -92,19 +89,19 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testConsumer_negativePollDuration() throws Exception {
+    public void testConsumerNegativePollDuration() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("sjms-batch:in?aggregationStrategy=#aggStrategy&pollDuration=-1")
-                    .to("mock:out");
+                        .to("mock:out");
             }
         });
         context.start();
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testConsumer_negativeConsumerCount() throws Exception {
+    public void testConsumerNegativeConsumerCount() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
@@ -116,7 +113,7 @@ public class SjmsBatchEndpointTest extends CamelTestSupport {
     }
 
     @Test(expected = FailedToCreateRouteException.class)
-    public void testConsumer_topic() throws Exception {
+    public void testConsumerTopic() throws Exception {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
index 3fb4968..7f25239 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
@@ -34,7 +34,7 @@ public class DestinationNameParserTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testIsTopic_nullDestinationName() throws Exception {
+    public void testIsTopicNullDestinationName() throws Exception {
         DestinationNameParser parser = new DestinationNameParser();
         parser.isTopic(null);
     }
@@ -48,7 +48,7 @@ public class DestinationNameParserTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testGetShortName_nullDestinationName() throws Exception {
+    public void testGetShortNameNullDestinationName() throws Exception {
         DestinationNameParser parser = new DestinationNameParser();
         parser.getShortName(null);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/832a99c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
index beef0a9..2a925f0 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerQoSTest.java
@@ -33,7 +33,7 @@ public class QueueProducerQoSTest extends JmsTestSupport {
     private static final String TEST_INOUT_DESTINATION_NAME = "queue.producer.test.qos.inout";
 
     private static final String EXPIRED_MESSAGE_ROUTE_ID = "expiredAdvisoryRoute";
-    public static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
+    private static final String MOCK_EXPIRED_ADVISORY = "mock:expiredAdvisory";
 
     @EndpointInject(uri = MOCK_EXPIRED_ADVISORY)
     MockEndpoint mockExpiredAdvisory;