You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/17 15:22:01 UTC

[2/3] camel git commit: Merged in sjms-batch component.

Merged in sjms-batch component.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ab1d1dd7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ab1d1dd7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ab1d1dd7

Branch: refs/heads/master
Commit: ab1d1dd78fe53edb50c4ede447e4ac5a55ee2ac9
Parents: 65f9a3a
Author: jkorab <ja...@gmail.com>
Authored: Thu Jul 16 11:32:29 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 17 14:56:22 2015 +0200

----------------------------------------------------------------------
 components/camel-sjms/pom.xml                   |   6 +-
 .../component/sjms/batch/SessionCompletion.java |  61 ++++
 .../sjms/batch/SjmsBatchComponent.java          |  53 ++++
 .../component/sjms/batch/SjmsBatchConsumer.java | 306 +++++++++++++++++++
 .../component/sjms/batch/SjmsBatchEndpoint.java | 133 ++++++++
 .../sjms/consumer/AbstractMessageHandler.java   |   3 +-
 .../component/sjms/jms/JmsMessageHelper.java    |  21 +-
 .../org/apache/camel/component/sjms-batch       |  18 ++
 .../sjms/batch/EmbeddedActiveMQBroker.java      |  74 +++++
 .../sjms/batch/ListAggregationStrategy.java     |  43 +++
 .../sjms/batch/SjmsBatchConsumerTest.java       | 247 +++++++++++++++
 .../sjms/batch/SjmsBatchEndpointTest.java       | 116 +++++++
 12 files changed, 1075 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 9778517..6977796 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -32,7 +32,8 @@
     <properties>
         <camel.osgi.export.pkg>
             org.apache.camel.component.sjms,
-            org.apache.camel.component.sjms.jms
+            org.apache.camel.component.sjms.jms,
+            org.apache.camel.component.sjms.batch
         </camel.osgi.export.pkg>
         <camel.osgi.private.pkg>
             org.apache.camel.component.sjms.consumer,
@@ -41,7 +42,8 @@
             org.apache.camel.component.sjms.tx
         </camel.osgi.private.pkg>
         <camel.osgi.export.service>
