You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/02/20 04:48:58 UTC

[camel] branch master updated: CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception (#3581)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b04199  CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception (#3581)
5b04199 is described below

commit 5b041991668c7442c9204776834489aac2fa61ec
Author: Jan Filipski <gl...@users.noreply.github.com>
AuthorDate: Thu Feb 20 05:48:45 2020 +0100

    CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception (#3581)
    
    * CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception
    
    * CAMEL-6950 camel-sjms: Lacks reconnection logic in case of exception
    Attempt with adjustable reconnect delay and connection separation for restart
---
 .../sjms/AggregatedExceptionListener.java          |  53 +++++
 .../apache/camel/component/sjms/SjmsConsumer.java  | 264 +++++++++++++--------
 .../apache/camel/component/sjms/SjmsEndpoint.java  |  25 ++
 .../apache/camel/component/sjms/SjmsProducer.java  |  22 +-
 .../component/sjms/producer/InOutProducer.java     |  17 +-
 .../component/sjms/ReconnectConsumerTest.java      |  55 +++++
 .../component/sjms/ReconnectInOutProducerTest.java | 115 +++++++++
 .../component/sjms/ReconnectProducerTest.java      | 101 ++++++++
 .../component/sjms/support/JmsTestSupport.java     |  25 +-
 9 files changed, 572 insertions(+), 105 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/AggregatedExceptionListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/AggregatedExceptionListener.java
new file mode 100644
index 0000000..bed3bc5
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/AggregatedExceptionListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AggregatedExceptionListener implements ExceptionListener {
+    private static final Logger LOG = LoggerFactory.getLogger(AggregatedExceptionListener.class);
+    private final Set<ExceptionListener> listenerSet;
+
+    public AggregatedExceptionListener(ExceptionListener... exceptionListeners) {
+        this.listenerSet = new HashSet<>(exceptionListeners.length);
+        for (ExceptionListener exceptionListener : exceptionListeners) {
+            if (exceptionListener instanceof AggregatedExceptionListener) {
+                listenerSet.addAll(((AggregatedExceptionListener) exceptionListener).listenerSet); //prevent multiwrapping
+            } else {
+                listenerSet.add(exceptionListener);
+            }
+        }
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        for (ExceptionListener listener : listenerSet) {
+            try {
+                listener.onException(exception);
+            } catch (Exception ex) {
+                LOG.error("Exception listeners shouldn't throw exceptions", ex);
+            }
+        }
+    }
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 898377e..7bc2100 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -16,10 +16,20 @@
  */
 package org.apache.camel.component.sjms;
 
-import java.util.concurrent.ExecutorService;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.WeakHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
 
 import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -38,8 +48,7 @@ import org.apache.camel.component.sjms.tx.SessionBatchTransactionSynchronization
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.commons.pool.BasePoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.camel.util.backoff.BackOffTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,52 +59,10 @@ public class SjmsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SjmsConsumer.class);
 
-    protected GenericObjectPool<MessageConsumerResources> consumers;
-    private ExecutorService executor;
+    private final Map<Connection, List<MessageConsumerResources>> consumers = new WeakHashMap<>();
+    private ScheduledExecutorService scheduler;
     private Future<?> asyncStart;
-
-    /**
-     * A pool of MessageConsumerResources created at the initialization of the associated consumer.
-     */
-    protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> {
-
-        /**
-         * Creates a new MessageConsumerResources instance.
-         *
-         * @see org.apache.commons.pool.PoolableObjectFactory#makeObject()
-         */
-        @Override
-        public MessageConsumerResources makeObject() throws Exception {
-            return createConsumer();
-        }
-
-        /**
-         * Cleans up the MessageConsumerResources.
-         *
-         * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
-         */
-        @Override
-        public void destroyObject(MessageConsumerResources model) throws Exception {
-            if (model != null) {
-                // First clean up our message consumer
-                if (model.getMessageConsumer() != null) {
-                    model.getMessageConsumer().close();
-                }
-
-                // If the resource has a 
-                if (model.getSession() != null) {
-                    if (model.getSession().getTransacted()) {
-                        try {
-                            model.getSession().rollback();
-                        } catch (Exception e) {
-                            // Do nothing. Just make sure we are cleaned up
-                        }
-                    }
-                    model.getSession().close();
-                }
-            }
-        }
-    }
+    private BackOffTimer.Task rescheduleTask;
 
     public SjmsConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -110,36 +77,58 @@ public class SjmsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer");
-        if (consumers == null) {
-            consumers = new GenericObjectPool<>(new MessageConsumerResourcesFactory());
-            consumers.setMaxActive(getConsumerCount());
-            consumers.setMaxIdle(getConsumerCount());
-            if (getEndpoint().isAsyncStartListener()) {
-                asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            fillConsumersPool();
-                        } catch (Throwable e) {
-                            LOG.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+        this.scheduler = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this, "SjmsConsumer");
+        if (getEndpoint().isAsyncStartListener()) {
+            asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        fillConsumersPool();
+                    } catch (Throwable e) {
+                        LOG.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+                        if (getEndpoint().isReconnectOnError()) {
+                            scheduleRefill(); //we should try to fill consumer pool on next time
                         }
                     }
+                }
 
-                    @Override
-                    public String toString() {
-                        return "AsyncStartListenerTask[" + getDestinationName() + "]";
-                    }
-                });
-            } else {
-                fillConsumersPool();
-            }
+                @Override
+                public String toString() {
+                    return "AsyncStartListenerTask[" + getDestinationName() + "]";
+                }
+            });
+        } else {
+            fillConsumersPool();
         }
     }
 
     private void fillConsumersPool() throws Exception {
-        while (consumers.getNumIdle() < consumers.getMaxIdle()) {
-            consumers.addObject();
+        synchronized (consumers) {
+            while (consumers.values().stream().collect(Collectors.summarizingInt(List::size)).getSum() < getConsumerCount()) {
+                addConsumer();
+            }
+        }
+    }
+
+    public void destroyObject(MessageConsumerResources model) {
+        try {
+            if (model.getMessageConsumer() != null) {
+                model.getMessageConsumer().close();
+            }
+
+            // If the resource has a
+            if (model.getSession() != null) {
+                if (model.getSession().getTransacted()) {
+                    try {
+                        model.getSession().rollback();
+                    } catch (Exception e) {
+                        // Do nothing. Just make sure we are cleaned up
+                    }
+                }
+                model.getSession().close();
+            }
+        } catch (JMSException ex) {
+            LOG.warn("Exception caught on closing consumer", ex);
         }
     }
 
@@ -149,31 +138,36 @@ public class SjmsConsumer extends DefaultConsumer {
         if (asyncStart != null && !asyncStart.isDone()) {
             asyncStart.cancel(true);
         }
-        if (consumers != null) {
-            if (getEndpoint().isAsyncStopListener()) {
-                getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            consumers.close();
-                            consumers = null;
-                        } catch (Throwable e) {
-                            LOG.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
+        if (rescheduleTask != null) {
+            rescheduleTask.cancel();
+        }
+        if (getEndpoint().isAsyncStopListener()) {
+            getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        synchronized (consumers) {
+                            consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
+                            consumers.clear();
                         }
+                    } catch (Throwable e) {
+                        LOG.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e);
                     }
+                }
 
-                    @Override
-                    public String toString() {
-                        return "AsyncStopListenerTask[" + getDestinationName() + "]";
-                    }
-                });
-            } else {
-                consumers.close();
-                consumers = null;
+                @Override
+                public String toString() {
+                    return "AsyncStopListenerTask[" + getDestinationName() + "]";
+                }
+            });
+        } else {
+            synchronized (consumers) {
+                consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
+                consumers.clear();
             }
         }
