You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/12/22 20:18:34 UTC

camel git commit: CAMEL-10648: camel-sjms - Batch consumer should support async start listener

Repository: camel
Updated Branches:
  refs/heads/master a58885109 -> 5371913c4


CAMEL-10648: camel-sjms - Batch consumer should support async start listener


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5371913c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5371913c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5371913c

Branch: refs/heads/master
Commit: 5371913c405f0f296029b0f274823c6b9a3944bf
Parents: a588851
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Dec 22 20:04:22 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 22 20:22:15 2016 +0100

----------------------------------------------------------------------
 .../SjmsBatchComponentConfiguration.java        |  35 ++++++
 .../src/main/docs/sjms-batch-component.adoc     |   8 +-
 .../sjms/batch/SjmsBatchComponent.java          |  64 ++++++++++-
 .../component/sjms/batch/SjmsBatchConsumer.java | 115 +++++++++++++++----
 .../component/sjms/batch/SjmsBatchEndpoint.java |  38 ++++++
 .../batch/SjmsBatchConsumerAsyncStartTest.java  |  51 ++++++++
 .../sjms/manual/ManualBatchFromQueueTest.java   |  75 ++++++++++++
 7 files changed, 360 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components-starter/camel-sjms-starter/src/main/java/org/apache/camel/component/sjms/batch/springboot/SjmsBatchComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/components-starter/camel-sjms-starter/src/main/java/org/apache/camel/component/sjms/batch/springboot/SjmsBatchComponentConfiguration.java b/components-starter/camel-sjms-starter/src/main/java/org/apache/camel/component/sjms/batch/springboot/SjmsBatchComponentConfiguration.java
