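You are viewing a plain text version of this content. The canonical version is the original message in the commits@camel.apache.org mailing-list archive (the hyperlink was not preserved in this plain-text copy).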
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/28 08:24:17 UTC

[1/6] camel git commit: formatting changes for compliance

Repository: camel
Updated Branches:
  refs/heads/master 0dc847eaf -> cded03c5a


formatting changes for compliance


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/927205e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/927205e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/927205e6

Branch: refs/heads/master
Commit: 927205e6cbbf44ea3f98ead1ab03ffb196b35b61
Parents: 3879b2c
Author: Bryan Love <br...@iovation.com>
Authored: Mon Mar 27 17:22:12 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 10 +++--
 .../component/sjms/batch/SjmsBatchEndpoint.java |  2 +-
 .../component/sjms/support/MockConnection.java  | 36 +++++++++++++-----
 .../sjms/support/MockConnectionFactory.java     | 32 ++++++++++++----
 .../sjms/support/MockMessageConsumer.java       | 36 +++++++++++++-----
 .../component/sjms/support/MockSession.java     | 40 ++++++++++++++++----
 6 files changed, 118 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/927205e6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 5a28dc2..708cbba 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -298,7 +298,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         public AtomicBoolean getCompletionTimeoutTrigger() {
             return completionTimeoutTrigger;
         }
-        public void setKeepAliveDelay(int i){
+        public void setKeepAliveDelay(int i) {
             keepAliveDelay = i;
         }
 
@@ -325,11 +325,15 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     } catch (javax.jms.IllegalStateException ex) {
                         // from consumeBatchesOnLoop
                         // if keepAliveDelay was not specified (defaults to -1) just rethrow to break the loop. This preserves original default behavior
-                        if(keepAliveDelay < 0) throw ex;
+                        if (keepAliveDelay < 0) {
+                            throw ex;
+                        }
                         // this will log the exception and the parent loop will create a new session
                         getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
                         //sleep to avoid log spamming
-                        if(keepAliveDelay > 0) Thread.sleep(keepAliveDelay);
+                        if (keepAliveDelay > 0) {
+                            Thread.sleep(keepAliveDelay);
+                        }
                     } finally {
                         closeJmsSession(session);
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/927205e6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 395c23f..c8debd7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -407,7 +407,7 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
      * it will bail out and the route will shut down if it sees an IllegalStateException.
      */
     public void setKeepAliveDelay(int keepAliveDelay) {
-         this.keepAliveDelay = keepAliveDelay;
+        this.keepAliveDelay = keepAliveDelay;
     }
     public int getKeepAliveDelay() {
         return keepAliveDelay;

http://git-wip-us.apache.org/repos/asf/camel/blob/927205e6/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
index 00f06be..441a263 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
@@ -1,18 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.sjms.support;
 
+import javax.jms.JMSException;
+import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IdGenerator;
 
-import javax.jms.JMSException;
-import javax.jms.Session;
-
 /**
  * Created by bryan.love on 3/22/17.
  */
 public class MockConnection extends ActiveMQConnection {
-    private int returnBadSessionNTimes = 0;
+    private int returnBadSessionNTimes;
 
     protected MockConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats, int returnBadSessionNTimes) throws Exception {
         super(transport,  clientIdGenerator,  connectionIdGenerator,  factoryStats);
@@ -22,22 +37,23 @@ public class MockConnection extends ActiveMQConnection {
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
         this.checkClosedOrFailed();
         this.ensureConnectionInfoSent();
-        if(!transacted) {
-            if(acknowledgeMode == 0) {
+        if (!transacted) {
+            if (acknowledgeMode == 0) {
                 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
             }
 
-            if(acknowledgeMode < 0 || acknowledgeMode > 4) {
-                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
+            if (acknowledgeMode < 0 || acknowledgeMode > 4) {
+                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), Session.CLIENT_ACKNOWLEDGE (2), "
+                + "Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
             }
         }
 
         boolean useBadSession = false;
-        if(returnBadSessionNTimes > 0){
+        if (returnBadSessionNTimes > 0) {
             useBadSession = true;
             returnBadSessionNTimes = returnBadSessionNTimes - 1;
         }
-        return new MockSession(this, this.getNextSessionId(), transacted?0:acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync(), useBadSession);
+        return new MockSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync(), useBadSession);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/927205e6/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
index 75cbe0f..0158ff7 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
@@ -1,27 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.sjms.support;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import javax.jms.Connection;
+import javax.jms.JMSException;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import java.net.URI;
-import java.net.URISyntaxException;
 
 /**
  * Created by bryan.love on 3/22/17.
  */
 public class MockConnectionFactory extends ActiveMQConnectionFactory {
-    private int returnBadSessionNTimes = 0;
+    private int returnBadSessionNTimes;
 
-    public Connection createConnection() throws JMSException {
-        return this.createActiveMQConnection();
-    }
     public MockConnectionFactory(String brokerURL) {
         super(createURI(brokerURL));
     }
+    public Connection createConnection() throws JMSException {
+        return this.createActiveMQConnection();
+    }
     private static URI createURI(String brokerURL) {
         try {
             return new URI(brokerURL);

http://git-wip-us.apache.org/repos/asf/camel/blob/927205e6/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
index 624c152..467aedf 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
@@ -1,29 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.sjms.support;
 
-import org.apache.activemq.ActiveMQMessageConsumer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.MessageDispatch;
-
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
 
 /**
  * Created by bryan.love on 3/22/17.
  */
-public class MockMessageConsumer extends ActiveMQMessageConsumer{
+public class MockMessageConsumer extends ActiveMQMessageConsumer {
     private boolean isBadSession;
 
-    public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener, boolean isBadSession) throws JMSException {
+    public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch,
+                               int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener,
+                               boolean isBadSession) throws JMSException {
         super(session, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocal, browser, dispatchAsync, messageListener);
         this.isBadSession = isBadSession;
     }
 
     public Message receive(long timeout) throws JMSException {
-        if(isBadSession) throw new IllegalStateException("asdf");
+        if (isBadSession) {
+            throw new IllegalStateException("asdf");
+        }
         return super.receive(timeout);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/927205e6/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
index 4290e34..391d734 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
@@ -1,18 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.component.sjms.support;
 
-import org.apache.activemq.*;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQMessageTransformation;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CustomDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.SessionId;
 
-import javax.jms.*;
 
 /**
  * Created by bryan.love on 3/22/17.
  */
 public class MockSession extends ActiveMQSession {
-    private boolean isBadSession = false;
+    private boolean isBadSession;
 
     protected MockSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch, boolean isBadSession) throws JMSException {
         super(connection,  sessionId,  acknowledgeMode,  asyncDispatch,  sessionAsyncDispatch);
@@ -20,26 +45,27 @@ public class MockSession extends ActiveMQSession {
     }
     public Queue createQueue(String queueName) throws JMSException {
         this.checkClosed();
-        return (Queue)(queueName.startsWith("ID:")?new ActiveMQTempQueue(queueName):new ActiveMQQueue(queueName));
+        return (Queue)(queueName.startsWith("ID:") ? new ActiveMQTempQueue(queueName) : new ActiveMQQueue(queueName));
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
         this.checkClosed();
-        if(destination instanceof CustomDestination) {
+        if (destination instanceof CustomDestination) {
             CustomDestination prefetchPolicy1 = (CustomDestination)destination;
             return prefetchPolicy1.createConsumer(this, messageSelector, noLocal);
         } else {
             ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
             boolean prefetch = false;
             int prefetch1;
-            if(destination instanceof Topic) {
+            if (destination instanceof Topic) {
                 prefetch1 = prefetchPolicy.getTopicPrefetch();
             } else {
                 prefetch1 = prefetchPolicy.getQueuePrefetch();
             }
 
             ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
-            return new MockMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch1, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, this.isAsyncDispatch(), messageListener, isBadSession);
+            return new MockMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch1, prefetchPolicy.getMaximumPendingMessageLimit(),
+                                           noLocal, false, this.isAsyncDispatch(), messageListener, isBadSession);
         }
     }
 }


[6/6] camel git commit: Fixed CS. This closes #1565

Posted by da...@apache.org.
Fixed CS. This closes #1565


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cded03c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cded03c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cded03c5

Branch: refs/heads/master
Commit: cded03c5a547a59a760d191959476dd3674f5429
Parents: 927205e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 28 10:17:37 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:18:35 2017 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsComponent.java     | 23 ++++++++++++++------
 .../camel/component/sjms/SjmsEndpoint.java      | 20 ++++++++++++-----
 .../component/sjms/batch/SjmsBatchEndpoint.java | 16 ++++++++------
 .../component/sjms/support/MockConnection.java  |  3 ---
 .../sjms/support/MockConnectionFactory.java     |  4 ----
 .../sjms/support/MockMessageConsumer.java       |  4 +---
 .../component/sjms/support/MockSession.java     |  4 ----
 .../src/main/docs/sjms2-component.adoc          |  2 +-
 8 files changed, 42 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index f4e07c0..158c0ac 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -45,7 +45,10 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
     @Metadata(label = "advanced", description = "A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory."
                     + " * See Plugable Connection Resource Management for further details.")
     private ConnectionResource connectionResource;
-    @Metadata(label = "advanced", description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the # notation.")
+    @Metadata(label = "advanced", description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification."
+        + " Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -)."
+        + " Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation"
+        + " of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the # notation.")
     private JmsKeyFormatStrategy jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
     @Metadata(defaultValue = "1", description = "The maximum number of connections available to endpoints started under this component")
     private Integer connectionCount = 1;
@@ -55,17 +58,23 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
     private TimedTaskManager timedTaskManager;
     @Metadata(label = "advanced", description = "To use a custom DestinationCreationStrategy.")
     private DestinationCreationStrategy destinationCreationStrategy;
-    @Metadata(label = "advanced", description = "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt> objects when Camel is sending a JMS message.")
+    @Metadata(label = "advanced", description = "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances"
+        + " of <tt>javax.jms.Message</tt> objects when Camel is sending a JMS message.")
     private MessageCreatedStrategy messageCreatedStrategy;
-    @Metadata(label = "advanced", defaultValue = "true", description = "When using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource} then should each {@link javax.jms.Connection} be tested (calling start) before returned from the pool.")
+    @Metadata(label = "advanced", defaultValue = "true", description = "When using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}"
+        + " then should each {@link javax.jms.Connection} be tested (calling start) before returned from the pool.")
     private boolean connectionTestOnBorrow = true;
-    @Metadata(label = "security", secret = true, description = "The username to use when creating {@link javax.jms.Connection} when using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
+    @Metadata(label = "security", secret = true, description = "The username to use when creating {@link javax.jms.Connection} when using the"
+        + " default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
     private String connectionUsername;
-    @Metadata(label = "security", secret = true, description = "The password to use when creating {@link javax.jms.Connection} when using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
+    @Metadata(label = "security", secret = true, description = "The password to use when creating {@link javax.jms.Connection} when using the"
+        + " default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
     private String connectionPassword;
-    @Metadata(label = "advanced", description = "The client ID to use when creating {@link javax.jms.Connection} when using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
+    @Metadata(label = "advanced", description = "The client ID to use when creating {@link javax.jms.Connection} when using the"
+        + " default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
     private String connectionClientId;
-    @Metadata(label = "advanced", defaultValue = "5000", description = "The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
+    @Metadata(label = "advanced", defaultValue = "5000", description = "The max wait time in millis to block and wait on free connection when the pool"
+        + " is exhausted when using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource}.")
     private long connectionMaxWait = 5000;
 
     public SjmsComponent() {

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 77b53a1..c73ebb0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -81,13 +81,15 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
             description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.")
     private HeaderFilterStrategy headerFilterStrategy;
     @UriParam(label = "advanced",
-            description = "Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. Note: If you are using a custom headerFilterStrategy then this option does not apply.")
+            description = "Whether to include all JMSXxxx properties when mapping from JMS to Camel Message."
+                + " Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. Note: If you are using a custom headerFilterStrategy then this option does not apply.")
     private boolean includeAllJMSXProperties;
     @UriParam(label = "consumer,transaction",
             description = "Specifies whether to use transacted mode")
     private boolean transacted;
     @UriParam(label = "transaction,advanced", defaultValue = "true",
-            description = "Specifies whether to share JMS session with other SJMS endpoints. Turn this off if your route is accessing to multiple JMS providers. If you need transaction against multiple JMS providers, use jms component to leverage XA transaction.")
+            description = "Specifies whether to share JMS session with other SJMS endpoints. Turn this off if your route is accessing to multiple JMS providers."
+                + " If you need transaction against multiple JMS providers, use jms component to leverage XA transaction.")
     private boolean sharedJMSSession = true;
     @UriParam(label = "producer",
             description = "Sets the reply to destination name used for InOut producer endpoints.")
@@ -125,7 +127,11 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
             description = "Sets timeout (in millis) for batch transactions, the value should be 1000 or higher.")
     private long transactionBatchTimeout = 5000;
     @UriParam(label = "advanced",
-            description = "Whether to startup the consumer message listener asynchronously, when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true, you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used, then beware that if the connection could not be established, then an exception is logged at WARN level, and the consumer will not be able to receive messages; You can then restart the route to retry.")
+            description = "Whether to startup the consumer message listener asynchronously, when starting a route."
+                + " For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover."
+                + " This will cause Camel to block while starting routes. By setting this option to true, you will let routes startup, while the JmsConsumer connects to the JMS broker"
+                + " using a dedicated thread in asynchronous mode. If this option is used, then beware that if the connection could not be established, then an exception is logged at WARN level,"
+                + " and the consumer will not be able to receive messages; You can then restart the route to retry.")
     private boolean asyncStartListener;
     @UriParam(label = "advanced",
             description = "Whether to stop the consumer message listener asynchronously, when stopping a route.")
@@ -137,7 +143,8 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
             description = "Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown.")
     private boolean allowNullBody = true;
     @UriParam(label = "advanced", defaultValue = "true",
-            description = "Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details.")
+            description = "Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc."
+                + " See section about how mapping works below for more details.")
     private boolean mapJmsMessage = true;
     @UriParam(label = "transaction",
             description = "Sets the commit strategy.")
@@ -149,7 +156,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
             description = "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt> objects when Camel is sending a JMS message.")
     private MessageCreatedStrategy messageCreatedStrategy;
     @UriParam(label = "advanced",
-            description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the # notation.")
+            description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification."
+                + "Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -)."
+                + " The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters."
+                + " You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the # notation.")
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
     @UriParam(label = "advanced",
             description = "Initializes the connectionResource for the endpoint, which takes precedence over the component's connectionResource, if any")

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index c8debd7..648aa77 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -400,6 +400,14 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     }
 
     /**
+     * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
+     * The default is 5000 ms, that is, 5 seconds.
+     */
+    public void setRecoveryInterval(int recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
+
+    /**
      * The delay in millis between attempts to re-establish a valid session.
      * If this is a positive value the SjmsBatchConsumer will attempt to create a new session if it sees an IllegalStateException
      * during message consumption. This delay value allows you to pause between attempts to prevent spamming the logs.
@@ -409,15 +417,9 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     public void setKeepAliveDelay(int keepAliveDelay) {
         this.keepAliveDelay = keepAliveDelay;
     }
+
     public int getKeepAliveDelay() {
         return keepAliveDelay;
     }
 
-    /**
-     * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
-     * The default is 5000 ms, that is, 5 seconds.
-     */
-    public void setRecoveryInterval(int recoveryInterval) {
-        this.recoveryInterval = recoveryInterval;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
index 441a263..335c47b 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
@@ -23,9 +23,6 @@ import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IdGenerator;
 
-/**
- * Created by bryan.love on 3/22/17.
- */
 public class MockConnection extends ActiveMQConnection {
     private int returnBadSessionNTimes;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
index 0158ff7..c7fe9f1 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
@@ -25,10 +25,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 
-
-/**
- * Created by bryan.love on 3/22/17.
- */
 public class MockConnectionFactory extends ActiveMQConnectionFactory {
     private int returnBadSessionNTimes;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
index 467aedf..9a5a17e 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
@@ -25,10 +25,8 @@ import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 
-/**
- * Created by bryan.love on 3/22/17.
- */
 public class MockMessageConsumer extends ActiveMQMessageConsumer {
+
     private boolean isBadSession;
 
     public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch,

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
index 391d734..8495c9f 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
@@ -32,10 +32,6 @@ import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.SessionId;
 
-
-/**
- * Created by bryan.love on 3/22/17.
- */
 public class MockSession extends ActiveMQSession {
     private boolean isBadSession;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cded03c5/components/camel-sjms2/src/main/docs/sjms2-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms2/src/main/docs/sjms2-component.adoc b/components/camel-sjms2/src/main/docs/sjms2-component.adoc
index c9da69a..f3c5e75 100644
--- a/components/camel-sjms2/src/main/docs/sjms2-component.adoc
+++ b/components/camel-sjms2/src/main/docs/sjms2-component.adoc
@@ -163,7 +163,7 @@ with the following path and query parameters:
 | **exceptionListener** (advanced) | Specifies the JMS Exception Listener that is to be notified of any underlying JMS exceptions. |  | ExceptionListener
 | **headerFilterStrategy** (advanced) | To use a custom HeaderFilterStrategy to filter header to and from Camel message. |  | HeaderFilterStrategy
 | **includeAllJMSXProperties** (advanced) | Whether to include all JMSXxxx properties when mapping from JMS to Camel Message. Setting this to true will include properties such as JMSXAppID and JMSXUserID etc. Note: If you are using a custom headerFilterStrategy then this option does not apply. | false | boolean
-| **jmsKeyFormatStrategy** (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the notation. |  | JmsKeyFormatStrategy
+| **jmsKeyFormatStrategy** (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the notation. |  | JmsKeyFormatStrategy
 | **mapJmsMessage** (advanced) | Specifies whether Camel should auto map the received JMS message to a suited payload type such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details. | true | boolean
 | **messageCreatedStrategy** (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
 | **errorHandlerLoggingLevel** (logging) | Allows to configure the default errorHandler logging level for logging uncaught exceptions. | WARN | LoggingLevel


[3/6] camel git commit: batch consumer will create new sessions instead of bailing out now

Posted by da...@apache.org.
batch consumer will create new sessions instead of bailing out now


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5646bbf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5646bbf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5646bbf

Branch: refs/heads/master
Commit: a5646bbf2ff78dc87036a2c319159db962764128
Parents: 0dc847e
Author: Bryan Love <br...@iovation.com>
Authored: Wed Mar 22 16:43:33 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 34 +++++++++++++-------
 1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a5646bbf/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 630bdfb..2f2440d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -298,24 +298,34 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         @Override
         public void run() {
             try {
-                // a batch corresponds to a single session that will be committed or rolled back by a background thread
-                final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
-                try {
-                    // only batch consumption from queues is supported - it makes no sense to transactionally consume
-                    // from a topic as you don't car about message loss, users can just use a regular aggregator instead
-                    Queue queue = session.createQueue(destinationName);
-                    MessageConsumer consumer = session.createConsumer(queue);
-
+                // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
+                while (running.get()) {
+                    // a batch corresponds to a single session that will be committed or rolled back by a background thread
+                    final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                     try {
-                        task.consumeBatchesOnLoop(session, consumer);
+                        // only batch consumption from queues is supported - it makes no sense to transactionally consume
+                        // from a topic as you don't care about message loss, users can just use a regular aggregator instead
+                        Queue queue = session.createQueue(destinationName);
+                        MessageConsumer consumer = session.createConsumer(queue);
+
+                        try {
+                            task.consumeBatchesOnLoop(session, consumer);
+                        } finally {
+                            closeJmsConsumer(consumer);
+                        }
+                    } catch (javax.jms.IllegalStateException ex) {
+                        // from consumeBatchesOnLoop
+                        // this will log the exception and the parent loop will create a new session
+                        getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
+                        //rest a minute to avoid destroying the logs
+                        Thread.sleep(2000);
                     } finally {
-                        closeJmsConsumer(consumer);
+                        closeJmsSession(session);
                     }
-                } finally {
-                    closeJmsSession(session);
                 }
             } catch (Throwable ex) {
                 // from consumeBatchesOnLoop
+                // catch anything besides the IllegalStateException and exit the application
                 getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
             } finally {
                 // indicate that we have shut down


[2/6] camel git commit: change the while loop to a do/while loop to fix async startup

Posted by da...@apache.org.
change the while loop to a do/while loop to fix async startup


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3879b2c0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3879b2c0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3879b2c0

Branch: refs/heads/master
Commit: 3879b2c0b018a6449766aa6662d1e2cc4c2593a1
Parents: 9449648
Author: Bryan Love <br...@iovation.com>
Authored: Fri Mar 24 13:07:28 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/sjms/batch/SjmsBatchConsumer.java | 8 +++++---
 .../sjms/batch/SjmsBatchConsumerAsyncStartTest.java          | 3 ++-
 .../camel/component/sjms/batch/SjmsBatchConsumerTest.java    | 4 ++++
 3 files changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3879b2c0/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index a32cc3d..5a28dc2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -305,8 +305,10 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         @Override
         public void run() {
             try {
-                // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
-                while (running.get() || isStarting()) {
+                // This loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled.
+                // I'm using a do/while loop because the first time through we want to attempt it regardless of any other conditions... we
+                // only want to try AGAIN if the keepAlive is set.
+                do {
                     // a batch corresponds to a single session that will be committed or rolled back by a background thread
                     final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                     try {
@@ -331,7 +333,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     } finally {
                         closeJmsSession(session);
                     }
-                }
+                }while (running.get() || isStarting());
             } catch (Throwable ex) {
                 // from consumeBatchesOnLoop
                 // catch anything besides the IllegalStateException and exit the application

http://git-wip-us.apache.org/repos/asf/camel/blob/3879b2c0/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
index fc0a46e..bb30840 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerAsyncStartTest.java
@@ -21,6 +21,7 @@ import javax.jms.ConnectionFactory;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MockConnectionFactory;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 
@@ -32,7 +33,7 @@ public class SjmsBatchConsumerAsyncStartTest extends SjmsBatchConsumerTest {
     public CamelContext createCamelContext() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
         registry.put("testStrategy", new ListAggregationStrategy());
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+        ConnectionFactory connectionFactory = new MockConnectionFactory(broker.getTcpConnectorUri());
 
         SjmsComponent sjmsComponent = new SjmsComponent();
         sjmsComponent.setConnectionFactory(connectionFactory);

http://git-wip-us.apache.org/repos/asf/camel/blob/3879b2c0/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index 72610de..04746f2 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -33,6 +33,7 @@ import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.util.StopWatch;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -48,6 +49,9 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     public CamelContext createCamelContext() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
         registry.put("testStrategy", new ListAggregationStrategy());
+        // the only thing special about this MockConnectionFactory is it allows us to call returnBadSessionNTimes(int)
+        // which will cause the MockSession to throw an IllegalStateException <int> times before returning a valid one.
+        // This gives us the ability to test bad sessions
         ConnectionFactory connectionFactory = new MockConnectionFactory(broker.getTcpConnectorUri());
 
         SjmsComponent sjmsComponent = new SjmsComponent();


[4/6] camel git commit: added keepAliveDelay URI param to prevent premature exit

Posted by da...@apache.org.
added keepAliveDelay URI param to prevent premature exit


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bd6b87c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bd6b87c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bd6b87c5

Branch: refs/heads/master
Commit: bd6b87c5855c339396c547ca63bdfe70b4b0aa5a
Parents: a5646bb
Author: Bryan Love <br...@iovation.com>
Authored: Thu Mar 23 11:25:46 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 21 +++++++++++++++-----
 .../component/sjms/batch/SjmsBatchEndpoint.java |  6 ++++++
 2 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bd6b87c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 2f2440d..c386c66 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -128,7 +128,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         super.doStart();
 
         boolean recovery = getEndpoint().isAsyncStartListener();
-        StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval());
+        StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay());
 
         if (recovery) {
             // use a background thread to keep starting the consumer until
@@ -145,11 +145,13 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
         private boolean recoveryEnabled;
         private int recoveryInterval;
+        private int keepAliveDelay;
         private long attempt;
 
-        public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval) {
+        public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval, int keepAliveDelay) {
             this.recoveryEnabled = recoveryEnabled;
             this.recoveryInterval = recoveryInterval;
+            this.keepAliveDelay = keepAliveDelay;
         }
 
         @Override
@@ -183,6 +185,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     final List<AtomicBoolean> triggers = new ArrayList<>();
                     for (int i = 0; i < consumerCount; i++) {
                         BatchConsumptionLoop loop = new BatchConsumptionLoop();
+                        loop.setKeepAliveDelay(keepAliveDelay);
                         triggers.add(loop.getCompletionTimeoutTrigger());
                         jmsConsumerExecutors.submit(loop);
                     }
@@ -290,16 +293,20 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
         private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
         private final BatchConsumptionTask task = new BatchConsumptionTask(completionTimeoutTrigger);
+        private int keepAliveDelay;
 
         public AtomicBoolean getCompletionTimeoutTrigger() {
             return completionTimeoutTrigger;
         }
+        public void setKeepAliveDelay(int i){
+            keepAliveDelay = i;
+        }
 
         @Override
         public void run() {
             try {
                 // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
-                while (running.get()) {
+                while (running.get() || isStarting()) {
                     // a batch corresponds to a single session that will be committed or rolled back by a background thread
                     final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                     try {
@@ -315,10 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         }
                     } catch (javax.jms.IllegalStateException ex) {
                         // from consumeBatchesOnLoop
+                        // if keepAliveDelay was not specified just rethrow to break the loop. This preserves original default behavior
+                        if(keepAliveDelay == -1) throw ex;
                         // this will log the exception and the parent loop will create a new session
                         getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
-                        //rest a minute to avoid destroying the logs
-                        Thread.sleep(2000);
+                        //sleep to avoid log spamming
+                        Thread.sleep(keepAliveDelay);
                     } finally {
                         closeJmsSession(session);
                     }
@@ -401,6 +410,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     long waitTime = (usingTimeout && (timeElapsed > 0))
                             ? getReceiveWaitTime(timeElapsed)
                             : pollDuration;
+
+
                     Message message = consumer.receive(waitTime);
 
                     if (running.get()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/bd6b87c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 84e1fd1..2e8affb 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -93,6 +93,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     private boolean asyncStartListener;
     @UriParam(label = "advanced", defaultValue = "5000")
     private int recoveryInterval = 5000;
+    @UriParam(label = "advanced", defaultValue = "-1")
+    private int keepAliveDelay = -1;
 
     public SjmsBatchEndpoint() {
     }
@@ -397,6 +399,10 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
         return recoveryInterval;
     }
 
+    public int getKeepAliveDelay() {
+        return recoveryInterval;
+    }
+
     /**
      * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
      * The default is 5000 ms, that is, 5 seconds.


[5/6] camel git commit: added documentation and test case

Posted by da...@apache.org.
added documentation and test case


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94496488
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94496488
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94496488

Branch: refs/heads/master
Commit: 944964888ed512501ed7495f51dc7468a3059c46
Parents: bd6b87c
Author: Bryan Love <br...@iovation.com>
Authored: Thu Mar 23 14:04:36 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/sjms-batch-component.adoc     |  3 +-
 .../component/sjms/batch/SjmsBatchConsumer.java |  6 +--
 .../component/sjms/batch/SjmsBatchEndpoint.java | 12 ++++-
 .../sjms/batch/SjmsBatchConsumerTest.java       | 49 +++++++++++++++++++-
 .../component/sjms/support/MockConnection.java  | 43 +++++++++++++++++
 .../sjms/support/MockConnectionFactory.java     | 42 +++++++++++++++++
 .../sjms/support/MockMessageConsumer.java       | 29 ++++++++++++
 .../component/sjms/support/MockSession.java     | 45 ++++++++++++++++++
 8 files changed, 222 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
index cf8f2b2..3ed1d86 100644
--- a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
@@ -148,7 +148,7 @@ with the following path and query parameters:
 | **destinationName** | *Required* The destination name. Only queues are supported names may be prefixed by 'queue:'. |  | String
 |=======================================================================
 
-#### Query Parameters (22 parameters):
+#### Query Parameters (23 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -171,6 +171,7 @@ with the following path and query parameters:
 | **asyncStartListener** (advanced) | Whether to startup the consumer message listener asynchronously when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true you will let routes startup while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used then beware that if the connection could not be established then an exception is logged at WARN level and the consumer will not be able to receive messages; You can then restart the route to retry. | false | boolean
 | **headerFilterStrategy** (advanced) | To use a custom HeaderFilterStrategy to filter header to and from Camel message. |  | HeaderFilterStrategy
 | **jmsKeyFormatStrategy** (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using the notation. |  | JmsKeyFormatStrategy
+| **keepAliveDelay** (advanced) | The delay in millis between attempts to re-establish a valid session. If this is zero or a positive value the SjmsBatchConsumer will attempt to create a new session if it sees an IllegalStateException during message consumption. A positive value pauses for that many millis between attempts to prevent spamming the logs while zero retries without pausing. If this is a negative value (default is -1) then the SjmsBatchConsumer will behave as it always has before - that is it will bail out and the route will shut down if it sees an IllegalStateException. | -1 | int
 | **messageCreatedStrategy** (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
 | **recoveryInterval** (advanced) | Specifies the interval between recovery attempts i.e. when a connection is being refreshed in milliseconds. The default is 5000 ms that is 5 seconds. | 5000 | int
 | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index c386c66..a32cc3d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -322,12 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         }
                     } catch (javax.jms.IllegalStateException ex) {
                         // from consumeBatchesOnLoop
-                        // if keepAliveDelay was not specified just rethrow to break the loop. This preserves original default behavior
-                        if(keepAliveDelay == -1) throw ex;
+                        // if keepAliveDelay was not specified (defaults to -1) just rethrow to break the loop. This preserves original default behavior
+                        if(keepAliveDelay < 0) throw ex;
                         // this will log the exception and the parent loop will create a new session
                         getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
                         //sleep to avoid log spamming
-                        Thread.sleep(keepAliveDelay);
+                        if(keepAliveDelay > 0) Thread.sleep(keepAliveDelay);
                     } finally {
                         closeJmsSession(session);
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 2e8affb..395c23f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -399,8 +399,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
         return recoveryInterval;
     }
 
+    /**
+     * The delay in millis between attempts to re-establish a valid session.
+     * If this is zero or a positive value the SjmsBatchConsumer will attempt to create a new session if it sees an IllegalStateException
+     * during message consumption. A positive value pauses for that many millis between attempts to prevent spamming the logs; zero retries without pausing.
+     * If this is a negative value (default is -1) then the SjmsBatchConsumer will behave as it always has before - that is
+     * it will bail out and the route will shut down if it sees an IllegalStateException.
+     */
+    public void setKeepAliveDelay(int keepAliveDelay) {
+         this.keepAliveDelay = keepAliveDelay;
+    }
     public int getKeepAliveDelay() {
-        return recoveryInterval;
+        return keepAliveDelay;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index e378457..72610de 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -21,7 +21,6 @@ import java.util.Date;
 import java.util.List;
 import javax.jms.ConnectionFactory;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -29,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MockConnectionFactory;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -48,7 +48,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     public CamelContext createCamelContext() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
         registry.put("testStrategy", new ListAggregationStrategy());
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+        ConnectionFactory connectionFactory = new MockConnectionFactory(broker.getTcpConnectorUri());
 
         SjmsComponent sjmsComponent = new SjmsComponent();
         sjmsComponent.setConnectionFactory(connectionFactory);
@@ -338,6 +338,51 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
     }
 
+    @Test
+    public void testConsumptionBadSession() throws Exception {
+
+        final int messageCount = 5;
+        final int consumerCount = 1;
+        SjmsBatchComponent sb = (SjmsBatchComponent)context.getComponent("sjms-batch");
+        MockConnectionFactory cf = (MockConnectionFactory)sb.getConnectionFactory();
+        cf.returnBadSessionNTimes(2);
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                int completionTimeout = 1000;
+                int completionSize = 200;
+
+                // keepAliveDelay=300 is the key: a non-negative value makes the consumer retry with a new session after an IllegalStateException, pausing 300 millis between attempts.
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=300",
+                        queueName, completionTimeout, completionSize, consumerCount)
+                        .routeId("batchConsumer").startupOrder(10).autoStartup(false)
+                        .split(body())
+                        .to("mock:split");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBefore = getMockEndpoint("mock:before");
+        mockBefore.setExpectedMessageCount(messageCount);
+
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(messageCount);
+
+        LOG.info("Sending messages");
+        template.sendBody("direct:in", generateStrings(messageCount));
+        LOG.info("Send complete");
+
+        StopWatch stopWatch = new StopWatch();
+        context.startRoute("batchConsumer");
+
+        assertMockEndpointsSatisfied();
+        long time = stopWatch.stop();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength) {
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
new file mode 100644
index 0000000..00f06be
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
@@ -0,0 +1,43 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * Test-only ActiveMQConnection whose createSession() returns a MockSession flagged as bad for the first returnBadSessionNTimes calls. Created by bryan.love on 3/22/17.
+ */
+public class MockConnection extends ActiveMQConnection {
+    private int returnBadSessionNTimes = 0;
+
+    protected MockConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats, int returnBadSessionNTimes) throws Exception {
+        super(transport,  clientIdGenerator,  connectionIdGenerator,  factoryStats);
+        this.returnBadSessionNTimes = returnBadSessionNTimes;
+    }
+
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        this.checkClosedOrFailed();
+        this.ensureConnectionInfoSent();
+        if(!transacted) {
+            if(acknowledgeMode == 0) {
+                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
+            }
+
+            if(acknowledgeMode < 0 || acknowledgeMode > 4) {
+                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
+            }
+        }
+
+        boolean useBadSession = false;
+        if(returnBadSessionNTimes > 0){
+            useBadSession = true;
+            returnBadSessionNTimes = returnBadSessionNTimes - 1;
+        }
+        return new MockSession(this, this.getNextSessionId(), transacted?0:acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync(), useBadSession);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
new file mode 100644
index 0000000..75cbe0f
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
@@ -0,0 +1,42 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Test-only ActiveMQConnectionFactory that creates MockConnection instances, passing each one the count last set via returnBadSessionNTimes(int). Created by bryan.love on 3/22/17.
+ */
+public class MockConnectionFactory extends ActiveMQConnectionFactory {
+    private int returnBadSessionNTimes = 0;
+
+    public Connection createConnection() throws JMSException {
+        return this.createActiveMQConnection();
+    }
+    public MockConnectionFactory(String brokerURL) {
+        super(createURI(brokerURL));
+    }
+    private static URI createURI(String brokerURL) {
+        try {
+            return new URI(brokerURL);
+        } catch (URISyntaxException var2) {
+            throw (IllegalArgumentException)(new IllegalArgumentException("Invalid broker URI: " + brokerURL)).initCause(var2);
+        }
+    }
+
+    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+        MockConnection connection = new MockConnection(transport, this.getClientIdGenerator(), this.getConnectionIdGenerator(), stats, returnBadSessionNTimes);
+        return connection;
+    }
+
+    public void returnBadSessionNTimes(int returnBadSessionNTimes) {
+        this.returnBadSessionNTimes = returnBadSessionNTimes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
new file mode 100644
index 0000000..624c152
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
@@ -0,0 +1,29 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * Test-only ActiveMQMessageConsumer whose receive(long) throws javax.jms.IllegalStateException when constructed with isBadSession=true and otherwise delegates to the superclass. Created by bryan.love on 3/22/17.
+ */
+public class MockMessageConsumer extends ActiveMQMessageConsumer{
+    private boolean isBadSession;
+
+    public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener, boolean isBadSession) throws JMSException {
+        super(session, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount, noLocal, browser, dispatchAsync, messageListener);
+        this.isBadSession = isBadSession;
+    }
+
+    public Message receive(long timeout) throws JMSException {
+        if(isBadSession) throw new IllegalStateException("asdf");
+        return super.receive(timeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
new file mode 100644
index 0000000..4290e34
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
@@ -0,0 +1,45 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.SessionId;
+
+import javax.jms.*;
+
+/**
+ * Test-only ActiveMQSession whose createConsumer(...) returns, for any destination that is not a CustomDestination, a MockMessageConsumer carrying this session's isBadSession flag. Created by bryan.love on 3/22/17.
+ */
+public class MockSession extends ActiveMQSession {
+    private boolean isBadSession = false;
+
+    protected MockSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch, boolean isBadSession) throws JMSException {
+        super(connection,  sessionId,  acknowledgeMode,  asyncDispatch,  sessionAsyncDispatch);
+        this.isBadSession = isBadSession;
+    }
+    public Queue createQueue(String queueName) throws JMSException {
+        this.checkClosed();
+        return (Queue)(queueName.startsWith("ID:")?new ActiveMQTempQueue(queueName):new ActiveMQQueue(queueName));
+    }
+
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
+        this.checkClosed();
+        if(destination instanceof CustomDestination) {
+            CustomDestination prefetchPolicy1 = (CustomDestination)destination;
+            return prefetchPolicy1.createConsumer(this, messageSelector, noLocal);
+        } else {
+            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
+            boolean prefetch = false;
+            int prefetch1;
+            if(destination instanceof Topic) {
+                prefetch1 = prefetchPolicy.getTopicPrefetch();
+            } else {
+                prefetch1 = prefetchPolicy.getQueuePrefetch();
+            }
+
+            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
+            return new MockMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch1, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, this.isAsyncDispatch(), messageListener, isBadSession);
+        }
+    }
+}