-            org.apache.camel.spi.ComponentResolver;component=sjms
+            org.apache.camel.spi.ComponentResolver;component=sjms,
+            org.apache.camel.spi.ComponentResolver;component=sjms-batch
         </camel.osgi.export.service>
     </properties>
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
new file mode 100644
index 0000000..27f06e6
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/** Commits a transacted JMS {@link Session} when its batch exchange completes, or rolls it back on failure.
+ * @author jkorab
+ */
+class SessionCompletion implements Synchronization {
+    private static final Logger LOG = LoggerFactory.getLogger(SessionCompletion.class);
+
+    private final Session session; // the session whose transaction is committed or rolled back
+
+    public SessionCompletion(Session session) {
+        // an assert is a no-op unless the JVM runs with -ea, so validate explicitly
+        this.session = java.util.Objects.requireNonNull(session, "session");
+    }
+
+    @Override
+    public void onComplete(Exchange exchange) {
+        try {
+            LOG.debug("Committing");
+            session.commit();
+        } catch (JMSException ex) {
+            LOG.error("Exception caught while committing: {}", ex.getMessage(), ex); // trailing ex logs the stack trace
+            exchange.setException(ex); // surface the failed commit on the exchange
+        }
+    }
+
+    @Override
+    public void onFailure(Exchange exchange) {
+        try {
+            LOG.debug("Rolling back");
+            session.rollback();
+        } catch (JMSException ex) {
+            LOG.error("Exception caught while rolling back: {}", ex.getMessage(), ex); // trailing ex logs the stack trace
+            exchange.setException(ex); // surface the failed rollback on the exchange
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
new file mode 100644
index 0000000..04b875d
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.ObjectHelper;
+
+import javax.jms.ConnectionFactory;
+import java.util.Map;
+
+/** Camel component for the {@code sjms-batch} scheme; creates {@link SjmsBatchEndpoint}s that share one JMS {@link ConnectionFactory}.
+ * @author jkorab
+ */
+public class SjmsBatchComponent extends UriEndpointComponent {
+
+    private ConnectionFactory connectionFactory; // must be set before any endpoint is created
+
+    public SjmsBatchComponent() {
+        super(SjmsBatchEndpoint.class);
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ObjectHelper.notNull(connectionFactory, "connectionFactory"); // 2nd argument is the property name, not a message
+        SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining); // remaining becomes the destination name
+        setProperties(sjmsBatchEndpoint, parameters);
+        return sjmsBatchEndpoint;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
new file mode 100644
index 0000000..613d471
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Consumes messages from a JMS queue in transacted batches and aggregates each batch into a single exchange.
+ * @author jkorab
+ */
+public class SjmsBatchConsumer extends DefaultConsumer {
+    private static final boolean TRANSACTED = true;
+    private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
+
+    private final SjmsBatchEndpoint sjmsBatchEndpoint;
+    private final AggregationStrategy aggregationStrategy;
+
+    private final int completionSize; // messages per batch; <= 0 disables the size limit
+    private final int completionTimeout; // ms counted from the first message of a batch; <= 0 disables the timeout
+    private final int consumerCount; // number of consumption loops (one session each) to run
+    private final int pollDuration; // ms passed to each receive() call while no batch timeout applies
+
+    private final ConnectionFactory connectionFactory;
+    private final String destinationName;
+    private final Processor processor;
+
+    private static final AtomicInteger batchCount = new AtomicInteger(); // static: shared by all consumer instances
+    private static final AtomicLong messagesReceived = new AtomicLong();
+    private static final AtomicLong messagesProcessed = new AtomicLong();
+    private ExecutorService jmsConsumerExecutors;
+
+    public SjmsBatchConsumer(SjmsBatchEndpoint sjmsBatchEndpoint, Processor processor) {
+        super(sjmsBatchEndpoint, processor);
+
+        this.sjmsBatchEndpoint = ObjectHelper.notNull(sjmsBatchEndpoint, "batchJmsEndpoint");
+        this.processor = ObjectHelper.notNull(processor, "processor");
+
+        destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
+
+        completionSize = sjmsBatchEndpoint.getCompletionSize();
+        completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
+        pollDuration = sjmsBatchEndpoint.getPollDuration();
+        if (pollDuration < 0) {
+            throw new IllegalArgumentException("pollDuration must be 0 or greater");
+        }
+
+        this.aggregationStrategy = ObjectHelper.notNull(sjmsBatchEndpoint.getAggregationStrategy(), "aggregationStrategy");
+
+        consumerCount = sjmsBatchEndpoint.getConsumerCount();
+        if (consumerCount <= 0) {
+            throw new IllegalArgumentException("consumerCount must be greater than 0");
+        }
+
+        SjmsBatchComponent sjmsBatchComponent = (SjmsBatchComponent) sjmsBatchEndpoint.getComponent();
+        connectionFactory = ObjectHelper.notNull(sjmsBatchComponent.getConnectionFactory(), "jmsBatchComponent.connectionFactory");
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        return sjmsBatchEndpoint;
+    }
+
+    private final AtomicBoolean running = new AtomicBoolean(true); // polled by the consumption loops; cleared by doStop()
+    private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
+
+    private Connection connection; // shared by all consumption loops; null until doStart() succeeds
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // doStop() clears this flag; set it again so that a restarted consumer actually polls
+        running.set(true);
+
+        // start up a shared connection
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+        } catch (JMSException ex) {
+            LOG.error("Exception caught starting connection: {}", getStackTrace(ex));
+            throw ex; // fail the start instead of silently leaving a consumer that never polls
+        }
+
+        LOG.info("Starting {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
+        consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
+
+        jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+        for (int i = 0; i < consumerCount; i++) {
+            jmsConsumerExecutors.execute(new BatchConsumptionLoop());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        running.set(false);
+        CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+        if (consumersShutdownLatch != null) {
+            LOG.info("Stop signalled, waiting on consumers to shut down");
+            if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) { // true = latch reached zero, false = timed out
+                LOG.info("All consumers have shut down");
+            } else {
+                LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
+            }
+        } else {
+            LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
+        }
+
+        try {
+            LOG.debug("Shutting down JMS connection");
+            if (connection != null) { connection.close(); } // null when doStart() never obtained a connection
+        } catch (JMSException jex) {
+            LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
+        }
+
+        getEndpoint().getCamelContext().getExecutorServiceManager()
+                .shutdown(jmsConsumerExecutors);
+    }
+
+    private String getStackTrace(Exception ex) { // renders the full stack trace of ex as a String for logging
+        StringWriter writer = new StringWriter();
+        ex.printStackTrace(new PrintWriter(writer));
+        return writer.toString();
+    }
+
+    private class BatchConsumptionLoop implements Runnable {
+        @Override
+        public void run() {
+            try {
+                // a batch corresponds to a single session that will be committed or rolled back by a background thread
+                final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
+                try {
+                    // destinationName only creates queues; there is no additional value to be gained
+                    // by transactionally consuming topic messages in batches
+                    Queue queue = session.createQueue(destinationName);
+                    MessageConsumer consumer = session.createConsumer(queue);
+                    try {
+                        consumeBatchesOnLoop(session, consumer);
+                    } finally {
+                        try {
+                            consumer.close();
+                        } catch (JMSException ex2) {
+                            LOG.error("Exception caught closing consumer: {}", ex2.getMessage());
+
+                        }
+                    }
+                } finally {
+                    try {
+                        session.close();
+                    } catch (JMSException ex1) {
+                        LOG.error("Exception caught closing session: {}", ex1.getMessage());
+                    }
+                }
+            } catch (JMSException ex) {
+                // from loop
+                LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex));
+            } finally {
+                // indicate that we have shut down
+                CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
+                consumersShutdownLatch.countDown();
+            }
+        }
+
+        private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
+            final boolean usingTimeout = completionTimeout > 0;
+
+            batchConsumption:
+            while (running.get()) {
+                int messageCount = 0;
+
+                // reset the clock counters
+                long timeElapsed = 0;
+                long startTime = 0;
+                Exchange aggregatedExchange = null;
+
+                batch:
+                while ((completionSize <= 0) || (messageCount < completionSize)) {
+                    // check periodically to see whether we should be shutting down
+                    long waitTime = (usingTimeout && (timeElapsed > 0))
+                            ? getReceiveWaitTime(timeElapsed)
+                            : pollDuration;
+                    Message message = consumer.receive(waitTime);
+
+                    if (running.get()) { // no interruptions received
+                        if (message == null) {
+                            // timed out, no message received
+                            LOG.trace("No message received");
+                        } else {
+                            if ((usingTimeout) && (messageCount == 0)) { // this is the first message
+                                startTime = new Date().getTime(); // start counting down the period for this batch
+                            }
+                            messageCount++;
+                            LOG.debug("Message received: {}", messageCount);
+                            if ((message instanceof ObjectMessage)
+                                    || (message instanceof TextMessage)) {
+                                Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint());
+                                aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
+                                aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
+                            } else {
+                                throw new IllegalArgumentException("Unexpected message type: "
+                                        + message.getClass().toString());
+                            }
+                        }
+
+                        if ((usingTimeout) && (startTime > 0)) {
+                            // a batch has been started, check whether it should be timed out
+                            long currentTime = new Date().getTime();
+                            timeElapsed = currentTime - startTime;
+
+                            if (timeElapsed > completionTimeout) {
+                                // batch finished by timeout
+                                break batch;
+                            }
+                        }
+
+                    } else {
+                        LOG.info("Shutdown signal received - rolling batch back");
+                        session.rollback();
+                        break batchConsumption;
+                    }
+                } // batch
+                assert (aggregatedExchange != null);
+                process(aggregatedExchange, session);
+            }
+        }
+
+        /**
+         * Determine the time that a call to {@link MessageConsumer#receive(long)} should wait given the time that has elapsed for this batch.
+         * @param timeElapsed The time that has elapsed.
+         * @return The shorter of the time remaining or poll duration.
+         */
+        private long getReceiveWaitTime(long timeElapsed) {
+            long timeRemaining = getTimeRemaining(timeElapsed);
+
+            // wait for the shorter of the time remaining or the poll duration
+            if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely
+                timeRemaining = 1;
+            }
+            final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining;
+
+            LOG.debug("waiting for {}", waitTime);
+            return waitTime;
+        }
+
+        private long getTimeRemaining(long timeElapsed) {
+            long timeRemaining = completionTimeout - timeElapsed;
+            if (LOG.isDebugEnabled() && (timeElapsed > 0)) {
+                LOG.debug("Time remaining this batch: {}", timeRemaining);
+            }
+            return timeRemaining;
+        }
+
+        private void process(Exchange exchange, Session session) {
+            assert (exchange != null);
+            int id = batchCount.getAndIncrement();
+            int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
+            // count unconditionally: the running total must not depend on whether debug logging is enabled
+            long totalReceived = messagesReceived.addAndGet(batchSize);
+            LOG.debug("Processing batch:size={}:total={}", batchSize, totalReceived);
+
+            Synchronization committing = new SessionCompletion(session);
+            exchange.addOnCompletion(committing);
+            try {
+                processor.process(exchange);
+                LOG.debug("Completed processing[{}]:total={}", id, messagesProcessed.addAndGet(batchSize));
+            } catch (Exception e) {
+                LOG.error("Error processing exchange: {}", e.getMessage(), e); // trailing e logs the stack trace
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
new file mode 100644
index 0000000..afd5cbe
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+
+/** Consumer-only endpoint holding the batching options (size, timeout, poll duration, consumer count) for one JMS destination.
+ * @author jkorab
+ */
+@UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName?aggregationStrategy=#aggStrategy", consumerClass = SjmsBatchConsumer.class, label = "messaging")
+public class SjmsBatchEndpoint extends DefaultEndpoint {
+
+    public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
+    public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
+    public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize"; // exchange property set by the consumer to the batch's message count
+
+    @UriPath
+    @Metadata(required = "true")
+    private String destinationName;
+
+    @UriParam(label = "consumer", defaultValue = "1", description = "The number of JMS sessions to consume from")
+    private Integer consumerCount = 1;
+
+    @UriParam(label = "consumer", defaultValue = "200",
+            description = "The number of messages consumed at which the batch will be completed")
+    private Integer completionSize = DEFAULT_COMPLETION_SIZE;
+
+    @UriParam(label = "consumer", defaultValue = "500",
+            description = "The timeout from receipt of the first message when the batch will be completed")
+    private Integer completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
+
+    @UriParam(label = "consumer", defaultValue = "1000",
+            description = "The duration in milliseconds of each poll for messages. " +
+                    "completionTimeout will be used if it is shorter and a batch has started.")
+    private Integer pollDuration = 1000;
+
+    @Metadata(required = "true")
+    @UriParam(label = "consumer", description = "A #-reference to an AggregationStrategy visible to Camel")
+    private AggregationStrategy aggregationStrategy;
+
+    public SjmsBatchEndpoint() {}
+
+    public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
+        super(endpointUri, component);
+        this.destinationName = remaining; // the URI path after the scheme is the destination name
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new SjmsBatchConsumer(this, processor);
+    }
+
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
+    }
+
+    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
+    public Integer getCompletionSize() {
+        return completionSize;
+    }
+
+    public void setCompletionSize(Integer completionSize) {
+        this.completionSize = completionSize;
+    }
+
+    public Integer getCompletionTimeout() {
+        return completionTimeout;
+    }
+
+    public void setCompletionTimeout(Integer completionTimeout) {
+        this.completionTimeout = completionTimeout;
+    }
+
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    public Integer getConsumerCount() {
+        return consumerCount;
+    }
+
+    public void setConsumerCount(Integer consumerCount) {
+        this.consumerCount = consumerCount;
+    }
+
+    public Integer getPollDuration() {
+        return pollDuration;
+    }
+
+    public void setPollDuration(Integer pollDuration) {
+        this.pollDuration = pollDuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index c3c5e55..1598a43 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -70,7 +70,8 @@ public abstract class AbstractMessageHandler implements MessageListener {
     public void onMessage(Message message) {
         RuntimeCamelException rce = null;
         try {
-            final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, getEndpoint());
+            SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint();
+            final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy());
 
             log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
 

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index dcccd9b..79787c9 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -62,8 +62,23 @@ public final class JmsMessageHelper implements JmsConstants {
     }
 
     public static Exchange createExchange(Message message, Endpoint endpoint) {
+        return createExchange(message, endpoint, null);
+    }
+
+    /**
+     * Creates an Exchange from a JMS Message.
+     * @param message The JMS message.
+     * @param endpoint The Endpoint to use to create the Exchange object.
+     * @param keyFormatStrategy the {@link KeyFormatStrategy} used to
+     *                          format keys in a JMS 1.1 compliant manner. If null the
+     *                          {@link DefaultJmsKeyFormatStrategy} will be used.
+     * @return Populated Exchange.
+     */
+    public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) {
         Exchange exchange = endpoint.createExchange();
-        return populateExchange(message, exchange, false, ((SjmsEndpoint)endpoint).getJmsKeyFormatStrategy());
+        KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null)
+                ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy;
+        return populateExchange(message, exchange, false, initialisedKeyFormatStrategy);
     }
 
     @SuppressWarnings("unchecked")
@@ -222,11 +237,11 @@ public final class JmsMessageHelper implements JmsConstants {
      * @param jmsMessage        the {@link Message} to add or update the headers on
      * @param messageHeaders    a {@link Map} of String/Object pairs
      * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
-     *                          format keys in a JMS 1.1 compliant manner. If null the
-     *                          {@link DefaultJmsKeyFormatStrategy} will be used.
+     *                          format keys in a JMS 1.1 compliant manner.
      * @return {@link Message}
      */
     private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
+
         Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
         for (final Map.Entry<String, Object> entry : headers.entrySet()) {
             String headerName = entry.getKey();

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
new file mode 100644
index 0000000..9ee9e4c
--- /dev/null
+++ b/components/camel-sjms/src/main/resources/META-INF/services/org/apache/camel/component/sjms-batch
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.sjms.batch.SjmsBatchComponent

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
new file mode 100644
index 0000000..fd1ed27
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/EmbeddedActiveMQBroker.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JUnit Test aspect that creates an embedded ActiveMQ broker at the beginning of each test and shuts it down after.
+ */
+public class EmbeddedActiveMQBroker extends ExternalResource {
+
+    private final Logger log = LoggerFactory.getLogger(EmbeddedActiveMQBroker.class);
+    private final String brokerId;
+    private BrokerService brokerService;
+    private final String tcpConnectorUri;
+
+    public EmbeddedActiveMQBroker(String brokerId) {
+        if ((brokerId == null) || (brokerId.isEmpty())) {
+            throw new IllegalArgumentException("brokerId is empty");
+        }
+        this.brokerId = brokerId;
+        tcpConnectorUri = "tcp://localhost:" + AvailablePortFinder.getNextAvailable();
+
+        brokerService = new BrokerService();
+        brokerService.setBrokerId(brokerId);
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        try {
+            brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
+            brokerService.addConnector(tcpConnectorUri);
+        } catch (Exception e) {
+            throw new RuntimeException("Problem creating brokerService", e);
+        }
+    }
+
+    @Override
+    protected void before() throws Throwable {
+        log.info("Starting embedded broker[{}] on {}", brokerId, tcpConnectorUri);
+        brokerService.start();
+    }
+
+    @Override
+    protected void after() {
+        try {
+            log.info("Stopping embedded broker[{}]", brokerId);
+            brokerService.stop();
+        } catch (Exception e) {
+            throw new RuntimeException("Exception shutting down broker service", e);
+        }
+    }
+
+    public String getTcpConnectorUri() {
+        return tcpConnectorUri;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
new file mode 100644
index 0000000..39774b2
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/ListAggregationStrategy.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test aggregation strategy that gathers the String bodies of the aggregated exchanges
+ * into one {@link List}, which is carried as the body of the first exchange.
+ *
+ * @author jkorab
+ */
+public class ListAggregationStrategy implements AggregationStrategy {
+
+    /**
+     * Adds the String body of {@code newExchange} to the aggregate.
+     *
+     * @param oldExchange the aggregate built so far, or {@code null} for the first exchange
+     * @param newExchange the exchange whose body is to be collected
+     * @return {@code newExchange} (its body replaced by a new list) when there is no aggregate
+     *         yet, otherwise {@code oldExchange} with the body appended to its list
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        String body = newExchange.getIn().getBody(String.class);
+        if (oldExchange != null) {
+            // Subsequent exchange: append to the list already held by the aggregate.
+            List<String> collected = oldExchange.getIn().getBody(List.class);
+            collected.add(body);
+            return oldExchange;
+        }
+        // First exchange: it becomes the aggregate, with its body swapped for a fresh list.
+        List<String> collected = new ArrayList<String>();
+        collected.add(body);
+        newExchange.getIn().setBody(collected);
+        return newExchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
new file mode 100644
index 0000000..58fc717
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.StopWatch;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @author jkorab
+ */
+public class SjmsBatchConsumerTest extends CamelTestSupport {
+    private final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumerTest.class);
+
+    @Rule
+    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker("localhost");
+
+    @Override
+    public CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("testStrategy", new ListAggregationStrategy());
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+
+        SjmsComponent sjmsComponent = new SjmsComponent();
+        sjmsComponent.setConnectionFactory(connectionFactory);
+
+        SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+        sjmsBatchComponent.setConnectionFactory(connectionFactory);
+
+        CamelContext context = new DefaultCamelContext(registry);
+        context.addComponent("sjms", sjmsComponent);
+        context.addComponent("sjmsbatch", sjmsBatchComponent);
+        return context;
+    }
+
+    private static class TransactedSendHarness extends RouteBuilder {
+        private final String queueName;
+
+        public TransactedSendHarness(String queueName) {
+            this.queueName = queueName;
+        }
+
+        @Override
+        public void configure() throws Exception {
+            from("direct:in").routeId("harness").startupOrder(20)
+                .split(body())
+                    .toF("sjms:queue:%s?transacted=true", queueName)
+                    .to("mock:before")
+                .end();
+        }
+    }
+
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    @Test
+    public void testConsumption() throws Exception {
+
+        final int messageCount = 10000;
+        final int consumerCount = 5;
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                int completionTimeout = 1000;
+                int completionSize = 200;
+
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s" +
+                        "&consumerCount=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize, consumerCount)
+                        .routeId("batchConsumer").startupOrder(10).autoStartup(false)
+                    .split(body())
+                    .to("mock:split");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBefore = getMockEndpoint("mock:before");
+        mockBefore.setExpectedMessageCount(messageCount);
+
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(messageCount);
+
+        LOG.info("Sending messages");
+        template.sendBody("direct:in", generateStrings(messageCount));
+        LOG.info("Send complete");
+
+        StopWatch stopWatch = new StopWatch();
+        context.startRoute("batchConsumer");
+        assertMockEndpointsSatisfied();
+        long time = stopWatch.stop();
+
+        LOG.info("Processed {} messages in {} ms", messageCount, time);
+        LOG.info("Average throughput {} msg/s", (long) (messageCount / (time / 1000d)));
+    }
+
+    @Test
+    public void testConsumption_completionSize() throws Exception {
+        final int completionSize = 5;
+        final int completionTimeout = -1; // size-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .log(LoggingLevel.DEBUG, "${body.size}")
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 100;
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(messageCount / completionSize);
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        mockBatches.assertIsSatisfied();
+    }
+
+    @Test
+    public void testConsumption_completionTimeout() throws Exception {
+        final int completionTimeout = 2000;
+        final int completionSize = -1; // timeout-based only
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName, completionTimeout, completionSize).routeId("batchConsumer").startupOrder(10)
+                        .to("mock:batches");
+            }
+        });
+        context.start();
+
+        int messageCount = 50;
+        assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+        MockEndpoint mockBatches = getMockEndpoint("mock:batches");
+        mockBatches.expectedMessageCount(1);  // everything batched together
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        mockBatches.assertIsSatisfied();
+        assertFirstMessageBodyOfLength(mockBatches, messageCount);
+    }
+
+    /**
+     * Checks whether multiple consumer endpoints can operate in parallel.
+     */
+    @Test
+    public void testConsumption_multipleConsumerEndpoints() throws Exception {
+        final int completionTimeout = 2000;
+        final int completionSize = 5;
+
+        final String queueName = getQueueName();
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                from("direct:in")
+                    .split().body()
+                    .multicast()
+                        .toF("sjms:%s", queueName + "A")
+                        .toF("sjms:%s", queueName + "B")
+                    .end();
+
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName + "A", completionTimeout, completionSize).routeId("batchConsumerA")
+                        .to("mock:outA");
+
+                fromF("sjmsbatch:%s?completionTimeout=%s&completionSize=%s&aggregationStrategy=#testStrategy",
+                        queueName + "B", completionTimeout, completionSize).routeId("batchConsumerB")
+                        .to("mock:outB");
+
+            }
+        });
+        context.start();
+
+        int messageCount = 5;
+
+        assertTrue(messageCount < SjmsBatchEndpoint.DEFAULT_COMPLETION_SIZE);
+        MockEndpoint mockOutA = getMockEndpoint("mock:outA");
+        mockOutA.expectedMessageCount(1);  // everything batched together
+        MockEndpoint mockOutB = getMockEndpoint("mock:outB");
+        mockOutB.expectedMessageCount(1);  // everything batched together
+
+        template.sendBody("direct:in", generateStrings(messageCount));
+        assertMockEndpointsSatisfied();
+
+        assertFirstMessageBodyOfLength(mockOutA, messageCount);
+        assertFirstMessageBodyOfLength(mockOutB, messageCount);
+    }
+
+    private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
+        Exchange exchange = mockEndpoint.getExchanges().get(0);
+        assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());
+    }
+
+    private String getQueueName() {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyMMddhhmmss");
+        return "sjmsbatch-" + sdf.format(new Date());
+    }
+
+    private String[] generateStrings(int messageCount) {
+        String[] strings = new String[messageCount];
+        for (int i = 0; i < messageCount; i++) {
+            strings[i] = "message:" + i;
+        }
+        return strings;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ab1d1dd7/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
new file mode 100644
index 0000000..7a75e76
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpointTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.toolbox.AggregationStrategies;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Checks that {@code sjmsbatch} endpoints reject use as a producer and reject negative
+ * {@code pollDuration} and {@code consumerCount} options.
+ *
+ * @author jkorab
+ */
+public class SjmsBatchEndpointTest extends CamelTestSupport {
+
+    // A single embedded broker serves the whole class: no messages are ever sent to it,
+    // it merely gives the ConnectionFactory something local to connect to.
+    public static EmbeddedActiveMQBroker broker;
+
+    /**
+     * Creates and starts the shared broker, wrapping any start-up failure in a RuntimeException.
+     */
+    @BeforeClass
+    public static void setupBroker() {
+        broker = new EmbeddedActiveMQBroker("localhost");
+        try {
+            broker.before();
+        } catch (Throwable startFailure) {
+            throw new RuntimeException(startFailure);
+        }
+    }
+
+    /**
+     * Stops the shared broker once all tests in the class have run.
+     */
+    @AfterClass
+    public static void shutDownBroker() {
+        broker.after();
+    }
+
+    /**
+     * Builds a context with the {@code aggStrategy} bean registered and with the
+     * {@code sjmsbatch} and {@code sjms} components connected to the shared broker.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        ActiveMQConnectionFactory brokerConnections = new ActiveMQConnectionFactory();
+        brokerConnections.setBrokerURL(broker.getTcpConnectorUri());
+
+        SjmsBatchComponent batchComponent = new SjmsBatchComponent();
+        batchComponent.setConnectionFactory(brokerConnections);
+
+        SjmsComponent plainComponent = new SjmsComponent();
+        plainComponent.setConnectionFactory(brokerConnections);
+
+        SimpleRegistry beans = new SimpleRegistry();
+        beans.put("aggStrategy", AggregationStrategies.groupedExchange());
+
+        CamelContext camelContext = new DefaultCamelContext(beans);
+        camelContext.addComponent("sjmsbatch", batchComponent);
+        camelContext.addComponent("sjms", plainComponent);
+        return camelContext;
+    }
+
+    /**
+     * Returns {@code true}; every test adds its own routes and then starts the context explicitly.
+     */
+    @Override
+    public boolean isUseAdviceWith() {
+        return true;
+    }
+
+    /**
+     * Using {@code sjmsbatch} as the target of a route is expected to fail at start-up.
+     */
+    @Test(expected = org.apache.camel.FailedToCreateProducerException.class)
+    public void testProducerFailure() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:in").to("sjmsbatch:testQueue");
+            }
+        });
+        context.start();
+    }
+
+    /**
+     * A negative {@code pollDuration} is expected to be rejected.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumer_negativePollDuration() throws Exception {
+        startConsumerRouteWith("pollDuration=-1");
+    }
+
+    /**
+     * A negative {@code consumerCount} is expected to be rejected.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumer_negativeConsumerCount() throws Exception {
+        startConsumerRouteWith("consumerCount=-1");
+    }
+
+    /**
+     * Adds a route consuming from {@code sjmsbatch:in} with the {@code aggStrategy} bean plus
+     * the given extra URI option, routes it to {@code mock:out}, and starts the context.
+     */
+    private void startConsumerRouteWith(final String extraOption) throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjmsbatch:in?aggregationStrategy=#aggStrategy&" + extraOption)
+                    .to("mock:out");
+            }
+        });
+        context.start();
+    }
+
+}