index c1dc1a9..fde3431 100644
--- a/components-starter/camel-sjms-starter/src/main/java/org/apache/camel/component/sjms/batch/springboot/SjmsBatchComponentConfiguration.java
+++ b/components-starter/camel-sjms-starter/src/main/java/org/apache/camel/component/sjms/batch/springboot/SjmsBatchComponentConfiguration.java
@@ -35,6 +35,25 @@ public class SjmsBatchComponentConfiguration {
      */
     private ConnectionFactory connectionFactory;
     /**
+     * Whether to startup the consumer message listener asynchronously when
+     * starting a route. For example if a JmsConsumer cannot get a connection to
+     * a remote JMS broker then it may block while retrying and/or failover.
+     * This will cause Camel to block while starting routes. By setting this
+     * option to true you will let routes startup while the JmsConsumer connects
+     * to the JMS broker using a dedicated thread in asynchronous mode. If this
+     * option is used then beware that if the connection could not be
+     * established then an exception is logged at WARN level and the consumer
+     * will not be able to receive messages; You can then restart the route to
+     * retry.
+     */
+    private Boolean asyncStartListener = false;
+    /**
+     * Specifies the interval between recovery attempts i.e. when a connection
+     * is being refreshed in milliseconds. The default is 5000 ms that is 5
+     * seconds.
+     */
+    private Integer recoveryInterval = 5000;
+    /**
      * To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter
      * header to and from Camel message.
      */
@@ -49,6 +68,22 @@ public class SjmsBatchComponentConfiguration {
         this.connectionFactory = connectionFactory;
     }
 
+    public Boolean getAsyncStartListener() {
+        return asyncStartListener;
+    }
+
+    public void setAsyncStartListener(Boolean asyncStartListener) {
+        this.asyncStartListener = asyncStartListener;
+    }
+
+    public Integer getRecoveryInterval() {
+        return recoveryInterval;
+    }
+
+    public void setRecoveryInterval(Integer recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
+
     public HeaderFilterStrategy getHeaderFilterStrategy() {
         return headerFilterStrategy;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
index 5892047..506c235 100644
--- a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
@@ -117,7 +117,7 @@ Component Options and Configurations
 
 
 // component options: START
-The Simple JMS Batch component supports 2 options which are listed below.
+The Simple JMS Batch component supports 4 options which are listed below.
 
 
 
@@ -126,6 +126,8 @@ The Simple JMS Batch component supports 2 options which are listed below.
 |=======================================================================
 | Name | Group | Default | Java Type | Description
 | connectionFactory | advanced |  | ConnectionFactory | A ConnectionFactory is required to enable the SjmsBatchComponent.
+| asyncStartListener | advanced | false | boolean | Whether to startup the consumer message listener asynchronously when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true you will let routes startup while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used then beware that if the connection could not be established then an exception is logged at WARN level and the consumer will not be able to receive messages; You can then restart the route to retry.
+| recoveryInterval | advanced | 5000 | int | Specifies the interval between recovery attempts i.e. when a connection is being refreshed in milliseconds. The default is 5000 ms that is 5 seconds.
 | headerFilterStrategy | filter |  | HeaderFilterStrategy | To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message.
 |=======================================================================
 {% endraw %}
@@ -140,7 +142,7 @@ The Simple JMS Batch component supports 2 options which are listed below.
 
 
 // endpoint options: START
-The Simple JMS Batch component supports 21 endpoint options which are listed below:
+The Simple JMS Batch component supports 23 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2,1,1m,1m,5",options="header"]
@@ -162,9 +164,11 @@ The Simple JMS Batch component supports 21 endpoint options which are listed bel
 | sendEmptyMessageWhenIdle | consumer | false | boolean | If using completion timeout or interval then the batch may be empty if the timeout triggered and there was no messages in the batch. If this option is true and the batch is empty then an empty message is added to the batch so an empty message is routed.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
 | exchangePattern | consumer (advanced) |  | ExchangePattern | Sets the exchange pattern when the consumer creates an exchange.
+| asyncStartListener | advanced | false | boolean | Whether to startup the consumer message listener asynchronously when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true you will let routes startup while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used then beware that if the connection could not be established then an exception is logged at WARN level and the consumer will not be able to receive messages; You can then restart the route to retry.
 | headerFilterStrategy | advanced |  | HeaderFilterStrategy | To use a custom HeaderFilterStrategy to filter header to and from Camel message.
 | jmsKeyFormatStrategy | advanced |  | JmsKeyFormatStrategy | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the notation.
 | messageCreatedStrategy | advanced |  | MessageCreatedStrategy | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message.
+| recoveryInterval | advanced | 5000 | int | Specifies the interval between recovery attempts i.e. when a connection is being refreshed in milliseconds. The default is 5000 ms that is 5 seconds.
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 | timeoutCheckerExecutorService | advanced |  | ScheduledExecutorService | If using the completionInterval option a background thread is created to trigger the completion interval. Set this option to provide a custom thread pool to be used rather than creating a new thread for every consumer.
 |=======================================================================

http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 9f2cf27..3649ffb 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.sjms.batch;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.Endpoint;
@@ -26,8 +27,14 @@ import org.apache.camel.util.ObjectHelper;
 
 public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
 
+    private ExecutorService asyncStartStopExecutorService;
+
     @Metadata(label = "advanced")
     private ConnectionFactory connectionFactory;
+    @Metadata(label = "advanced")
+    private boolean asyncStartListener;
+    @Metadata(label = "advanced", defaultValue = "5000")
+    private int recoveryInterval = 5000;
 
     public SjmsBatchComponent() {
         super(SjmsBatchEndpoint.class);
@@ -40,9 +47,13 @@ public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
             setConnectionFactory(cf);
         }
         ObjectHelper.notNull(connectionFactory, "connectionFactory");
-        SjmsBatchEndpoint sjmsBatchEndpoint = new SjmsBatchEndpoint(uri, this, remaining);
-        setProperties(sjmsBatchEndpoint, parameters);
-        return sjmsBatchEndpoint;
+
+        SjmsBatchEndpoint answer = new SjmsBatchEndpoint(uri, this, remaining);
+        answer.setAsyncStartListener(isAsyncStartListener());
+        answer.setRecoveryInterval(getRecoveryInterval());
+        setProperties(answer, parameters);
+
+        return answer;
     }
 
     public ConnectionFactory getConnectionFactory() {
@@ -56,4 +67,51 @@ public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
         this.connectionFactory = connectionFactory;
     }
 
+    public boolean isAsyncStartListener() {
+        return asyncStartListener;
+    }
+
+    /**
+     * Whether to startup the consumer message listener asynchronously, when starting a route.
+     * For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying
+     * and/or failover. This will cause Camel to block while starting routes. By setting this option to true,
+     * you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread
+     * in asynchronous mode. If this option is used, then beware that if the connection could not be established,
+     * then an exception is logged at WARN level, and the consumer will not be able to receive messages;
+     * You can then restart the route to retry.
+     */
+    public void setAsyncStartListener(boolean asyncStartListener) {
+        this.asyncStartListener = asyncStartListener;
+    }
+
+    public int getRecoveryInterval() {
+        return recoveryInterval;
+    }
+
+    /**
+     * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
+     * The default is 5000 ms, that is, 5 seconds.
+     */
+    public void setRecoveryInterval(int recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        if (asyncStartStopExecutorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
+            asyncStartStopExecutorService = null;
+        }
+        super.doShutdown();
+    }
+
+    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
+        if (asyncStartStopExecutorService == null) {
+            // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
+            // for each task, and the thread pool will shrink when no more tasks running
+            asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
+        }
+        return asyncStartStopExecutorService;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 20acfdf..48f9ff7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -71,7 +71,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     private final ConnectionFactory connectionFactory;
     private final String destinationName;
     private ExecutorService jmsConsumerExecutors;
-    private final AtomicBoolean running = new AtomicBoolean(true);
+    private final AtomicBoolean running = new AtomicBoolean(false);
     private final AtomicReference<CountDownLatch> consumersShutdownLatchRef = new AtomicReference<>();
     private Connection connection;
 
@@ -106,7 +106,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             throw new IllegalArgumentException("consumerCount must be greater than 0");
         }
 
-        SjmsBatchComponent sjmsBatchComponent = (SjmsBatchComponent) sjmsBatchEndpoint.getComponent();
+        SjmsBatchComponent sjmsBatchComponent = sjmsBatchEndpoint.getComponent();
         connectionFactory = ObjectHelper.notNull(sjmsBatchComponent.getConnectionFactory(), "jmsBatchComponent.connectionFactory");
     }
 