-        if (this.executor != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor);
+        if (this.scheduler != null) {
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.scheduler);
         }
     }
 
@@ -181,7 +175,7 @@ public class SjmsConsumer extends DefaultConsumer {
      * Creates a {@link MessageConsumerResources} with a dedicated
      * {@link Session} required for transacted and InOut consumers.
      */
-    private MessageConsumerResources createConsumer() throws Exception {
+    private void addConsumer() throws Exception {
         MessageConsumerResources answer;
         ConnectionResource connectionResource = getOrCreateConnectionResource();
         Connection conn = connectionResource.borrowConnection();
@@ -191,17 +185,32 @@ public class SjmsConsumer extends DefaultConsumer {
             MessageListener handler = createMessageHandler(session);
             messageConsumer.setMessageListener(handler);
 
+            if (getEndpoint().isReconnectOnError()) {
+                ExceptionListener exceptionListener = conn.getExceptionListener();
+                ReconnectExceptionListener reconnectExceptionListener = new ReconnectExceptionListener(conn);
+                if (exceptionListener == null) {
+                    exceptionListener = reconnectExceptionListener;
+                } else {
+                    exceptionListener = new AggregatedExceptionListener(exceptionListener, reconnectExceptionListener);
+                }
+                conn.setExceptionListener(exceptionListener);
+            }
             answer = new MessageConsumerResources(session, messageConsumer);
+            consumers.compute(conn, (key, oldValue) -> {
+                if (oldValue == null) {
+                    oldValue = new ArrayList<>();
+                }
+                oldValue.add(answer);
+                return oldValue;
+            });
         } catch (Exception e) {
             LOG.error("Unable to create the MessageConsumer", e);
             throw e;
         } finally {
             connectionResource.returnConnection(conn);
         }
-        return answer;
     }
 
-
     /**
      * Helper factory method used to create a MessageListener based on the MEP
      *
@@ -230,15 +239,15 @@ public class SjmsConsumer extends DefaultConsumer {
         AbstractMessageHandler messageHandler;
         if (getEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
             if (isTransacted() || isSynchronous()) {
-                messageHandler = new InOnlyMessageHandler(getEndpoint(), executor, synchronization);
+                messageHandler = new InOnlyMessageHandler(getEndpoint(), scheduler, synchronization);
             } else {
-                messageHandler = new InOnlyMessageHandler(getEndpoint(), executor);
+                messageHandler = new InOnlyMessageHandler(getEndpoint(), scheduler);
             }
         } else {
             if (isTransacted() || isSynchronous()) {
-                messageHandler = new InOutMessageHandler(getEndpoint(), executor, synchronization);
+                messageHandler = new InOutMessageHandler(getEndpoint(), scheduler, synchronization);
             } else {
-                messageHandler = new InOutMessageHandler(getEndpoint(), executor);
+                messageHandler = new InOutMessageHandler(getEndpoint(), scheduler);
             }
         }
 
@@ -288,6 +297,7 @@ public class SjmsConsumer extends DefaultConsumer {
     public boolean isSharedJMSSession() {
         return getEndpoint().isSharedJMSSession();
     }
+
     /**
      * Use to determine whether or not to process exchanges synchronously.
      *
@@ -369,4 +379,62 @@ public class SjmsConsumer extends DefaultConsumer {
         return getEndpoint().getTransactionBatchTimeout();
     }
 
+    private boolean refillPool(BackOffTimer.Task task) {
+        try {
+            fillConsumersPool();
+            return false;
+        } catch (Exception ex) {
+            LOG.error("Cannot refill consumers pool, attempt " + task.getCurrentAttempts(), ex);
+        }
+        return true;
+    }
+
+    private void scheduleRefill() {
+        if (rescheduleTask == null || rescheduleTask.getStatus() != BackOffTimer.Task.Status.Active) {
+            rescheduleTask = new BackOffTimer(scheduler).schedule(getEndpoint().getReconnectBackOff(), this::refillPool);
+        }
+    }
+
+    private final class ReconnectExceptionListener implements ExceptionListener {
+        private final WeakReference<Connection> connection;
+
+        private ReconnectExceptionListener(Connection connection) {
+            this.connection = new WeakReference<>(connection);
+        }
+
+        @Override
+        public void onException(JMSException exception) {
+            LOG.debug("Handling JMSException for reconnecting", exception);
+            Connection currentConnection = connection.get();
+            if (currentConnection != null) {
+                synchronized (consumers) {
+                    List<MessageConsumerResources> toClose = consumers.get(currentConnection);
+                    if (toClose != null) {
+                        toClose.forEach(SjmsConsumer.this::destroyObject);
+                    }
+                    consumers.remove(currentConnection);
+                }
+                scheduleRefill();
+            }
+        }
+
+        //hash and equals to prevent multiple instances for same connection
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ReconnectExceptionListener that = (ReconnectExceptionListener) o;
+            return Objects.equals(connection.get(), that.connection.get());
+        }
+
+        @Override
+        public int hashCode() {
+            final Connection currentConnection = this.connection.get();
+            return currentConnection == null ? 0 : currentConnection.hashCode();
+        }
+    }
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 818b608..d2926fc 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.sjms;
 
+import java.time.Duration;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.Message;
@@ -54,6 +56,7 @@ import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.LoggingExceptionHandler;
+import org.apache.camel.util.backoff.BackOff;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -180,6 +183,11 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
     @UriParam(defaultValue = "true", label = "consumer,logging",
             description = "Allows to control whether stacktraces should be logged or not, by the default errorHandler.")
     private boolean errorHandlerLogStackTrace = true;
+    @UriParam(label = "consumer", description = "Try to apply reconnection logic on consumer pool")
+    private boolean reconnectOnError = true;
+    @UriParam(label = "consumer", description = "Backoff policy on consumer pool reconnection",
+        defaultValueNote = "Default backoff is infinite retries with 5 seconds delay")
+    private BackOff reconnectBackOff = BackOff.builder().delay(Duration.ofSeconds(5)).build();
 
     private volatile boolean closeConnectionResource;
 
@@ -754,4 +762,21 @@ public class SjmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Mult
     public void setJmsObjectFactory(JmsObjectFactory jmsObjectFactory) {
         this.jmsObjectFactory = jmsObjectFactory;
     }
+
+
+    public boolean isReconnectOnError() {
+        return reconnectOnError;
+    }
+
+    public void setReconnectOnError(boolean reconnectOnError) {
+        this.reconnectOnError = reconnectOnError;
+    }
+
+    public BackOff getReconnectBackOff() {
+        return reconnectBackOff;
+    }
+
+    public void setReconnectBackOff(BackOff reconnectBackOff) {
+        this.reconnectBackOff = reconnectBackOff;
+    }
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index a3e75d9..0c53a4b 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
@@ -54,6 +55,17 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
         }
 
         @Override
+        public boolean validateObject(MessageProducerResources obj) {
+            try {
+                obj.getSession().getAcknowledgeMode();
+                return true;
+            } catch (JMSException ex) {
+                LOG.error("Cannot validate session", ex);
+            }
+            return false;
+        }
+
+        @Override
         public void destroyObject(MessageProducerResources model) throws Exception {
             if (model.getMessageProducer() != null) {
                 model.getMessageProducer().close();
@@ -90,10 +102,12 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
 
         this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer");
         if (getProducers() == null) {
-            setProducers(new GenericObjectPool<>(new MessageProducerResourcesFactory()));
-            getProducers().setMaxActive(getProducerCount());
-            getProducers().setMaxIdle(getProducerCount());
-            getProducers().setLifo(false);
+            GenericObjectPool<MessageProducerResources> producers = new GenericObjectPool<>(new MessageProducerResourcesFactory());
+            setProducers(producers);
+            producers.setMaxActive(getProducerCount());
+            producers.setMaxIdle(getProducerCount());
+            producers.setTestOnBorrow(getEndpoint().getComponent().isConnectionTestOnBorrow());
+            producers.setLifo(false);
             if (getEndpoint().isPrefillPool()) {
                 if (getEndpoint().isAsyncStartListener()) {
                     asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index d02cadb..401d920 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -92,7 +93,7 @@ public class InOutProducer extends SjmsProducer {
                 }
 
                 Destination replyToDestination;
-                boolean isReplyToTopic = false;
+                boolean isReplyToTopic;
                 if (ObjectHelper.isEmpty(getNamedReplyTo())) {
                     isReplyToTopic = isTopic();
                     replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session, isReplyToTopic);
@@ -117,7 +118,7 @@ public class InOutProducer extends SjmsProducer {
                                 // we cannot continue routing the unknown message
                                 // log a warn and then ignore the message
                                 LOG.warn("Reply received for unknown correlationID [{}] on reply destination [{}]. Current correlation map size: {}. The message will be ignored: {}",
-                                        new Object[]{correlationID, replyToDestination, EXCHANGERS.size(), message});
+                                    correlationID, replyToDestination, EXCHANGERS.size(), message);
                             }
                         } catch (Exception e) {
                             LOG.warn("Unable to exchange message: {}. This exception is ignored.", message, e);
@@ -135,6 +136,17 @@ public class InOutProducer extends SjmsProducer {
         }
 
         @Override
+        public boolean validateObject(MessageConsumerResources obj) {
+            try {
+                obj.getSession().getAcknowledgeMode();
+                return true;
+            } catch (JMSException ex) {
+                LOG.error("Cannot validate session", ex);
+            }
+            return false;
+        }
+
+        @Override
         public void destroyObject(MessageConsumerResources model) throws Exception {
             if (model.getMessageConsumer() != null) {
                 model.getMessageConsumer().close();
@@ -173,6 +185,7 @@ public class InOutProducer extends SjmsProducer {
             consumers = new GenericObjectPool<>(new MessageConsumerResourcesFactory());
             consumers.setMaxActive(getConsumerCount());
             consumers.setMaxIdle(getConsumerCount());
+            consumers.setTestOnBorrow(getEndpoint().getComponent().isConnectionTestOnBorrow());
             while (consumers.getNumIdle() < consumers.getMaxIdle()) {
                 consumers.addObject();
             }
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectConsumerTest.java
new file mode 100644
index 0000000..e5e4276
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectConsumerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+public class ReconnectConsumerTest extends JmsTestSupport {
+
+    private static final String SJMS_QUEUE_NAME = "sjms:in.only.consumer";
+    private static final String MOCK_RESULT = "mock:result";
+
+    @Test
+    public void testSynchronous() throws Exception {
+        final String expectedBody = "Hello World";
+        MockEndpoint mock = getMockEndpoint(MOCK_RESULT);
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceived(expectedBody, expectedBody);
+
+        template.sendBody(SJMS_QUEUE_NAME, expectedBody);
+
+        reconnect();
+
+        template.sendBody(SJMS_QUEUE_NAME, expectedBody);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(SJMS_QUEUE_NAME).to(MOCK_RESULT);
+            }
+        };
+    }
+
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectInOutProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectInOutProducerTest.java
new file mode 100644
index 0000000..02bb561
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectInOutProducerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+public class ReconnectInOutProducerTest extends JmsTestSupport {
+
+    private static final String TEST_DESTINATION_NAME = "in.out.queue.producer.test";
+
+    
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Test
+    public void testInOutQueueProducer() throws Exception {
+        MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME + ".request");
+        assertNotNull(mc);
+        final String requestText = "Hello World!";
+        final String responseText = "How are you";
+        mc.setMessageListener(new MyMessageListener(requestText, responseText));
+        Object responseObject = template.requestBody("direct:start", requestText);
+        assertNotNull(responseObject);
+        assertTrue(responseObject instanceof String);
+        assertEquals(responseText, responseObject);
+        mc.close();
+
+        reconnect();
+
+        mc = createQueueConsumer(TEST_DESTINATION_NAME + ".request");
+        assertNotNull(mc);
+        mc.setMessageListener(new MyMessageListener(requestText, responseText));
+        responseObject = template.requestBody("direct:start", requestText);
+        assertNotNull(responseObject);
+        assertTrue(responseObject instanceof String);
+        assertEquals(responseText, responseObject);
+        mc.close();
+
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("log:" + TEST_DESTINATION_NAME + ".in.log.1?showBody=true")
+                    .inOut("sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?namedReplyTo="
+                               + TEST_DESTINATION_NAME + ".response")
+                    .to("log:" + TEST_DESTINATION_NAME + ".out.log.1?showBody=true");
+            }
+        };
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.getComponent("sjms", SjmsComponent.class).setConnectionTestOnBorrow(true);
+        return camelContext;
+    }
+
+    protected class MyMessageListener implements MessageListener {
+        private String requestText;
+        private String responseText;
+
+        public MyMessageListener(String request, String response) {
+            this.requestText = request;
+            this.responseText = response;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                TextMessage request = (TextMessage)message;
+                assertNotNull(request);
+                String text = request.getText();
+                assertEquals(requestText, text);
+
+                TextMessage response = getSession().createTextMessage();
+                response.setText(responseText);
+                response.setJMSCorrelationID(request.getJMSCorrelationID());
+                MessageProducer mp = getSession().createProducer(message.getJMSReplyTo());
+                mp.send(response);
+                mp.close();
+            } catch (JMSException e) {
+                fail(e.getLocalizedMessage());
+            }
+        }
+    }
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectProducerTest.java
new file mode 100644
index 0000000..b88a73d
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/ReconnectProducerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.Test;
+
+public class ReconnectProducerTest extends JmsTestSupport {
+
+    private static final String TEST_DESTINATION_NAME = "sync.queue.producer.test";
+
+    @Override
+    protected boolean useJmx() {
+        return false;
+    }
+
+    @Test
+    public void testInOnlyQueueProducer() throws Exception {
+        MessageConsumer mc = createQueueConsumer(TEST_DESTINATION_NAME);
+        assertNotNull(mc);
+        final String expectedBody = "Hello World!";
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(2);
+        mock.expectedBodiesReceived(expectedBody, expectedBody);
+
+        template.sendBody("direct:start", expectedBody);
+        Message message = mc.receive(5000);
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+
+        TextMessage tm = (TextMessage) message;
+        String text = tm.getText();
+        assertNotNull(text);
+        template.sendBody("direct:finish", text);
+
+        reconnect(10000);
+
+        mc = createQueueConsumer(TEST_DESTINATION_NAME);
+        template.sendBody("direct:start", expectedBody);
+        message = mc.receive(5000);
+        assertNotNull(message);
+        assertTrue(message instanceof TextMessage);
+
+        tm = (TextMessage) message;
+        text = tm.getText();
+        assertNotNull(text);
+
+        template.sendBody("direct:finish", text);
+
+        mock.assertIsSatisfied();
+        mc.close();
+
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("sjms:queue:" + TEST_DESTINATION_NAME + "?consumerCount=10");
+
+                from("direct:finish")
+                    .to("log:test.log.1?showBody=true", "mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.getComponent("sjms", SjmsComponent.class).setConnectionTestOnBorrow(true);
+        return camelContext;
+    }
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
index 588e645..eb35ab4 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
@@ -73,7 +73,7 @@ public class JmsTestSupport extends CamelTestSupport {
         String host;
         try (InputStream inStream = url.openStream()) {
             properties.load(inStream);
-            if (Boolean.valueOf(properties.getProperty("amq.external"))) {
+            if (Boolean.parseBoolean(properties.getProperty("amq.external"))) {
                 log.info("Using external AMQ");
                 port = Integer.parseInt(properties.getProperty("amq.port"));
                 host = properties.getProperty("amq.host");
@@ -181,4 +181,27 @@ public class JmsTestSupport extends CamelTestSupport {
     public MessageConsumer createTopicConsumer(String destination, String messageSelector) throws Exception {
         return new Jms11ObjectFactory().createMessageConsumer(session, destinationCreationStrategy.createDestination(session, destination, true), messageSelector, true, null, true, false);
     }
+
+    public void reconnect() throws Exception {
+        reconnect(0);
+    }
+
+    public void reconnect(int waitingMillis) throws Exception {
+        log.info("Closing JMS Session");
+        getSession().close();
+        log.info("Closing JMS Connection");
+        connection.stop();
+        log.info("Stopping the ActiveMQ Broker");
+        broker.stop();
+        broker.waitUntilStopped();
+        Thread.sleep(waitingMillis);
+        broker.start(true);
+        broker.waitUntilStarted();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUri);
+        setupFactoryExternal(connectionFactory);
+        connection = connectionFactory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
 }