@@ -127,32 +127,105 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        // start up a shared connection
-        connection = connectionFactory.createConnection();
-        connection.start();
+        boolean recovery = getEndpoint().isAsyncStartListener();
+        StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval());
 
-        LOG.info("Starting {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
-        consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
+        if (recovery) {
+            // use a background thread to keep starting the consumer until
+            getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(task);
+        } else {
+            task.run();
+        }
+    }
 
-        jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+    /**
+     * Task to startup the consumer either synchronously or using asynchronous with recovery
+     */
+    protected class StartConsumerTask implements Runnable {
 
-        final List<AtomicBoolean> triggers = new ArrayList<>();
-        for (int i = 0; i < consumerCount; i++) {
-            BatchConsumptionLoop loop = new BatchConsumptionLoop();
-            triggers.add(loop.getCompletionTimeoutTrigger());
-            jmsConsumerExecutors.execute(loop);
+        private boolean recoveryEnabled;
+        private int recoveryInterval;
+        private long attempt;
+
+        public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval) {
+            this.recoveryEnabled = recoveryEnabled;
+            this.recoveryInterval = recoveryInterval;
         }
 
-        if (completionInterval > 0) {
-            LOG.info("Using CompletionInterval to run every {} millis.", completionInterval);
-            if (timeoutCheckerExecutorService == null) {
-                setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1));
-                shutdownTimeoutCheckerExecutorService = true;
+        @Override
+        public void run() {
+            jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount);
+            consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
+
+            if (completionInterval > 0) {
+                LOG.info("Using CompletionInterval to run every {} millis.", completionInterval);
+                if (timeoutCheckerExecutorService == null) {
+                    setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1));
+                    shutdownTimeoutCheckerExecutorService = true;
+                }
             }
-            // trigger completion based on interval
-            timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
-        }
 
+            // keep loop until we can connect
+            while (isRunAllowed() && !running.get()) {
+                Connection localConnection = null;
+                try {
+                    attempt++;
+
+                    LOG.debug("Attempt #{}. Starting {} consumer(s) for {}:{}", attempt, consumerCount, destinationName, completionSize);
+
+                    // start up a shared connection
+                    localConnection = connectionFactory.createConnection();
+                    localConnection.start();
+
+                    final List<AtomicBoolean> triggers = new ArrayList<>();
+                    for (int i = 0; i < consumerCount; i++) {
+                        BatchConsumptionLoop loop = new BatchConsumptionLoop();
+                        triggers.add(loop.getCompletionTimeoutTrigger());
+                        jmsConsumerExecutors.execute(loop);
+                    }
+
+                    // its success so prepare for exit
+                    connection = localConnection;
+
+                    if (completionInterval > 0) {
+                        // trigger completion based on interval
+                        timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
+                    }
+
+                    if (attempt > 1) {
+                        LOG.info("Successfully refreshed connection after {} attempts.", attempt);
+                    }
+
+                    LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
+                    running.set(true);
+                    return;
+                } catch (Throwable e) {
+                    // we failed so close the local connection as we create a new on next attempt
+                    try {
+                        if (localConnection != null) {
+                            localConnection.close();
+                        }
+                    } catch (Throwable t) {
+                        // ignore
+                    }
+
+                    if (recoveryEnabled) {
+                        getExceptionHandler().handleException("Error starting consumer after " + attempt + " attempts. Will try again in " + recoveryInterval + " millis.", e);
+                    } else {
+                        throw ObjectHelper.wrapRuntimeCamelException(e);
+                    }
+                }
+
+                // sleeping before next attempt
+                try {
+                    LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover", attempt, recoveryInterval);
+                    Thread.sleep(recoveryInterval);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 4e06b87..73d83c2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -89,6 +89,10 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
     @UriParam(label = "advanced")
     private ScheduledExecutorService timeoutCheckerExecutorService;
+    @UriParam(label = "advanced")
+    private boolean asyncStartListener;
+    @UriParam(label = "advanced", defaultValue = "5000")
+    private int recoveryInterval = 5000;
 
     public SjmsBatchEndpoint() {
     }
@@ -110,6 +114,11 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     }
 
     @Override
+    public SjmsBatchComponent getComponent() {
+        return (SjmsBatchComponent) super.getComponent();
+    }
+
+    @Override
     public Producer createProducer() throws Exception {
         throw new UnsupportedOperationException("Producer not supported");
     }
@@ -366,4 +375,33 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
         this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
     }
+
+    public boolean isAsyncStartListener() {
+        return asyncStartListener;
+    }
+
+    /**
+     * Whether to startup the consumer message listener asynchronously, when starting a route.
+     * For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying
+     * and/or failover. This will cause Camel to block while starting routes. By setting this option to true,
+     * you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread
+     * in asynchronous mode. If this option is used, then beware that if the connection could not be established,
+     * then an exception is logged at WARN level, and the consumer will not be able to receive messages;
+     * You can then restart the route to retry.
+     */
+    public void setAsyncStartListener(boolean asyncStartListener) {
+        this.asyncStartListener = asyncStartListener;
+    }
+
+    public int getRecoveryInterval() {
+        return recoveryInterval;
+    }
+
+    /**
+     * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
+     * The default is 5000 ms, that is, 5 seconds.
+     */
+    public void setRecoveryInterval(int recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
new file mode 100644
index 0000000..fc0a46e
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.batch;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+public class SjmsBatchConsumerAsyncStartTest extends SjmsBatchConsumerTest {
+
+    // lets just test that any of the existing tests works
+
+    @Override
+    public CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("testStrategy", new ListAggregationStrategy());
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+
+        SjmsComponent sjmsComponent = new SjmsComponent();
+        sjmsComponent.setConnectionFactory(connectionFactory);
+
+        SjmsBatchComponent sjmsBatchComponent = new SjmsBatchComponent();
+        sjmsBatchComponent.setConnectionFactory(connectionFactory);
+        // turn on async start listener
+        sjmsBatchComponent.setAsyncStartListener(true);
+
+        CamelContext context = new DefaultCamelContext(registry);
+        context.addComponent("sjms", sjmsComponent);
+        context.addComponent("sjms-batch", sjmsBatchComponent);
+        return context;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5371913c/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/manual/ManualBatchFromQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/manual/ManualBatchFromQueueTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/manual/ManualBatchFromQueueTest.java
new file mode 100644
index 0000000..ce92e13
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/manual/ManualBatchFromQueueTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.manual;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.batch.ListAggregationStrategy;
+import org.apache.camel.component.sjms.batch.SjmsBatchComponent;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Manual test")
+public class ManualBatchFromQueueTest extends CamelTestSupport {
+
+    // using failover will automatic re-connect with ActiveMQ
+    // private String url = "failover:tcp://localhost:61616";
+    private String url = "tcp://localhost:61616";
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+        registry.put("testStrategy", new ListAggregationStrategy());
+
+        CamelContext camel = new DefaultCamelContext(registry);
+
+        SjmsBatchComponent sjms = new SjmsBatchComponent();
+        sjms.setAsyncStartListener(true);
+        log.info("Using live connection to existing ActiveMQ broker running on {}", url);
+        sjms.setConnectionFactory(new ActiveMQConnectionFactory(url));
+
+        camel.addComponent("sjms-batch", sjms);
+
+        return camel;
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(1);
+
+        assertMockEndpointsSatisfied(1, TimeUnit.MINUTES);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("sjms-batch:queue:foo?asyncStartListener=true&completionSize=3&completionTimeout=60000&aggregationStrategy=#testStrategy")
+                    .to("log:foo")
+                    .to("mock:foo");
+            }
+        };
+    }
+}