You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/01/06 11:44:05 UTC

[camel] 03/07: CAMEL-15995: camel-sjms - Make it more feature compatible with camel-jms (consumer)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6bec00f604fade012775a692f45bc784dc6abef8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jan 4 18:55:59 2021 +0100

    CAMEL-15995: camel-sjms - Make it more feature compatible with camel-jms (consumer)
---
 .../apache/camel/catalog/docs/sjms-component.adoc  |  20 +-
 components/camel-sjms/pom.xml                      |   3 +-
 .../component/sjms/SjmsComponentConfigurer.java    |  18 +-
 .../component/sjms/SjmsEndpointConfigurer.java     |  66 +-
 .../component/sjms/SjmsEndpointUriFactory.java     |  49 +-
 .../org/apache/camel/component/sjms/sjms.json      |  16 +-
 .../camel-sjms/src/main/docs/sjms-component.adoc   |  20 +-
 .../component/sjms/MessageListenerContainer.java   |  30 +
 .../{SjmsConsumer.java => OldSjmsConsumer.java}    |  35 +-
 .../component/sjms/SessionMessageListener.java     |  27 +
 .../apache/camel/component/sjms/SjmsComponent.java |  34 +-
 .../apache/camel/component/sjms/SjmsConsumer.java  | 402 ++--------
 .../apache/camel/component/sjms/SjmsEndpoint.java  | 242 ++++--
 .../apache/camel/component/sjms/SjmsHelper.java    |  90 +++
 .../apache/camel/component/sjms/SjmsProducer.java  |  11 -
 .../apache/camel/component/sjms/SjmsTemplate.java  |   4 +
 .../sjms/consumer/AbstractMessageHandler.java      |   1 +
 .../sjms/consumer/EndpointMessageListener.java     | 430 +++++++++++
 .../sjms/consumer/InOnlyMessageHandler.java        |   1 +
 .../sjms/consumer/InOutMessageHandler.java         |   4 +-
 .../consumer/SimpleMessageListenerContainer.java   | 302 ++++++++
 .../component/sjms/jms/Jms11ObjectFactory.java     |  12 +
 .../camel/component/sjms/jms/JmsBinding.java       |  12 +-
 .../camel/component/sjms/jms/JmsMessageHelper.java |  16 +
 .../camel/component/sjms/jms/JmsObjectFactory.java |   2 +
 .../component/sjms/producer/InOnlyProducer.java    |   3 +-
 .../component/sjms/producer/InOutProducer.java     |   6 +-
 .../sjms/SjmsEndpointConnectionSettingsTest.java   |  13 -
 .../sjms/consumer/AsyncConsumerInOutTest.java      |   6 +-
 .../sjms/consumer/InOnlyConsumerAsyncTrueTest.java |   2 +-
 .../InOnlyConsumerQueueSynchronousTest.java        |   2 +-
 .../sjms/consumer/InOutConsumerQueueAsyncTest.java |   6 +-
 .../consumer/InOutConsumerTempQueueAsyncTest.java  |  10 +-
 .../camel/component/sjms/it/AsyncJmsInOutIT.java   |   4 +-
 .../component/sjms/it/AsyncJmsInOutTempDestIT.java |   4 +-
 .../sjms/producer/AsyncQueueProducerTest.java      |   4 +-
 .../sjms/producer/AsyncTopicProducerTest.java      |   2 +-
 .../producer/InOutQueueProducerAsyncLoadTest.java  |   2 +-
 .../producer/InOutQueueProducerSyncLoadTest.java   |   4 +-
 .../sjms/producer/NoConnectionFactoryTest.java     | 137 ----
 .../component/dsl/SjmsComponentBuilderFactory.java |  50 +-
 .../endpoint/dsl/SjmsEndpointBuilderFactory.java   | 835 ++++++++++++++++++---
 .../modules/ROOT/pages/sjms-component.adoc         |  20 +-
 43 files changed, 2143 insertions(+), 814 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc
index 20de46e..cdabfaa 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc
@@ -90,7 +90,7 @@ You append query options to the URI using the following format,
 == Component Options and Configurations
 
 // component options: START
-The Simple JMS component supports 18 options, which are listed below.
+The Simple JMS component supports 17 options, which are listed below.
 
 
 
@@ -100,8 +100,6 @@ The Simple JMS component supports 18 options, which are listed below.
 | *connectionCount* (common) | The maximum number of connections available to endpoints started under this component | 1 | Integer
 | *connectionFactory* (common) | *Autowired* A ConnectionFactory is required to enable the SjmsComponent. It can be set directly or set set as part of a ConnectionResource. |  | ConnectionFactory
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
-| *reconnectBackOff* (consumer) | Backoff in millis on consumer pool reconnection attempts | 5000 | long
-| *reconnectOnError* (consumer) | Try to apply reconnection logic on consumer pool | true | boolean
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
 | *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
@@ -111,6 +109,7 @@ The Simple JMS component supports 18 options, which are listed below.
 | *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. |  | DestinationCreationStrategy
 | *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using  [...]
 | *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
+| *recoveryInterval* (advanced) | Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds. | 5000 | long
 | *headerFilterStrategy* (filter) | To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message. |  | HeaderFilterStrategy
 | *connectionPassword* (security) | The password to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
 | *connectionUsername* (security) | The username to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
@@ -139,7 +138,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (40 parameters):
+=== Query Parameters (47 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -148,16 +147,19 @@ with the following path and query parameters:
 | *acknowledgementMode* (common) | The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE. There are 4 enums and the value can be one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE | AUTO_ACKNOWLEDGE | SessionAcknowledgementType
 | *disableReplyTo* (common) | Specifies whether Camel ignores the JMSReplyTo header in messages. If true, Camel does not send a reply back to the destination specified in the JMSReplyTo header. You can use this option if you want Camel to consume from a route and you do not want Camel to automatically send back a reply message because another component in your code handles the reply message. You can also use this option if you want to use Camel as a proxy between different message broker [...]
 | *replyTo* (common) | Provides an explicit ReplyTo destination (overrides any incoming value of Message.getJMSReplyTo() in consumer). |  | String
+| *testConnectionOnStartup* (common) | Specifies whether to test the connection on startup. This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. If a connection cannot be granted then Camel throws an exception on startup. This ensures that Camel is not started with failed connections. The JMS producers is tested as well. | false | boolean
+| *asyncConsumer* (consumer) | Whether the JmsConsumer processes the Exchange asynchronously. If enabled then the JmsConsumer may pickup the next message from the JMS queue, while the previous message is being processed asynchronously (by the Asynchronous Routing Engine). This means that messages may be processed not 100% strictly in order. If disabled (as default) then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue. Note if transac [...]
+| *autoStartup* (consumer) | Specifies whether the consumer container should auto-startup. | true | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerCount* (consumer) | Sets the number of consumer listeners used for this endpoint. | 1 | int
 | *durableSubscriptionId* (consumer) | Sets the durable subscription Id required for durable topics. |  | String
-| *reconnectBackOff* (consumer) | Backoff in millis on consumer pool reconnection attempts | 5000 | long
-| *reconnectOnError* (consumer) | Try to apply reconnection logic on consumer pool | true | boolean
 | *replyToDeliveryPersistent* (consumer) | Specifies whether to use persistent delivery by default for replies. | true | boolean
-| *synchronous* (consumer) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | true | boolean
+| *eagerLoadingOfProperties* (consumer) | Enables eager loading of JMS properties and payload as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties. See also the option eagerPoisonBody. | false | boolean
+| *eagerPoisonBody* (consumer) | If eagerLoadingOfProperties is enabled and the JMS message payload (JMS body or JMS properties) is poison (cannot be read/mapped), then set this text as the message body instead so the message can be processed (the cause of the poison are already stored as exception on the Exchange). This can be turned off by setting eagerPoisonBody=false. See also the option eagerLoadingOfProperties. | Poison JMS message due to ${exception.message} | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *messageSelector* (consumer) | Sets the JMS Message selector syntax. |  | String
+| *replyToSameDestinationAllowed* (consumer) | Whether a JMS consumer is allowed to send a reply message to the same destination that the consumer is using to consume from. This prevents an endless loop by consuming and sending back the same message to itself. | false | boolean
 | *deliveryMode* (producer) | Specifies the delivery mode to be used. Possible values are those defined by javax.jms.DeliveryMode. NON_PERSISTENT = 1 and PERSISTENT = 2. There are 2 enums and the value can be one of: 1, 2 |  | Integer
 | *deliveryPersistent* (producer) | Specifies whether persistent delivery is used by default. | true | boolean
 | *explicitQosEnabled* (producer) | Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages. This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint. This contrasts with the preserveMessageQos option, which operates at message granularity, reading QoS properties exclusively from the Camel In message headers. | false | Boolean
@@ -168,6 +170,7 @@ with the following path and query parameters:
 | *requestTimeout* (producer) | The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). The default is 20 seconds. You can include the header CamelJmsRequestTimeout to override this endpoint configured timeout value, and thus have per message individual timeout values. See also the requestTimeoutCheckerInterval option. | 20000 | long
 | *timeToLive* (producer) | When sending messages, specifies the time-to-live of the message (in milliseconds). | -1 | long
 | *allowNullBody* (producer) | Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown. | true | boolean
+| *disableTimeToLive* (producer) | Use this option to force disabling time to live. For example when you do request/reply over JMS, then Camel will by default use the requestTimeout value as time to live on the message being sent. The problem is that the sender and receiver systems have to have their clocks synchronized, so they are in sync. This is not always so easy to archive. So you can use disableTimeToLive=true to not set a time to live value on the sent message. Then the message w [...]
 | *asyncStartListener* (advanced) | Whether to startup the consumer message listener asynchronously, when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true, you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used, then b [...]
 | *asyncStopListener* (advanced) | Whether to stop the consumer message listener asynchronously, when stopping a route. | false | boolean
 | *connectionCount* (advanced) | *Deprecated* The maximum number of connections available to this endpoint |  | Integer
@@ -180,6 +183,9 @@ with the following path and query parameters:
 | *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache [...]
 | *mapJmsMessage* (advanced) | Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details. | true | boolean
 | *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
+| *recoveryInterval* (advanced) | Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds. | 5000 | long
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *transferException* (advanced) | If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side, then the caused Exception will be send back in response as a javax.jms.ObjectMessage. If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge in your routing - for example, using persistent queues to enable robust routing. Notice that if you also have transferExchange enabled, this option takes preced [...]
 | *errorHandlerLoggingLevel* (logging) | Allows to configure the default errorHandler logging level for logging uncaught exceptions. There are 6 enums and the value can be one of: TRACE, DEBUG, INFO, WARN, ERROR, OFF | WARN | LoggingLevel
 | *errorHandlerLogStackTrace* (logging) | Allows to control whether stacktraces should be logged or not, by the default errorHandler. | true | boolean
 | *transacted* (transaction) | Specifies whether to use transacted mode | false | boolean
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 5066115..d121fcf 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -35,7 +35,6 @@
         <camel.osgi.private.pkg>
             org.apache.camel.component.sjms.consumer,
             org.apache.camel.component.sjms.producer,
-            org.apache.camel.component.sjms.taskmanager,
             org.apache.camel.component.sjms.tx
         </camel.osgi.private.pkg>
         <camel.osgi.import>
@@ -49,6 +48,8 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
         </dependency>
+
+        <!-- TODO: remove our own pooling -->
         <dependency>
             <groupId>commons-pool</groupId>
             <artifactId>commons-pool</artifactId>
diff --git a/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsComponentConfigurer.java b/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsComponentConfigurer.java
index 7845805..505241f 100644
--- a/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsComponentConfigurer.java
+++ b/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsComponentConfigurer.java
@@ -51,10 +51,8 @@ public class SjmsComponentConfigurer extends PropertyConfigurerSupport implement
         case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
         case "messagecreatedstrategy":
         case "messageCreatedStrategy": target.setMessageCreatedStrategy(property(camelContext, org.apache.camel.component.sjms.jms.MessageCreatedStrategy.class, value)); return true;
-        case "reconnectbackoff":
-        case "reconnectBackOff": target.setReconnectBackOff(property(camelContext, long.class, value)); return true;
-        case "reconnectonerror":
-        case "reconnectOnError": target.setReconnectOnError(property(camelContext, boolean.class, value)); return true;
+        case "recoveryinterval":
+        case "recoveryInterval": target.setRecoveryInterval(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
         case "transactioncommitstrategy":
         case "transactionCommitStrategy": target.setTransactionCommitStrategy(property(camelContext, org.apache.camel.component.sjms.TransactionCommitStrategy.class, value)); return true;
         default: return false;
@@ -99,10 +97,8 @@ public class SjmsComponentConfigurer extends PropertyConfigurerSupport implement
         case "lazyStartProducer": return boolean.class;
         case "messagecreatedstrategy":
         case "messageCreatedStrategy": return org.apache.camel.component.sjms.jms.MessageCreatedStrategy.class;
-        case "reconnectbackoff":
-        case "reconnectBackOff": return long.class;
-        case "reconnectonerror":
-        case "reconnectOnError": return boolean.class;
+        case "recoveryinterval":
+        case "recoveryInterval": return long.class;
         case "transactioncommitstrategy":
         case "transactionCommitStrategy": return org.apache.camel.component.sjms.TransactionCommitStrategy.class;
         default: return null;
@@ -143,10 +139,8 @@ public class SjmsComponentConfigurer extends PropertyConfigurerSupport implement
         case "lazyStartProducer": return target.isLazyStartProducer();
         case "messagecreatedstrategy":
         case "messageCreatedStrategy": return target.getMessageCreatedStrategy();
-        case "reconnectbackoff":
-        case "reconnectBackOff": return target.getReconnectBackOff();
-        case "reconnectonerror":
-        case "reconnectOnError": return target.isReconnectOnError();
+        case "recoveryinterval":
+        case "recoveryInterval": return target.getRecoveryInterval();
         case "transactioncommitstrategy":
         case "transactionCommitStrategy": return target.getTransactionCommitStrategy();
         default: return null;
diff --git a/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointConfigurer.java b/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointConfigurer.java
index 31c7005..7fd6f8c 100644
--- a/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointConfigurer.java
+++ b/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointConfigurer.java
@@ -25,10 +25,14 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "acknowledgementMode": target.setAcknowledgementMode(property(camelContext, org.apache.camel.component.sjms.jms.SessionAcknowledgementType.class, value)); return true;
         case "allownullbody":
         case "allowNullBody": target.setAllowNullBody(property(camelContext, boolean.class, value)); return true;
+        case "asyncconsumer":
+        case "asyncConsumer": target.setAsyncConsumer(property(camelContext, boolean.class, value)); return true;
         case "asyncstartlistener":
         case "asyncStartListener": target.setAsyncStartListener(property(camelContext, boolean.class, value)); return true;
         case "asyncstoplistener":
         case "asyncStopListener": target.setAsyncStopListener(property(camelContext, boolean.class, value)); return true;
+        case "autostartup":
+        case "autoStartup": target.setAutoStartup(property(camelContext, boolean.class, value)); return true;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
         case "connectioncount":
@@ -47,8 +51,14 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "destinationCreationStrategy": target.setDestinationCreationStrategy(property(camelContext, org.apache.camel.component.sjms.jms.DestinationCreationStrategy.class, value)); return true;
         case "disablereplyto":
         case "disableReplyTo": target.setDisableReplyTo(property(camelContext, boolean.class, value)); return true;
+        case "disabletimetolive":
+        case "disableTimeToLive": target.setDisableTimeToLive(property(camelContext, boolean.class, value)); return true;
         case "durablesubscriptionid":
         case "durableSubscriptionId": target.setDurableSubscriptionId(property(camelContext, java.lang.String.class, value)); return true;
+        case "eagerloadingofproperties":
+        case "eagerLoadingOfProperties": target.setEagerLoadingOfProperties(property(camelContext, boolean.class, value)); return true;
+        case "eagerpoisonbody":
+        case "eagerPoisonBody": target.setEagerPoisonBody(property(camelContext, java.lang.String.class, value)); return true;
         case "errorhandlerlogstacktrace":
         case "errorHandlerLogStackTrace": target.setErrorHandlerLogStackTrace(property(camelContext, boolean.class, value)); return true;
         case "errorhandlerlogginglevel":
@@ -78,26 +88,30 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "preservemessageqos":
         case "preserveMessageQos": target.setPreserveMessageQos(property(camelContext, boolean.class, value)); return true;
         case "priority": target.setPriority(property(camelContext, int.class, value)); return true;
-        case "reconnectbackoff":
-        case "reconnectBackOff": target.setReconnectBackOff(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
-        case "reconnectonerror":
-        case "reconnectOnError": target.setReconnectOnError(property(camelContext, boolean.class, value)); return true;
+        case "recoveryinterval":
+        case "recoveryInterval": target.setRecoveryInterval(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
         case "replyto":
         case "replyTo": target.setReplyTo(property(camelContext, java.lang.String.class, value)); return true;
         case "replytodeliverypersistent":
         case "replyToDeliveryPersistent": target.setReplyToDeliveryPersistent(property(camelContext, boolean.class, value)); return true;
         case "replytooverride":
         case "replyToOverride": target.setReplyToOverride(property(camelContext, java.lang.String.class, value)); return true;
+        case "replytosamedestinationallowed":
+        case "replyToSameDestinationAllowed": target.setReplyToSameDestinationAllowed(property(camelContext, boolean.class, value)); return true;
         case "requesttimeout":
         case "requestTimeout": target.setRequestTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
         case "sharedjmssession":
         case "sharedJMSSession": target.setSharedJMSSession(property(camelContext, boolean.class, value)); return true;
         case "synchronous": target.setSynchronous(property(camelContext, boolean.class, value)); return true;
+        case "testconnectiononstartup":
+        case "testConnectionOnStartup": target.setTestConnectionOnStartup(property(camelContext, boolean.class, value)); return true;
         case "timetolive":
         case "timeToLive": target.setTimeToLive(property(camelContext, long.class, value)); return true;
         case "transacted": target.setTransacted(property(camelContext, boolean.class, value)); return true;
         case "transactioncommitstrategy":
         case "transactionCommitStrategy": target.setTransactionCommitStrategy(property(camelContext, org.apache.camel.component.sjms.TransactionCommitStrategy.class, value)); return true;
+        case "transferexception":
+        case "transferException": target.setTransferException(property(camelContext, boolean.class, value)); return true;
         default: return false;
         }
     }
@@ -109,10 +123,14 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "acknowledgementMode": return org.apache.camel.component.sjms.jms.SessionAcknowledgementType.class;
         case "allownullbody":
         case "allowNullBody": return boolean.class;
+        case "asyncconsumer":
+        case "asyncConsumer": return boolean.class;
         case "asyncstartlistener":
         case "asyncStartListener": return boolean.class;
         case "asyncstoplistener":
         case "asyncStopListener": return boolean.class;
+        case "autostartup":
+        case "autoStartup": return boolean.class;
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return boolean.class;
         case "connectioncount":
@@ -131,8 +149,14 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "destinationCreationStrategy": return org.apache.camel.component.sjms.jms.DestinationCreationStrategy.class;
         case "disablereplyto":
         case "disableReplyTo": return boolean.class;
+        case "disabletimetolive":
+        case "disableTimeToLive": return boolean.class;
         case "durablesubscriptionid":
         case "durableSubscriptionId": return java.lang.String.class;
+        case "eagerloadingofproperties":
+        case "eagerLoadingOfProperties": return boolean.class;
+        case "eagerpoisonbody":
+        case "eagerPoisonBody": return java.lang.String.class;
         case "errorhandlerlogstacktrace":
         case "errorHandlerLogStackTrace": return boolean.class;
         case "errorhandlerlogginglevel":
@@ -162,26 +186,30 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "preservemessageqos":
         case "preserveMessageQos": return boolean.class;
         case "priority": return int.class;
-        case "reconnectbackoff":
-        case "reconnectBackOff": return long.class;
-        case "reconnectonerror":
-        case "reconnectOnError": return boolean.class;
+        case "recoveryinterval":
+        case "recoveryInterval": return long.class;
         case "replyto":
         case "replyTo": return java.lang.String.class;
         case "replytodeliverypersistent":
         case "replyToDeliveryPersistent": return boolean.class;
         case "replytooverride":
         case "replyToOverride": return java.lang.String.class;
+        case "replytosamedestinationallowed":
+        case "replyToSameDestinationAllowed": return boolean.class;
         case "requesttimeout":
         case "requestTimeout": return long.class;
         case "sharedjmssession":
         case "sharedJMSSession": return boolean.class;
         case "synchronous": return boolean.class;
+        case "testconnectiononstartup":
+        case "testConnectionOnStartup": return boolean.class;
         case "timetolive":
         case "timeToLive": return long.class;
         case "transacted": return boolean.class;
         case "transactioncommitstrategy":
         case "transactionCommitStrategy": return org.apache.camel.component.sjms.TransactionCommitStrategy.class;
+        case "transferexception":
+        case "transferException": return boolean.class;
         default: return null;
         }
     }
@@ -194,10 +222,14 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "acknowledgementMode": return target.getAcknowledgementMode();
         case "allownullbody":
         case "allowNullBody": return target.isAllowNullBody();
+        case "asyncconsumer":
+        case "asyncConsumer": return target.isAsyncConsumer();
         case "asyncstartlistener":
         case "asyncStartListener": return target.isAsyncStartListener();
         case "asyncstoplistener":
         case "asyncStopListener": return target.isAsyncStopListener();
+        case "autostartup":
+        case "autoStartup": return target.isAutoStartup();
         case "bridgeerrorhandler":
         case "bridgeErrorHandler": return target.isBridgeErrorHandler();
         case "connectioncount":
@@ -216,8 +248,14 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "destinationCreationStrategy": return target.getDestinationCreationStrategy();
         case "disablereplyto":
         case "disableReplyTo": return target.isDisableReplyTo();
+        case "disabletimetolive":
+        case "disableTimeToLive": return target.isDisableTimeToLive();
         case "durablesubscriptionid":
         case "durableSubscriptionId": return target.getDurableSubscriptionId();
+        case "eagerloadingofproperties":
+        case "eagerLoadingOfProperties": return target.isEagerLoadingOfProperties();
+        case "eagerpoisonbody":
+        case "eagerPoisonBody": return target.getEagerPoisonBody();
         case "errorhandlerlogstacktrace":
         case "errorHandlerLogStackTrace": return target.isErrorHandlerLogStackTrace();
         case "errorhandlerlogginglevel":
@@ -247,26 +285,30 @@ public class SjmsEndpointConfigurer extends PropertyConfigurerSupport implements
         case "preservemessageqos":
         case "preserveMessageQos": return target.isPreserveMessageQos();
         case "priority": return target.getPriority();
-        case "reconnectbackoff":
-        case "reconnectBackOff": return target.getReconnectBackOff();
-        case "reconnectonerror":
-        case "reconnectOnError": return target.isReconnectOnError();
+        case "recoveryinterval":
+        case "recoveryInterval": return target.getRecoveryInterval();
         case "replyto":
         case "replyTo": return target.getReplyTo();
         case "replytodeliverypersistent":
         case "replyToDeliveryPersistent": return target.isReplyToDeliveryPersistent();
         case "replytooverride":
         case "replyToOverride": return target.getReplyToOverride();
+        case "replytosamedestinationallowed":
+        case "replyToSameDestinationAllowed": return target.isReplyToSameDestinationAllowed();
         case "requesttimeout":
         case "requestTimeout": return target.getRequestTimeout();
         case "sharedjmssession":
         case "sharedJMSSession": return target.isSharedJMSSession();
         case "synchronous": return target.isSynchronous();
+        case "testconnectiononstartup":
+        case "testConnectionOnStartup": return target.isTestConnectionOnStartup();
         case "timetolive":
         case "timeToLive": return target.getTimeToLive();
         case "transacted": return target.isTransacted();
         case "transactioncommitstrategy":
         case "transactionCommitStrategy": return target.getTransactionCommitStrategy();
+        case "transferexception":
+        case "transferException": return target.isTransferException();
         default: return null;
         }
     }
diff --git a/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointUriFactory.java b/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointUriFactory.java
index 48bbd31..d84eb11 100644
--- a/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointUriFactory.java
+++ b/components/camel-sjms/src/generated/java/org/apache/camel/component/sjms/SjmsEndpointUriFactory.java
@@ -20,49 +20,56 @@ public class SjmsEndpointUriFactory extends org.apache.camel.support.component.E
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(42);
-        props.add("reconnectBackOff");
-        props.add("connectionCount");
+        Set<String> props = new HashSet<>(49);
+        props.add("asyncConsumer");
         props.add("mapJmsMessage");
         props.add("synchronous");
-        props.add("connectionResource");
         props.add("includeAllJMSXProperties");
-        props.add("explicitQosEnabled");
-        props.add("transacted");
         props.add("errorHandlerLogStackTrace");
+        props.add("eagerLoadingOfProperties");
         props.add("timeToLive");
         props.add("bridgeErrorHandler");
-        props.add("jmsKeyFormatStrategy");
         props.add("deliveryMode");
-        props.add("headerFilterStrategy");
+        props.add("transferException");
         props.add("exceptionListener");
-        props.add("destinationName");
-        props.add("messageCreatedStrategy");
-        props.add("asyncStopListener");
         props.add("destinationType");
         props.add("asyncStartListener");
-        props.add("requestTimeout");
-        props.add("allowNullBody");
-        props.add("replyToDeliveryPersistent");
-        props.add("disableReplyTo");
-        props.add("reconnectOnError");
-        props.add("consumerCount");
+        props.add("eagerPoisonBody");
         props.add("destinationCreationStrategy");
-        props.add("exchangePattern");
+        props.add("disableTimeToLive");
         props.add("messageSelector");
         props.add("deliveryPersistent");
-        props.add("preserveMessageQos");
         props.add("priority");
         props.add("acknowledgementMode");
         props.add("lazyStartProducer");
         props.add("transactionCommitStrategy");
-        props.add("connectionFactory");
         props.add("replyTo");
         props.add("replyToOverride");
         props.add("errorHandlerLoggingLevel");
-        props.add("sharedJMSSession");
         props.add("durableSubscriptionId");
         props.add("exceptionHandler");
+        props.add("connectionCount");
+        props.add("connectionResource");
+        props.add("explicitQosEnabled");
+        props.add("transacted");
+        props.add("autoStartup");
+        props.add("jmsKeyFormatStrategy");
+        props.add("headerFilterStrategy");
+        props.add("destinationName");
+        props.add("messageCreatedStrategy");
+        props.add("asyncStopListener");
+        props.add("requestTimeout");
+        props.add("allowNullBody");
+        props.add("replyToDeliveryPersistent");
+        props.add("disableReplyTo");
+        props.add("consumerCount");
+        props.add("recoveryInterval");
+        props.add("exchangePattern");
+        props.add("preserveMessageQos");
+        props.add("testConnectionOnStartup");
+        props.add("connectionFactory");
+        props.add("replyToSameDestinationAllowed");
+        props.add("sharedJMSSession");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         SECRET_PROPERTY_NAMES = Collections.emptySet();
     }
diff --git a/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json b/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json
index 47a2980..040eaa9 100644
--- a/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json
+++ b/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json
@@ -25,8 +25,6 @@
     "connectionCount": { "kind": "property", "displayName": "Connection Count", "group": "common", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1", "description": "The maximum number of connections available to endpoints started under this component" },
     "connectionFactory": { "kind": "property", "displayName": "Connection Factory", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "javax.jms.ConnectionFactory", "deprecated": false, "autowired": true, "secret": false, "description": "A ConnectionFactory is required to enable the SjmsComponent. It can be set directly or set set as part of a ConnectionResource." },
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
-    "reconnectBackOff": { "kind": "property", "displayName": "Reconnect Back Off", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "Backoff in millis on consumer pool reconnection attempts" },
-    "reconnectOnError": { "kind": "property", "displayName": "Reconnect On Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Try to apply reconnection logic on consumer pool" },
     "lazyStartProducer": { "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during star [...]
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...]
     "connectionClientId": { "kind": "property", "displayName": "Connection Client Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
@@ -36,6 +34,7 @@
     "destinationCreationStrategy": { "kind": "property", "displayName": "Destination Creation Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.DestinationCreationStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom DestinationCreationStrategy." },
     "jmsKeyFormatStrategy": { "kind": "property", "displayName": "Jms Key Format Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The defau [...]
     "messageCreatedStrategy": { "kind": "property", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
+    "recoveryInterval": { "kind": "property", "displayName": "Recovery Interval", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds." },
     "headerFilterStrategy": { "kind": "property", "displayName": "Header Filter Strategy", "group": "filter", "label": "filter", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message." },
     "connectionPassword": { "kind": "property", "displayName": "Connection Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "The password to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
     "connectionUsername": { "kind": "property", "displayName": "Connection Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "The username to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
@@ -47,16 +46,19 @@
     "acknowledgementMode": { "kind": "parameter", "displayName": "Acknowledgement Mode", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.SessionAcknowledgementType", "enum": [ "SESSION_TRANSACTED", "CLIENT_ACKNOWLEDGE", "AUTO_ACKNOWLEDGE", "DUPS_OK_ACKNOWLEDGE" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "AUTO_ACKNOWLEDGE", "description": "The JMS acknowledgement name, which is one of:  [...]
     "disableReplyTo": { "kind": "parameter", "displayName": "Disable Reply To", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether Camel ignores the JMSReplyTo header in messages. If true, Camel does not send a reply back to the destination specified in the JMSReplyTo header. You can use this option if you want Camel to consume from  [...]
     "replyTo": { "kind": "parameter", "displayName": "Reply To", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Provides an explicit ReplyTo destination (overrides any incoming value of Message.getJMSReplyTo() in consumer)." },
+    "testConnectionOnStartup": { "kind": "parameter", "displayName": "Test Connection On Startup", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to test the connection on startup. This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. If a connection cannot be granted then Camel [...]
+    "asyncConsumer": { "kind": "parameter", "displayName": "Async Consumer", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the JmsConsumer processes the Exchange asynchronously. If enabled then the JmsConsumer may pickup the next message from the JMS queue, while the previous message is being processed asynchronously (by the Asy [...]
+    "autoStartup": { "kind": "parameter", "displayName": "Auto Startup", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether the consumer container should auto-startup." },
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...]
     "consumerCount": { "kind": "parameter", "displayName": "Consumer Count", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Sets the number of consumer listeners used for this endpoint." },
     "durableSubscriptionId": { "kind": "parameter", "displayName": "Durable Subscription Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the durable subscription Id required for durable topics." },
-    "reconnectBackOff": { "kind": "parameter", "displayName": "Reconnect Back Off", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Backoff in millis on consumer pool reconnection attempts" },
-    "reconnectOnError": { "kind": "parameter", "displayName": "Reconnect On Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Try to apply reconnection logic on consumer pool" },
     "replyToDeliveryPersistent": { "kind": "parameter", "displayName": "Reply To Delivery Persistent", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether to use persistent delivery by default for replies." },
-    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported)." },
+    "eagerLoadingOfProperties": { "kind": "parameter", "displayName": "Eager Loading Of Properties", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables eager loading of JMS properties and payload as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes c [...]
+    "eagerPoisonBody": { "kind": "parameter", "displayName": "Eager Poison Body", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "Poison JMS message due to ${exception.message}", "description": "If eagerLoadingOfProperties is enabled and the JMS message payload (JMS body or JMS properties) is poison (cannot be read\/mapped), then se [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
     "messageSelector": { "kind": "parameter", "displayName": "Message Selector", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the JMS Message selector syntax." },
+    "replyToSameDestinationAllowed": { "kind": "parameter", "displayName": "Reply To Same Destination Allowed", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether a JMS consumer is allowed to send a reply message to the same destination that the consumer is using to consume from. This prevents an endless loop by c [...]
     "deliveryMode": { "kind": "parameter", "displayName": "Delivery Mode", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "enum": [ "1", "2" ], "deprecated": false, "autowired": false, "secret": false, "description": "Specifies the delivery mode to be used. Possible values are those defined by javax.jms.DeliveryMode. NON_PERSISTENT = 1 and PERSISTENT = 2." },
     "deliveryPersistent": { "kind": "parameter", "displayName": "Delivery Persistent", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether persistent delivery is used by default." },
     "explicitQosEnabled": { "kind": "parameter", "displayName": "Explicit Qos Enabled", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "false", "description": "Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages. This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLi [...]
@@ -67,6 +69,7 @@
     "requestTimeout": { "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "20000", "description": "The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). The default is 20 seconds. You can include the header CamelJmsRequestTimeout to override this endpoint configured timeou [...]
     "timeToLive": { "kind": "parameter", "displayName": "Time To Live", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": -1, "description": "When sending messages, specifies the time-to-live of the message (in milliseconds)." },
     "allowNullBody": { "kind": "parameter", "displayName": "Allow Null Body", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown." },
+    "disableTimeToLive": { "kind": "parameter", "displayName": "Disable Time To Live", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Use this option to force disabling time to live. For example when you do request\/reply over JMS, then Camel will by default use the requestTimeout value as time to live on the message  [...]
     "asyncStartListener": { "kind": "parameter", "displayName": "Async Start Listener", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to startup the consumer message listener asynchronously, when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and\/o [...]
     "asyncStopListener": { "kind": "parameter", "displayName": "Async Stop Listener", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to stop the consumer message listener asynchronously, when stopping a route." },
     "connectionCount": { "kind": "parameter", "displayName": "Connection Count", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": true, "autowired": false, "secret": false, "description": "The maximum number of connections available to this endpoint" },
@@ -79,6 +82,9 @@
     "jmsKeyFormatStrategy": { "kind": "parameter", "displayName": "Jms Key Format Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and pass [...]
     "mapJmsMessage": { "kind": "parameter", "displayName": "Map Jms Message", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details." },
     "messageCreatedStrategy": { "kind": "parameter", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
+    "recoveryInterval": { "kind": "parameter", "displayName": "Recovery Interval", "group": "advanced", "label": "advanced", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "description": "Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds." },
+    "synchronous": { "kind": "parameter", "displayName": "Synchronous", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported)." },
+    "transferException": { "kind": "parameter", "displayName": "Transfer Exception", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side, then the caused Exception will be send back in response as a javax.jms.ObjectMessage. If the client i [...]
     "errorHandlerLoggingLevel": { "kind": "parameter", "displayName": "Error Handler Logging Level", "group": "logging", "label": "consumer,logging", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "WARN", "description": "Allows to configure the default errorHandler logging level for logging uncaught exceptions." },
     "errorHandlerLogStackTrace": { "kind": "parameter", "displayName": "Error Handler Log Stack Trace", "group": "logging", "label": "consumer,logging", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Allows to control whether stacktraces should be logged or not, by the default errorHandler." },
     "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc b/components/camel-sjms/src/main/docs/sjms-component.adoc
index 20de46e..cdabfaa 100644
--- a/components/camel-sjms/src/main/docs/sjms-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-component.adoc
@@ -90,7 +90,7 @@ You append query options to the URI using the following format,
 == Component Options and Configurations
 
 // component options: START
-The Simple JMS component supports 18 options, which are listed below.
+The Simple JMS component supports 17 options, which are listed below.
 
 
 
@@ -100,8 +100,6 @@ The Simple JMS component supports 18 options, which are listed below.
 | *connectionCount* (common) | The maximum number of connections available to endpoints started under this component | 1 | Integer
 | *connectionFactory* (common) | *Autowired* A ConnectionFactory is required to enable the SjmsComponent. It can be set directly or set set as part of a ConnectionResource. |  | ConnectionFactory
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
-| *reconnectBackOff* (consumer) | Backoff in millis on consumer pool reconnection attempts | 5000 | long
-| *reconnectOnError* (consumer) | Try to apply reconnection logic on consumer pool | true | boolean
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
 | *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
@@ -111,6 +109,7 @@ The Simple JMS component supports 18 options, which are listed below.
 | *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. |  | DestinationCreationStrategy
 | *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using  [...]
 | *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
+| *recoveryInterval* (advanced) | Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds. | 5000 | long
 | *headerFilterStrategy* (filter) | To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message. |  | HeaderFilterStrategy
 | *connectionPassword* (security) | The password to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
 | *connectionUsername* (security) | The username to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
@@ -139,7 +138,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (40 parameters):
+=== Query Parameters (47 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -148,16 +147,19 @@ with the following path and query parameters:
 | *acknowledgementMode* (common) | The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE. There are 4 enums and the value can be one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE | AUTO_ACKNOWLEDGE | SessionAcknowledgementType
 | *disableReplyTo* (common) | Specifies whether Camel ignores the JMSReplyTo header in messages. If true, Camel does not send a reply back to the destination specified in the JMSReplyTo header. You can use this option if you want Camel to consume from a route and you do not want Camel to automatically send back a reply message because another component in your code handles the reply message. You can also use this option if you want to use Camel as a proxy between different message broker [...]
 | *replyTo* (common) | Provides an explicit ReplyTo destination (overrides any incoming value of Message.getJMSReplyTo() in consumer). |  | String
+| *testConnectionOnStartup* (common) | Specifies whether to test the connection on startup. This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. If a connection cannot be granted then Camel throws an exception on startup. This ensures that Camel is not started with failed connections. The JMS producers is tested as well. | false | boolean
+| *asyncConsumer* (consumer) | Whether the JmsConsumer processes the Exchange asynchronously. If enabled then the JmsConsumer may pickup the next message from the JMS queue, while the previous message is being processed asynchronously (by the Asynchronous Routing Engine). This means that messages may be processed not 100% strictly in order. If disabled (as default) then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue. Note if transac [...]
+| *autoStartup* (consumer) | Specifies whether the consumer container should auto-startup. | true | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerCount* (consumer) | Sets the number of consumer listeners used for this endpoint. | 1 | int
 | *durableSubscriptionId* (consumer) | Sets the durable subscription Id required for durable topics. |  | String
-| *reconnectBackOff* (consumer) | Backoff in millis on consumer pool reconnection attempts | 5000 | long
-| *reconnectOnError* (consumer) | Try to apply reconnection logic on consumer pool | true | boolean
 | *replyToDeliveryPersistent* (consumer) | Specifies whether to use persistent delivery by default for replies. | true | boolean
-| *synchronous* (consumer) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | true | boolean
+| *eagerLoadingOfProperties* (consumer) | Enables eager loading of JMS properties and payload as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties. See also the option eagerPoisonBody. | false | boolean
+| *eagerPoisonBody* (consumer) | If eagerLoadingOfProperties is enabled and the JMS message payload (JMS body or JMS properties) is poison (cannot be read/mapped), then set this text as the message body instead so the message can be processed (the cause of the poison are already stored as exception on the Exchange). This can be turned off by setting eagerPoisonBody=false. See also the option eagerLoadingOfProperties. | Poison JMS message due to ${exception.message} | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *messageSelector* (consumer) | Sets the JMS Message selector syntax. |  | String
+| *replyToSameDestinationAllowed* (consumer) | Whether a JMS consumer is allowed to send a reply message to the same destination that the consumer is using to consume from. This prevents an endless loop by consuming and sending back the same message to itself. | false | boolean
 | *deliveryMode* (producer) | Specifies the delivery mode to be used. Possible values are those defined by javax.jms.DeliveryMode. NON_PERSISTENT = 1 and PERSISTENT = 2. There are 2 enums and the value can be one of: 1, 2 |  | Integer
 | *deliveryPersistent* (producer) | Specifies whether persistent delivery is used by default. | true | boolean
 | *explicitQosEnabled* (producer) | Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages. This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint. This contrasts with the preserveMessageQos option, which operates at message granularity, reading QoS properties exclusively from the Camel In message headers. | false | Boolean
@@ -168,6 +170,7 @@ with the following path and query parameters:
 | *requestTimeout* (producer) | The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). The default is 20 seconds. You can include the header CamelJmsRequestTimeout to override this endpoint configured timeout value, and thus have per message individual timeout values. See also the requestTimeoutCheckerInterval option. | 20000 | long
 | *timeToLive* (producer) | When sending messages, specifies the time-to-live of the message (in milliseconds). | -1 | long
 | *allowNullBody* (producer) | Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown. | true | boolean
+| *disableTimeToLive* (producer) | Use this option to force disabling time to live. For example when you do request/reply over JMS, then Camel will by default use the requestTimeout value as time to live on the message being sent. The problem is that the sender and receiver systems have to have their clocks synchronized, so they are in sync. This is not always so easy to archive. So you can use disableTimeToLive=true to not set a time to live value on the sent message. Then the message w [...]
 | *asyncStartListener* (advanced) | Whether to startup the consumer message listener asynchronously, when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true, you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used, then b [...]
 | *asyncStopListener* (advanced) | Whether to stop the consumer message listener asynchronously, when stopping a route. | false | boolean
 | *connectionCount* (advanced) | *Deprecated* The maximum number of connections available to this endpoint |  | Integer
@@ -180,6 +183,9 @@ with the following path and query parameters:
 | *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache [...]
 | *mapJmsMessage* (advanced) | Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details. | true | boolean
 | *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
+| *recoveryInterval* (advanced) | Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds. | 5000 | long
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *transferException* (advanced) | If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side, then the caused Exception will be send back in response as a javax.jms.ObjectMessage. If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge in your routing - for example, using persistent queues to enable robust routing. Notice that if you also have transferExchange enabled, this option takes preced [...]
 | *errorHandlerLoggingLevel* (logging) | Allows to configure the default errorHandler logging level for logging uncaught exceptions. There are 6 enums and the value can be one of: TRACE, DEBUG, INFO, WARN, ERROR, OFF | WARN | LoggingLevel
 | *errorHandlerLogStackTrace* (logging) | Allows to control whether stacktraces should be logged or not, by the default errorHandler. | true | boolean
 | *transacted* (transaction) | Specifies whether to use transacted mode | false | boolean
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageListenerContainer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageListenerContainer.java
new file mode 100644
index 0000000..e85772a
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageListenerContainer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.AfterPropertiesConfigured;
+import org.apache.camel.Service;
+
+public interface MessageListenerContainer extends Service, AfterPropertiesConfigured {
+
+    ConnectionFactory getConnectionFactory();
+
+    void setMessageListener(SessionMessageListener messageListener);
+
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/OldSjmsConsumer.java
similarity index 92%
copy from components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
copy to components/camel-sjms/src/main/java/org/apache/camel/component/sjms/OldSjmsConsumer.java
index f1ffcaf..9138957 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/OldSjmsConsumer.java
@@ -53,16 +53,17 @@ import org.slf4j.LoggerFactory;
 /**
  * The SjmsConsumer is the base class for the SJMS MessageListener pool.
  */
-public class SjmsConsumer extends DefaultConsumer {
+@Deprecated
+public class OldSjmsConsumer extends DefaultConsumer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SjmsConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(OldSjmsConsumer.class);
 
     private final Map<Connection, List<MessageConsumerResources>> consumers = new WeakHashMap<>();
     private ScheduledExecutorService scheduler;
     private Future<?> asyncStart;
     private BackOffTimer.Task rescheduleTask;
 
-    public SjmsConsumer(Endpoint endpoint, Processor processor) {
+    public OldSjmsConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
 
@@ -87,9 +88,7 @@ public class SjmsConsumer extends DefaultConsumer {
                         LOG.warn("Error starting listener container on destination: " + getDestinationName()
                                  + ". This exception will be ignored.",
                                 e);
-                        if (getEndpoint().isReconnectOnError()) {
-                            scheduleRefill(); //we should try to fill consumer pool on next time
-                        }
+                        scheduleRefill(); //we should try to fill consumer pool on next time
                     }
                 }
 
@@ -148,7 +147,7 @@ public class SjmsConsumer extends DefaultConsumer {
                 public void run() {
                     try {
                         synchronized (consumers) {
-                            consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
+                            consumers.values().stream().flatMap(Collection::stream).forEach(OldSjmsConsumer.this::destroyObject);
                             consumers.clear();
                         }
                     } catch (Throwable e) {
@@ -165,7 +164,7 @@ public class SjmsConsumer extends DefaultConsumer {
             });
         } else {
             synchronized (consumers) {
-                consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
+                consumers.values().stream().flatMap(Collection::stream).forEach(OldSjmsConsumer.this::destroyObject);
                 consumers.clear();
             }
         }
@@ -189,16 +188,14 @@ public class SjmsConsumer extends DefaultConsumer {
             MessageListener handler = createMessageHandler(session);
             messageConsumer.setMessageListener(handler);
 
-            if (getEndpoint().isReconnectOnError()) {
-                ExceptionListener exceptionListener = conn.getExceptionListener();
-                ReconnectExceptionListener reconnectExceptionListener = new ReconnectExceptionListener(conn);
-                if (exceptionListener == null) {
-                    exceptionListener = reconnectExceptionListener;
-                } else {
-                    exceptionListener = new AggregatedExceptionListener(exceptionListener, reconnectExceptionListener);
-                }
-                conn.setExceptionListener(exceptionListener);
+            ExceptionListener exceptionListener = conn.getExceptionListener();
+            ReconnectExceptionListener reconnectExceptionListener = new ReconnectExceptionListener(conn);
+            if (exceptionListener == null) {
+                exceptionListener = reconnectExceptionListener;
+            } else {
+                exceptionListener = new AggregatedExceptionListener(exceptionListener, reconnectExceptionListener);
             }
+            conn.setExceptionListener(exceptionListener);
             answer = new MessageConsumerResources(session, messageConsumer);
             consumers.compute(conn, (key, oldValue) -> {
                 if (oldValue == null) {
@@ -373,7 +370,7 @@ public class SjmsConsumer extends DefaultConsumer {
 
     private void scheduleRefill() {
         if (rescheduleTask == null || rescheduleTask.getStatus() != BackOffTimer.Task.Status.Active) {
-            BackOff backOff = BackOff.builder().delay(getEndpoint().getReconnectBackOff()).build();
+            BackOff backOff = BackOff.builder().delay(getEndpoint().getRecoveryInterval()).build();
             rescheduleTask = new BackOffTimer(scheduler).schedule(backOff, this::refillPool);
         }
     }
@@ -393,7 +390,7 @@ public class SjmsConsumer extends DefaultConsumer {
                 synchronized (consumers) {
                     List<MessageConsumerResources> toClose = consumers.get(currentConnection);
                     if (toClose != null) {
-                        toClose.forEach(SjmsConsumer.this::destroyObject);
+                        toClose.forEach(OldSjmsConsumer.this::destroyObject);
                     }
                     consumers.remove(currentConnection);
                 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SessionMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SessionMessageListener.java
new file mode 100644
index 0000000..85d9376
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SessionMessageListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+@FunctionalInterface
+public interface SessionMessageListener {
+
+    void onMessage(Message message, Session session) throws JMSException;
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 1b4fe9a..a8865f4 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -85,11 +85,10 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
               description = "The max wait time in millis to block and wait on free connection when the pool"
                             + " is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource.")
     private long connectionMaxWait = 5000;
-    @Metadata(label = "consumer", description = "Try to apply reconnection logic on consumer pool", defaultValue = "true")
-    private boolean reconnectOnError = true;
-    @Metadata(label = "consumer", description = "Backoff in millis on consumer pool reconnection attempts",
-              defaultValue = "5000")
-    private long reconnectBackOff = 5000;
+    @Metadata(defaultValue = "5000", label = "advanced", javaType = "java.time.Duration",
+              description = "Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds."
+                            + " The default is 5000 ms, that is, 5 seconds.")
+    private long recoveryInterval = 5000;
 
     public SjmsComponent() {
     }
@@ -116,8 +115,7 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
         if (messageCreatedStrategy != null) {
             endpoint.setMessageCreatedStrategy(messageCreatedStrategy);
         }
-        endpoint.setReconnectOnError(reconnectOnError);
-        endpoint.setReconnectBackOff(reconnectBackOff);
+        endpoint.setRecoveryInterval(recoveryInterval);
         setProperties(endpoint, parameters);
         return endpoint;
     }
@@ -311,25 +309,11 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
         this.connectionMaxWait = connectionMaxWait;
     }
 
-    public boolean isReconnectOnError() {
-        return reconnectOnError;
+    public long getRecoveryInterval() {
+        return recoveryInterval;
     }
 
-    /**
-     * Try to apply reconnection logic on consumer pool
-     */
-    public void setReconnectOnError(boolean reconnectOnError) {
-        this.reconnectOnError = reconnectOnError;
-    }
-
-    public long getReconnectBackOff() {
-        return reconnectBackOff;
-    }
-
-    /**
-     * Backoff in millis on consumer pool reconnection attempts
-     */
-    public void setReconnectBackOff(long reconnectBackOff) {
-        this.reconnectBackOff = reconnectBackOff;
+    public void setRecoveryInterval(long recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
     }
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index f1ffcaf..e805bb2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -16,54 +16,29 @@
  */
 package org.apache.camel.component.sjms;
 
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.WeakHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
-
 import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
 
 import org.apache.camel.Endpoint;
-import org.apache.camel.ExchangePattern;
+import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.Processor;
-import org.apache.camel.component.sjms.consumer.AbstractMessageHandler;
-import org.apache.camel.component.sjms.consumer.InOnlyMessageHandler;
-import org.apache.camel.component.sjms.consumer.InOutMessageHandler;
-import org.apache.camel.component.sjms.jms.ConnectionResource;
-import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
-import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.util.backoff.BackOff;
-import org.apache.camel.util.backoff.BackOffTimer;
+import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * The SjmsConsumer is the base class for the SJMS MessageListener pool.
- */
-public class SjmsConsumer extends DefaultConsumer {
+import static org.apache.camel.component.sjms.SjmsHelper.closeConnection;
+
+public class SjmsConsumer extends DefaultConsumer implements Suspendable {
 
     private static final Logger LOG = LoggerFactory.getLogger(SjmsConsumer.class);
 
-    private final Map<Connection, List<MessageConsumerResources>> consumers = new WeakHashMap<>();
-    private ScheduledExecutorService scheduler;
-    private Future<?> asyncStart;
-    private BackOffTimer.Task rescheduleTask;
+    private volatile boolean initialized;
+    private final MessageListenerContainer listenerContainer;
 
-    public SjmsConsumer(Endpoint endpoint, Processor processor) {
+    public SjmsConsumer(Endpoint endpoint, Processor processor, MessageListenerContainer listenerContainer) {
         super(endpoint, processor);
+        this.listenerContainer = listenerContainer;
     }
 
     @Override
@@ -75,21 +50,15 @@ public class SjmsConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        this.scheduler = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultScheduledThreadPool(this,
-                "SjmsConsumer");
         if (getEndpoint().isAsyncStartListener()) {
-            asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+            getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
                 public void run() {
                     try {
-                        fillConsumersPool();
+                        prepareAndStartListenerContainer();
                     } catch (Throwable e) {
-                        LOG.warn("Error starting listener container on destination: " + getDestinationName()
-                                 + ". This exception will be ignored.",
-                                e);
-                        if (getEndpoint().isReconnectOnError()) {
-                            scheduleRefill(); //we should try to fill consumer pool on next time
-                        }
+                        LOG.warn("Error starting listener container on destination: {}. This exception will be ignored.",
+                                getDestinationName(), e);
                     }
                 }
 
@@ -99,325 +68,94 @@ public class SjmsConsumer extends DefaultConsumer {
                 }
             });
         } else {
-            fillConsumersPool();
+            prepareAndStartListenerContainer();
         }
-    }
 
-    private void fillConsumersPool() throws Exception {
-        synchronized (consumers) {
-            while (consumers.values().stream().collect(Collectors.summarizingInt(List::size)).getSum() < getConsumerCount()) {
-                addConsumer();
-            }
-        }
-    }
-
-    public void destroyObject(MessageConsumerResources model) {
-        try {
-            if (model.getMessageConsumer() != null) {
-                model.getMessageConsumer().close();
-            }
-
-            // If the resource has a
-            if (model.getSession() != null) {
-                if (model.getSession().getTransacted()) {
-                    try {
-                        model.getSession().rollback();
-                    } catch (Exception e) {
-                        // Do nothing. Just make sure we are cleaned up
-                    }
-                }
-                model.getSession().close();
-            }
-        } catch (JMSException ex) {
-            LOG.warn("Exception caught on closing consumer", ex);
-        }
+        // mark as initialized for the first time
+        initialized = true;
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-        if (asyncStart != null && !asyncStart.isDone()) {
-            asyncStart.cancel(true);
-        }
-        if (rescheduleTask != null) {
-            rescheduleTask.cancel();
-        }
-        if (getEndpoint().isAsyncStopListener()) {
-            getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        synchronized (consumers) {
-                            consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
-                            consumers.clear();
+        if (listenerContainer != null) {
+
+            if (getEndpoint().isAsyncStopListener()) {
+                getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            stopAndDestroyListenerContainer();
+                        } catch (Throwable e) {
+                            LOG.warn("Error stopping listener container on destination: {}. This exception will be ignored.",
+                                    getDestinationName(), e);
                         }
-                    } catch (Throwable e) {
-                        LOG.warn("Error stopping listener container on destination: " + getDestinationName()
-                                 + ". This exception will be ignored.",
-                                e);
                     }
-                }
-
-                @Override
-                public String toString() {
-                    return "AsyncStopListenerTask[" + getDestinationName() + "]";
-                }
-            });
-        } else {
-            synchronized (consumers) {
-                consumers.values().stream().flatMap(Collection::stream).forEach(SjmsConsumer.this::destroyObject);
-                consumers.clear();
-            }
-        }
-        if (this.scheduler != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.scheduler);
-        }
-    }
-
-    /**
-     * Creates a {@link MessageConsumerResources} with a dedicated {@link Session} required for transacted and InOut
-     * consumers.
-     */
-    private void addConsumer() throws Exception {
-        MessageConsumerResources answer;
-        ConnectionResource connectionResource = getOrCreateConnectionResource();
-        Connection conn = connectionResource.borrowConnection();
-        try {
-            Session session = conn.createSession(isTransacted(),
-                    isTransacted() ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer messageConsumer = getEndpoint().getJmsObjectFactory().createMessageConsumer(session, getEndpoint());
-            MessageListener handler = createMessageHandler(session);
-            messageConsumer.setMessageListener(handler);
-
-            if (getEndpoint().isReconnectOnError()) {
-                ExceptionListener exceptionListener = conn.getExceptionListener();
-                ReconnectExceptionListener reconnectExceptionListener = new ReconnectExceptionListener(conn);
-                if (exceptionListener == null) {
-                    exceptionListener = reconnectExceptionListener;
-                } else {
-                    exceptionListener = new AggregatedExceptionListener(exceptionListener, reconnectExceptionListener);
-                }
-                conn.setExceptionListener(exceptionListener);
-            }
-            answer = new MessageConsumerResources(session, messageConsumer);
-            consumers.compute(conn, (key, oldValue) -> {
-                if (oldValue == null) {
-                    oldValue = new ArrayList<>();
-                }
-                oldValue.add(answer);
-                return oldValue;
-            });
-        } catch (Exception e) {
-            LOG.error("Unable to create the MessageConsumer", e);
-            throw e;
-        } finally {
-            connectionResource.returnConnection(conn);
-        }
-    }
-
-    /**
-     * Helper factory method used to create a MessageListener based on the MEP
-     *
-     * @param  session a session is only required if we are a transacted consumer
-     * @return         the listener
-     */
-    protected MessageListener createMessageHandler(Session session) {
-        TransactionCommitStrategy commitStrategy;
-        if (getTransactionCommitStrategy() != null) {
-            commitStrategy = getTransactionCommitStrategy();
-        } else {
-            commitStrategy = new DefaultTransactionCommitStrategy();
-        }
 
-        Synchronization synchronization = new SessionTransactionSynchronization(null, session, commitStrategy, false);
-
-        AbstractMessageHandler messageHandler;
-        if (getEndpoint().getExchangePattern().equals(ExchangePattern.InOnly)) {
-            if (isTransacted() || isSynchronous()) {
-                messageHandler = new InOnlyMessageHandler(getEndpoint(), scheduler, synchronization);
-            } else {
-                messageHandler = new InOnlyMessageHandler(getEndpoint(), scheduler);
-            }
-        } else {
-            if (isTransacted() || isSynchronous()) {
-                messageHandler = new InOutMessageHandler(getEndpoint(), scheduler, synchronization);
+                    @Override
+                    public String toString() {
+                        return "AsyncStopListenerTask[" + getDestinationName() + "]";
+                    }
+                });
             } else {
-                messageHandler = new InOutMessageHandler(getEndpoint(), scheduler);
+                stopAndDestroyListenerContainer();
             }
         }
 
-        messageHandler.setSession(session);
-        messageHandler.setProcessor(getAsyncProcessor());
-        messageHandler.setSynchronous(isSynchronous());
-        messageHandler.setTransacted(isTransacted());
-        messageHandler.setSharedJMSSession(isSharedJMSSession());
-        messageHandler.setTopic(isTopic());
-        return messageHandler;
+        super.doStop();
     }
 
-    /**
-     * @deprecated use {@link #getOrCreateConnectionResource()}
-     */
-    @Deprecated
-    protected ConnectionResource getConnectionResource() {
-        return getEndpoint().getConnectionResource();
-    }
+    protected void prepareAndStartListenerContainer() {
+        listenerContainer.afterPropertiesConfigured(getEndpoint().getCamelContext());
 
-    protected ConnectionResource getOrCreateConnectionResource() {
-        ConnectionResource answer = getEndpoint().getConnectionResource();
-        if (answer == null) {
-            answer = getEndpoint().createConnectionResource(this);
+        // only start listener if auto start is enabled or we are explicit invoking start later
+        if (initialized || getEndpoint().isAutoStartup()) {
+            // should we pre test connections before starting?
+            if (getEndpoint().isTestConnectionOnStartup()) {
+                testConnectionOnStartup();
+            }
+            startListenerContainer();
         }
-        return answer;
-    }
-
-    public int getAcknowledgementMode() {
-        return getEndpoint().getAcknowledgementMode().intValue();
-    }
-
-    /**
-     * Use to determine if transactions are enabled or disabled.
-     *
-     * @return true if transacted, otherwise false
-     */
-    public boolean isTransacted() {
-        return getEndpoint().isTransacted();
-    }
-
-    /**
-     * Use to determine if JMS session should be propagated to share with other SJMS endpoints.
-     *
-     * @return true if shared, otherwise false
-     */
-    public boolean isSharedJMSSession() {
-        return getEndpoint().isSharedJMSSession();
     }
 
     /**
-     * Use to determine whether or not to process exchanges synchronously.
+     * Pre tests the connection before starting the listening.
+     * <p/>
+     * In case of connection failure the exception is thrown which prevents Camel from starting.
      *
-     * @return true if synchronous
+     * @throws FailedToCreateConsumerException is thrown if testing the connection failed
      */
-    public boolean isSynchronous() {
-        return getEndpoint().isSynchronous();
+    protected void testConnectionOnStartup() throws FailedToCreateConsumerException {
+        try {
+            LOG.debug("Testing JMS Connection on startup for destination: {}", getDestinationName());
+            Connection con = listenerContainer.getConnectionFactory().createConnection();
+            closeConnection(con);
+            LOG.debug("Successfully tested JMS Connection on startup for destination: {}", getDestinationName());
+        } catch (Exception e) {
+            String msg = "Cannot get JMS Connection on startup for destination " + getDestinationName();
+            throw new FailedToCreateConsumerException(getEndpoint(), msg, e);
+        }
     }
 
-    /**
-     * The destination name for this consumer.
-     *
-     * @return String
-     */
-    public String getDestinationName() {
+    private String getDestinationName() {
         return getEndpoint().getDestinationName();
     }
 
     /**
-     * Returns the number of consumer listeners.
-     *
-     * @return the consumerCount
-     */
-    public int getConsumerCount() {
-        return getEndpoint().getConsumerCount();
-    }
-
-    /**
-     * Flag set by the endpoint used by consumers and producers to determine if the consumer is a JMS Topic.
-     *
-     * @return the topic true if consumer is a JMS Topic, default is false
-     */
-    public boolean isTopic() {
-        return getEndpoint().isTopic();
-    }
-
-    /**
-     * Gets the JMS Message selector syntax.
-     */
-    public String getMessageSelector() {
-        return getEndpoint().getMessageSelector();
-    }
-
-    /**
-     * Gets the durable subscription Id.
-     *
-     * @return the durableSubscriptionId
+     * Starts the JMS listener container
+     * <p/>
+     * Can be used to start this consumer later if it was configured to not auto startup.
      */
-    public String getDurableSubscriptionId() {
-        return getEndpoint().getDurableSubscriptionId();
-    }
-
-    /**
-     * Gets the commit strategy.
-     *
-     * @return the transactionCommitStrategy
-     */
-    public TransactionCommitStrategy getTransactionCommitStrategy() {
-        return getEndpoint().getTransactionCommitStrategy();
-    }
-
-    private boolean refillPool(BackOffTimer.Task task) {
-        LOG.debug("Refill consumers pool task running");
-        try {
-            fillConsumersPool();
-            LOG.info("Refill consumers pool completed (attempt: {})", task.getCurrentAttempts());
-            return false;
-        } catch (Exception ex) {
-            LOG.warn(
-                    "Refill consumers pool failed (attempt: {}) due to: {}. Will try again in {} millis. (stacktrace in DEBUG level)",
-                    task.getCurrentAttempts(), ex.getMessage(), task.getCurrentDelay());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Refill consumers pool failed", ex);
-            }
-        }
-        return true;
+    public void startListenerContainer() {
+        LOG.trace("Starting listener container {} on destination {}", listenerContainer, getDestinationName());
+        ServiceHelper.startService(listenerContainer);
+        LOG.debug("Started listener container {} on destination {}", listenerContainer, getDestinationName());
     }
 
-    private void scheduleRefill() {
-        if (rescheduleTask == null || rescheduleTask.getStatus() != BackOffTimer.Task.Status.Active) {
-            BackOff backOff = BackOff.builder().delay(getEndpoint().getReconnectBackOff()).build();
-            rescheduleTask = new BackOffTimer(scheduler).schedule(backOff, this::refillPool);
+    protected void stopAndDestroyListenerContainer() {
+        if (listenerContainer != null) {
+            ServiceHelper.stopAndShutdownService(listenerContainer);
         }
+        initialized = false;
     }
 
-    private final class ReconnectExceptionListener implements ExceptionListener {
-        private final WeakReference<Connection> connection;
-
-        private ReconnectExceptionListener(Connection connection) {
-            this.connection = new WeakReference<>(connection);
-        }
-
-        @Override
-        public void onException(JMSException exception) {
-            LOG.debug("Handling JMSException for reconnecting", exception);
-            Connection currentConnection = connection.get();
-            if (currentConnection != null) {
-                synchronized (consumers) {
-                    List<MessageConsumerResources> toClose = consumers.get(currentConnection);
-                    if (toClose != null) {
-                        toClose.forEach(SjmsConsumer.this::destroyObject);
-                    }
-                    consumers.remove(currentConnection);
-                }
-                scheduleRefill();
-            }
-        }
-
-        //hash and equals to prevent multiple instances for same connection
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ReconnectExceptionListener that = (ReconnectExceptionListener) o;
-            return Objects.equals(connection.get(), that.connection.get());
-        }
-
-        @Override
-        public int hashCode() {
-            final Connection currentConnection = this.connection.get();
-            return currentConnection == null ? 0 : currentConnection.hashCode();
-        }
-    }
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index 84a8b03..ee3c230 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.component.sjms;
 
+import java.util.concurrent.ExecutorService;
+
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.ExceptionListener;
 import javax.jms.Message;
 import javax.jms.Session;
@@ -32,6 +35,8 @@ import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.sjms.consumer.EndpointMessageListener;
+import org.apache.camel.component.sjms.consumer.SimpleMessageListenerContainer;
 import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
@@ -78,10 +83,6 @@ public class SjmsEndpoint extends DefaultEndpoint
     @UriPath(description = "DestinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name.")
     @Metadata(required = true)
     private String destinationName;
-    // TODO: get rid of synchronous true for consumer
-    @UriParam(label = "consumer", defaultValue = "true",
-              description = "Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).")
-    private boolean synchronous = true;
     @UriParam(label = "advanced",
               description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.")
     private HeaderFilterStrategy headerFilterStrategy;
@@ -139,6 +140,19 @@ public class SjmsEndpoint extends DefaultEndpoint
     @UriParam(defaultValue = "true", label = "consumer",
               description = "Specifies whether to use persistent delivery by default for replies.")
     private boolean replyToDeliveryPersistent = true;
+    @UriParam(label = "consumer,advanced", defaultValue = "Poison JMS message due to ${exception.message}",
+              description = "If eagerLoadingOfProperties is enabled and the JMS message payload (JMS body or JMS properties) is poison (cannot be read/mapped),"
+                            + " then set this text as the message body instead so the message can be processed"
+                            + " (the cause of the poison are already stored as exception on the Exchange)."
+                            + " This can be turned off by setting eagerPoisonBody=false."
+                            + " See also the option eagerLoadingOfProperties.")
+    private String eagerPoisonBody = "Poison JMS message payload: ${exception.message}";
+    @UriParam(label = "consumer,advanced",
+              description = "Enables eager loading of JMS properties and payload as soon as a message is loaded"
+                            + " which generally is inefficient as the JMS properties may not be required"
+                            + " but sometimes can catch early any issues with the underlying JMS provider"
+                            + " and the use of JMS properties. See also the option eagerPoisonBody.")
+    private boolean eagerLoadingOfProperties;
     @UriParam(enums = "1,2", label = "producer",
               description = "Specifies the delivery mode to be used."
                             + " Possible values are those defined by javax.jms.DeliveryMode."
@@ -159,6 +173,12 @@ public class SjmsEndpoint extends DefaultEndpoint
     @UriParam(label = "consumer,advanced",
               description = "Sets the JMS Message selector syntax.")
     private String messageSelector;
+    @UriParam(description = "Specifies whether to test the connection on startup."
+                            + " This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker."
+                            + " If a connection cannot be granted then Camel throws an exception on startup."
+                            + " This ensures that Camel is not started with failed connections."
+                            + " The JMS producers is tested as well.")
+    private boolean testConnectionOnStartup;
     @UriParam(label = "advanced",
               description = "Whether to startup the consumer message listener asynchronously, when starting a route."
                             + " For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover."
@@ -169,6 +189,13 @@ public class SjmsEndpoint extends DefaultEndpoint
     @UriParam(label = "advanced",
               description = "Whether to stop the consumer message listener asynchronously, when stopping a route.")
     private boolean asyncStopListener;
+    @UriParam(label = "consumer", defaultValue = "true",
+              description = "Specifies whether the consumer container should auto-startup.")
+    private boolean autoStartup = true;
+    @UriParam(label = "consumer,advanced",
+              description = "Whether a JMS consumer is allowed to send a reply message to the same destination that the consumer is using to"
+                            + " consume from. This prevents an endless loop by consuming and sending back the same message to itself.")
+    private boolean replyToSameDestinationAllowed;
     @UriParam(label = "producer,advanced", defaultValue = "true",
               description = "Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown.")
     private boolean allowNullBody = true;
@@ -210,14 +237,39 @@ public class SjmsEndpoint extends DefaultEndpoint
     @UriParam(defaultValue = "true", label = "consumer,logging",
               description = "Allows to control whether stacktraces should be logged or not, by the default errorHandler.")
     private boolean errorHandlerLogStackTrace = true;
-    @UriParam(label = "consumer", description = "Try to apply reconnection logic on consumer pool", defaultValue = "true")
-    private boolean reconnectOnError = true;
-    @UriParam(label = "consumer", javaType = "java.time.Duration",
-              description = "Backoff in millis on consumer pool reconnection attempts", defaultValue = "5000")
-    private long reconnectBackOff = 5000;
-
-    @Deprecated
-    private volatile boolean closeConnectionResource;
+    @UriParam(defaultValue = "5000", label = "advanced", javaType = "java.time.Duration",
+              description = "Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds."
+                            + " The default is 5000 ms, that is, 5 seconds.")
+    private long recoveryInterval = 5000;
+    @UriParam(label = "advanced",
+              description = "If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side,"
+                            + " then the caused Exception will be send back in response as a javax.jms.ObjectMessage."
+                            + " If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge"
+                            + " in your routing - for example, using persistent queues to enable robust routing."
+                            + " Notice that if you also have transferExchange enabled, this option takes precedence."
+                            + " The caught exception is required to be serializable."
+                            + " The original Exception on the consumer side can be wrapped in an outer exception"
+                            + " such as org.apache.camel.RuntimeCamelException when returned to the producer."
+                            + " Use this with caution as the data is using Java Object serialization and requires the received to be able to deserialize the data at Class level, "
+                            + " which forces a strong coupling between the producers and consumer!")
+    private boolean transferException;
+    @UriParam(label = "producer,advanced",
+              description = "Use this option to force disabling time to live."
+                            + " For example when you do request/reply over JMS, then Camel will by default use the requestTimeout value"
+                            + " as time to live on the message being sent. The problem is that the sender and receiver systems have"
+                            + " to have their clocks synchronized, so they are in sync. This is not always so easy to archive."
+                            + " So you can use disableTimeToLive=true to not set a time to live value on the sent message."
+                            + " Then the message will not expire on the receiver system. See below in section About time to live for more details.")
+    private boolean disableTimeToLive;
+    @UriParam(label = "consumer",
+              description = "Whether the JmsConsumer processes the Exchange asynchronously."
+                            + " If enabled then the JmsConsumer may pickup the next message from the JMS queue,"
+                            + " while the previous message is being processed asynchronously (by the Asynchronous Routing Engine)."
+                            + " This means that messages may be processed not 100% strictly in order. If disabled (as default)"
+                            + " then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue."
+                            + " Note if transacted has been enabled, then asyncConsumer=true does not run asynchronously, as transaction"
+                            + "  must be executed synchronously (Camel 3.0 may support async transactions).")
+    private boolean asyncConsumer;
 
     private JmsObjectFactory jmsObjectFactory = new Jms11ObjectFactory();
 
@@ -226,7 +278,6 @@ public class SjmsEndpoint extends DefaultEndpoint
 
     public SjmsEndpoint(String uri, Component component, String remaining) {
         super(uri, component);
-        // TODO: optimize for dynamic destination name via toD (eg ${ } somewhere)
         this.topic = DestinationNameParser.isTopic(remaining);
         this.destinationName = DestinationNameParser.getShortName(remaining);
     }
@@ -250,9 +301,25 @@ public class SjmsEndpoint extends DefaultEndpoint
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        SjmsConsumer answer = new SjmsConsumer(this, processor);
-        configureConsumer(answer);
-        return answer;
+        EndpointMessageListener listener = new EndpointMessageListener(this, processor);
+        if (isDisableReplyTo()) {
+            listener.setDisableReplyTo(true);
+        }
+        if (!"false".equals(eagerPoisonBody)) {
+            listener.setEagerPoisonBody(eagerPoisonBody);
+        }
+        listener.setEagerLoadingOfProperties(eagerLoadingOfProperties);
+        if (getReplyTo() != null) {
+            listener.setReplyToDestination(getReplyTo());
+        }
+        listener.setAsync(isAsyncConsumer());
+
+        MessageListenerContainer container = createMessageListenerContainer(this);
+        container.setMessageListener(listener);
+
+        SjmsConsumer consumer = new SjmsConsumer(this, processor, container);
+        configureConsumer(consumer);
+        return consumer;
     }
 
     @Override
@@ -298,6 +365,48 @@ public class SjmsEndpoint extends DefaultEndpoint
     }
 
     /**
+     * Factory method for creating a new template for InOnly message exchanges
+     */
+    public SjmsTemplate createInOnlyTemplate() {
+        // TODO: Add isPubSub, Destination as parameters so we can have a default destination
+        SjmsTemplate template = new SjmsTemplate(getConnectionFactory(), isTransacted(), getAcknowledgementMode().intValue());
+
+        // configure qos if enabled
+        if (isExplicitQosEnabled()) {
+            int dm = isDeliveryPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+            if (getDeliveryMode() != null) {
+                dm = getDeliveryMode();
+            }
+            template.setQoSSettings(dm, getPriority(), getTimeToLive());
+        }
+
+        return template;
+    }
+
+    /**
+     * Factory method for creating a new template for InOut message exchanges
+     */
+    public SjmsTemplate createInOutTemplate() {
+        // TODO: Add isPubSub, Destination as parameters so we can have a default destination
+        SjmsTemplate template = createInOnlyTemplate();
+        if (getRequestTimeout() > 0) {
+            template.setExplicitQosEnabled(true);
+
+            // prefer to use timeToLive over requestTimeout if both specified
+            long ttl = getTimeToLive() > 0 ? getTimeToLive() : getRequestTimeout();
+            if (!isDisableTimeToLive()) {
+                // only use TTL if not disabled
+                template.setQoSSettings(0, 0, ttl);
+            }
+        }
+        return template;
+    }
+
+    public MessageListenerContainer createMessageListenerContainer(SjmsEndpoint endpoint) throws Exception {
+        return new SimpleMessageListenerContainer(endpoint);
+    }
+
+    /**
      * When one of the QoS properties are configured such as {@link #setDeliveryPersistent(boolean)},
      * {@link #setPriority(int)} or {@link #setTimeToLive(long)} then we should auto default the setting of
      * {@link #setExplicitQosEnabled(Boolean)} if its not been configured yet
@@ -339,6 +448,15 @@ public class SjmsEndpoint extends DefaultEndpoint
         this.binding = binding;
     }
 
+    protected ExecutorService getAsyncStartStopExecutorService() {
+        if (getComponent() == null) {
+            throw new IllegalStateException(
+                    "AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this);
+        }
+        // use shared thread pool from component
+        return getComponent().getAsyncStartStopExecutorService();
+    }
+
     /**
      * DestinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name.
      */
@@ -403,20 +521,6 @@ public class SjmsEndpoint extends DefaultEndpoint
                 = EndpointHelper.resolveReferenceParameter(getCamelContext(), connectionResource, ConnectionResource.class);
     }
 
-    @Override
-    public boolean isSynchronous() {
-        return synchronous;
-    }
-
-    /**
-     * Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing
-     * (if supported).
-     */
-    @Override
-    public void setSynchronous(boolean synchronous) {
-        this.synchronous = synchronous;
-    }
-
     public SessionAcknowledgementType getAcknowledgementMode() {
         return acknowledgementMode;
     }
@@ -501,6 +605,22 @@ public class SjmsEndpoint extends DefaultEndpoint
         this.replyToDeliveryPersistent = replyToDeliveryPersistent;
     }
 
+    public String getEagerPoisonBody() {
+        return eagerPoisonBody;
+    }
+
+    public boolean isEagerLoadingOfProperties() {
+        return eagerLoadingOfProperties;
+    }
+
+    public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
+        this.eagerLoadingOfProperties = eagerLoadingOfProperties;
+    }
+
+    public void setEagerPoisonBody(String eagerPoisonBody) {
+        this.eagerPoisonBody = eagerPoisonBody;
+    }
+
     public Integer getDeliveryMode() {
         return deliveryMode;
     }
@@ -596,6 +716,14 @@ public class SjmsEndpoint extends DefaultEndpoint
         setExchangePattern(ExchangePattern.InOut);
     }
 
+    public boolean isTestConnectionOnStartup() {
+        return testConnectionOnStartup;
+    }
+
+    public void setTestConnectionOnStartup(boolean testConnectionOnStartup) {
+        this.testConnectionOnStartup = testConnectionOnStartup;
+    }
+
     /**
      * Whether to startup the consumer message listener asynchronously, when starting a route. For example if a
      * JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover.
@@ -623,6 +751,14 @@ public class SjmsEndpoint extends DefaultEndpoint
         return asyncStopListener;
     }
 
+    public boolean isAutoStartup() {
+        return autoStartup;
+    }
+
+    public void setAutoStartup(boolean autoStartup) {
+        this.autoStartup = autoStartup;
+    }
+
     public DestinationCreationStrategy getDestinationCreationStrategy() {
         return destinationCreationStrategy;
     }
@@ -634,6 +770,14 @@ public class SjmsEndpoint extends DefaultEndpoint
         this.destinationCreationStrategy = destinationCreationStrategy;
     }
 
+    public boolean isReplyToSameDestinationAllowed() {
+        return replyToSameDestinationAllowed;
+    }
+
+    public void setReplyToSameDestinationAllowed(boolean replyToSameDestinationAllowed) {
+        this.replyToSameDestinationAllowed = replyToSameDestinationAllowed;
+    }
+
     public boolean isAllowNullBody() {
         return allowNullBody;
     }
@@ -766,25 +910,35 @@ public class SjmsEndpoint extends DefaultEndpoint
         this.jmsObjectFactory = jmsObjectFactory;
     }
 
-    public boolean isReconnectOnError() {
-        return reconnectOnError;
+    public boolean isTransferException() {
+        return transferException;
     }
 
-    /**
-     * Try to apply reconnection logic on consumer pool
-     */
-    public void setReconnectOnError(boolean reconnectOnError) {
-        this.reconnectOnError = reconnectOnError;
+    public void setTransferException(boolean transferException) {
+        this.transferException = transferException;
     }
 
-    public long getReconnectBackOff() {
-        return reconnectBackOff;
+    public boolean isDisableTimeToLive() {
+        return disableTimeToLive;
     }
 
-    /**
-     * Backoff in millis on consumer pool reconnection attempts
-     */
-    public void setReconnectBackOff(long reconnectBackOff) {
-        this.reconnectBackOff = reconnectBackOff;
+    public void setDisableTimeToLive(boolean disableTimeToLive) {
+        this.disableTimeToLive = disableTimeToLive;
+    }
+
+    public long getRecoveryInterval() {
+        return recoveryInterval;
+    }
+
+    public void setRecoveryInterval(long recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
+
+    public boolean isAsyncConsumer() {
+        return asyncConsumer;
+    }
+
+    public void setAsyncConsumer(boolean asyncConsumer) {
+        this.asyncConsumer = asyncConsumer;
     }
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHelper.java
new file mode 100644
index 0000000..bff6ecc
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHelper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+public final class SjmsHelper {
+
+    private SjmsHelper() {
+    }
+
+    public static void closeProducer(MessageProducer producer) {
+        if (producer != null) {
+            try {
+                producer.close();
+            } catch (Throwable e) {
+                // ignore
+            }
+        }
+    }
+
+    public static void closeConnection(Connection con) {
+        if (con != null) {
+            try {
+                con.close();
+            } catch (Throwable e) {
+                // ignore
+            }
+        }
+    }
+
+    public static void closeSession(Session ses) {
+        if (ses != null) {
+            try {
+                ses.close();
+            } catch (Throwable e) {
+                // ignore
+            }
+        }
+    }
+
+    public static void closeConsumer(MessageConsumer consumer) {
+        if (consumer != null) {
+            boolean interrupted = Thread.interrupted();
+            try {
+                consumer.close();
+            } catch (JMSException ex) {
+                // ignore
+            } finally {
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    public static void commitIfNecessary(Session session) throws JMSException {
+        try {
+            session.commit();
+        } catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) {
+            // ignore
+        }
+    }
+
+    public static void rollbackIfNecessary(Session session) throws JMSException {
+        try {
+            session.rollback();
+        } catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) {
+            // ignore
+        }
+    }
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 3ed1034..6bc7d80 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -20,7 +20,6 @@ import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
@@ -173,14 +172,4 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
         return null;
     }
 
-    protected static void close(MessageProducer producer) {
-        if (producer != null) {
-            try {
-                producer.close();
-            } catch (Throwable e) {
-                // ignore
-            }
-        }
-    }
-
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java
index 6461c6e..902ce6e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsTemplate.java
@@ -58,6 +58,10 @@ public class SjmsTemplate {
         }
     }
 
+    public void setExplicitQosEnabled(boolean explicitQosEnabled) {
+        this.explicitQosEnabled = explicitQosEnabled;
+    }
+
     public <T> T execute(SessionCallback<T> sessionCallback) throws Exception {
         Connection con = null;
         Session session = null;
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index 2a1380d..0eecf31 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -36,6 +36,7 @@ import static org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException;
 /**
  * Abstract MessageListener
  */
+@Deprecated
 public abstract class AbstractMessageHandler implements MessageListener {
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
new file mode 100644
index 0000000..055365c
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.consumer;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.RollbackExchangeException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.sjms.SessionMessageListener;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsTemplate;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException;
+
+/**
+ * A JMS {@link MessageListener} which can be used to delegate processing to a Camel endpoint.
+ *
+ * Note that instance of this object has to be thread safe (reentrant)
+ */
+public class EndpointMessageListener implements SessionMessageListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class);
+
+    private final SjmsEndpoint endpoint;
+    private final AsyncProcessor processor;
+    private Object replyToDestination;
+    private boolean disableReplyTo;
+    private boolean async;
+    private boolean eagerLoadingOfProperties;
+    private String eagerPoisonBody;
+    private volatile SjmsTemplate template;
+
+    public EndpointMessageListener(SjmsEndpoint endpoint, Processor processor) {
+        this.endpoint = endpoint;
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
+    }
+
+    public synchronized SjmsTemplate getTemplate() {
+        if (template == null) {
+            template = endpoint.createInOnlyTemplate();
+        }
+        return template;
+    }
+
+    public void setTemplate(SjmsTemplate template) {
+        this.template = template;
+    }
+
+    public Object getReplyToDestination() {
+        return replyToDestination;
+    }
+
+    public void setReplyToDestination(Object replyToDestination) {
+        this.replyToDestination = replyToDestination;
+    }
+
+    public boolean isDisableReplyTo() {
+        return disableReplyTo;
+    }
+
+    public void setDisableReplyTo(boolean disableReplyTo) {
+        this.disableReplyTo = disableReplyTo;
+    }
+
+    public boolean isAsync() {
+        return async;
+    }
+
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
+
+    public boolean isEagerLoadingOfProperties() {
+        return eagerLoadingOfProperties;
+    }
+
+    public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
+        this.eagerLoadingOfProperties = eagerLoadingOfProperties;
+    }
+
+    public String getEagerPoisonBody() {
+        return eagerPoisonBody;
+    }
+
+    public void setEagerPoisonBody(String eagerPoisonBody) {
+        this.eagerPoisonBody = eagerPoisonBody;
+    }
+
+    @Override
+    public void onMessage(Message message, Session session) {
+        LOG.trace("onMessage START");
+
+        LOG.debug("{} consumer received JMS message: {}", endpoint, message);
+
+        boolean sendReply;
+        RuntimeCamelException rce;
+        try {
+            Object replyDestination = getReplyToDestination(message);
+            // we can only send back a reply if there was a reply destination configured
+            // and disableReplyTo hasn't been explicit enabled
+            sendReply = replyDestination != null && !disableReplyTo;
+
+            // we should also not send back reply to ourself if this destination and replyDestination is the same
+            Destination destination = JmsMessageHelper.getJMSDestination(message);
+            if (destination != null && sendReply && !endpoint.isReplyToSameDestinationAllowed()
+                    && destination.equals(replyDestination)) {
+                LOG.debug("JMSDestination and JMSReplyTo is the same, will skip sending a reply message to itself: {}",
+                        destination);
+                sendReply = false;
+            }
+
+            final Exchange exchange = createExchange(message, session, replyDestination);
+            if (ObjectHelper.isNotEmpty(eagerPoisonBody) && eagerLoadingOfProperties) {
+                try {
+                    exchange.getIn().getBody();
+                    exchange.getIn().getHeaders();
+                } catch (Throwable e) {
+                    // any problems with eager loading then set an exception so Camel error handler can react
+                    exchange.setException(e);
+                    String text = eagerPoisonBody;
+                    try {
+                        text = endpoint.getCamelContext().resolveLanguage("simple")
+                                .createExpression(eagerPoisonBody).evaluate(exchange, String.class);
+                    } catch (Throwable t) {
+                        // ignore
+                    }
+                    exchange.getIn().setBody(text);
+                }
+            } else if (eagerLoadingOfProperties) {
+                exchange.getIn().getBody();
+                exchange.getIn().getHeaders();
+            }
+
+            String correlationId = message.getJMSCorrelationID();
+            if (correlationId != null) {
+                LOG.debug("Received Message has JMSCorrelationID [{}]", correlationId);
+            }
+
+            // process the exchange either asynchronously or synchronous
+            LOG.trace("onMessage.process START");
+            AsyncCallback callback
+                    = new EndpointMessageListenerAsyncCallback(
+                            session, message, exchange, endpoint, sendReply, replyDestination);
+
+            // async is by default false, which mean we by default will process the exchange synchronously
+            // to keep backwards compatible, as well ensure this consumer will pickup messages in order
+            // (eg to not consume the next message before the previous has been fully processed)
+            // but if end user explicit configure consumerAsync=true, then we can process the message
+            // asynchronously (unless endpoint has been configured synchronous, or we use transaction)
+            boolean forceSync = endpoint.isSynchronous() || endpoint.isTransacted();
+            if (forceSync || !isAsync()) {
+                // must process synchronous if transacted or configured to do so
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing exchange {} synchronously", exchange.getExchangeId());
+                }
+                try {
+                    processor.process(exchange);
+                } catch (Exception e) {
+                    exchange.setException(e);
+                } finally {
+                    callback.done(true);
+                }
+            } else {
+                // process asynchronous using the async routing engine
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing exchange {} asynchronously", exchange.getExchangeId());
+                }
+                boolean sync = processor.process(exchange, callback);
+                if (!sync) {
+                    // will be done async so return now
+                    return;
+                }
+            }
+            // if we failed processed the exchange from the async callback task, then grab the exception
+            rce = exchange.getException(RuntimeCamelException.class);
+
+        } catch (Exception e) {
+            rce = wrapRuntimeCamelException(e);
+        }
+
+        // an exception occurred so rethrow to trigger rollback on JMS listener
+        // the JMS listener will use the error handler to handle the uncaught exception
+        if (rce != null) {
+            LOG.trace("onMessage END throwing exception: {}", rce.getMessage());
+            // Spring message listener container will handle uncaught exceptions
+            // being thrown from this onMessage, and will us the ErrorHandler configured
+            // on the JmsEndpoint to handle the exception
+            throw rce;
+        }
+
+        LOG.trace("onMessage END");
+    }
+
+    protected Object getReplyToDestination(Message message) throws JMSException {
+        // lets send a response back if we can
+        Object destination = getReplyToDestination();
+        if (destination == null) {
+            destination = JmsMessageHelper.getJMSReplyTo(message);
+        }
+        return destination;
+    }
+
+    public Exchange createExchange(Message message, Session session, Object replyDestination) {
+        Exchange exchange = endpoint.createExchange(message, session);
+
+        // lets set to an InOut if we have some kind of reply-to destination
+        if (replyDestination != null && !disableReplyTo) {
+            // only change pattern if not already out capable
+            if (!exchange.getPattern().isOutCapable()) {
+                exchange.setPattern(ExchangePattern.InOut);
+            }
+        }
+        return exchange;
+    }
+
+    protected void sendReply(
+            Session session,
+            Destination replyDestination, final Message message, final Exchange exchange,
+            final org.apache.camel.Message out, final Exception cause) {
+        if (replyDestination == null) {
+            LOG.debug("Cannot send reply message as there is no replyDestination for: {}", out);
+            return;
+        }
+        try {
+            getTemplate().execute(session, sc -> {
+                MessageProducer producer = null;
+                try {
+                    Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, sc, cause);
+                    final String correlationID = determineCorrelationId(message);
+                    reply.setJMSCorrelationID(correlationID);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
+                    }
+
+                    producer = endpoint.getJmsObjectFactory().createMessageProducer(sc, endpoint, replyDestination);
+                    template.send(producer, reply);
+                } finally {
+                    close(producer);
+                }
+                return null;
+            });
+        } catch (Exception e) {
+            exchange.setException(new CamelExchangeException("Unable to send reply JMS message", exchange, e));
+        }
+    }
+
+    protected void sendReply(
+            Session session,
+            String replyDestination, final Message message, final Exchange exchange,
+            final org.apache.camel.Message out, final Exception cause) {
+        if (replyDestination == null) {
+            LOG.debug("Cannot send reply message as there is no replyDestination for: {}", out);
+            return;
+        }
+        try {
+            getTemplate().execute(session, sc -> {
+                MessageProducer producer = null;
+                try {
+                    Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, sc, cause);
+                    final String correlationID = determineCorrelationId(message);
+                    reply.setJMSCorrelationID(correlationID);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
+                    }
+
+                    producer = endpoint.getJmsObjectFactory().createMessageProducer(sc, endpoint, replyDestination);
+                    template.send(producer, reply);
+                } finally {
+                    close(producer);
+                }
+                return null;
+            });
+        } catch (Exception e) {
+            exchange.setException(new CamelExchangeException("Unable to send reply JMS message", exchange, e));
+        }
+    }
+
+    /**
+     * Strategy to determine which correlation id to use among <tt>JMSMessageID</tt> and <tt>JMSCorrelationID</tt>.
+     *
+     * @param  message      the JMS message
+     * @return              the correlation id to use
+     * @throws JMSException can be thrown
+     */
+    protected String determineCorrelationId(final Message message) throws JMSException {
+        final String messageId = message.getJMSMessageID();
+        final String correlationId = message.getJMSCorrelationID();
+
+        if (ObjectHelper.isEmpty(correlationId)) {
+            // correlation id is empty so fallback to message id
+            return messageId;
+        } else {
+            return correlationId;
+        }
+    }
+
+    protected static void close(MessageProducer producer) {
+        if (producer != null) {
+            try {
+                producer.close();
+            } catch (Throwable e) {
+                // ignore
+            }
+        }
+    }
+
+    /**
+     * Callback task that is performed when the exchange has been processed
+     */
+    private final class EndpointMessageListenerAsyncCallback implements AsyncCallback {
+
+        private final Session session;
+        private final Message message;
+        private final Exchange exchange;
+        private final SjmsEndpoint endpoint;
+        private final boolean sendReply;
+        private final Object replyDestination;
+
+        private EndpointMessageListenerAsyncCallback(Session session, Message message, Exchange exchange, SjmsEndpoint endpoint,
+                                                     boolean sendReply, Object replyDestination) {
+            this.session = session;
+            this.message = message;
+            this.exchange = exchange;
+            this.endpoint = endpoint;
+            this.sendReply = sendReply;
+            this.replyDestination = replyDestination;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            LOG.trace("onMessage.process END");
+
+            // now we evaluate the processing of the exchange and determine if it was a success or failure
+            // we also grab information from the exchange to be used for sending back a reply (if we are to do so)
+            // so the following logic seems a bit complicated at first glance
+
+            // if we send back a reply it can either be the message body or transferring a caused exception
+            org.apache.camel.Message body = null;
+            Exception cause = null;
+            RuntimeCamelException rce = null;
+
+            if (exchange.isFailed() || exchange.isRollbackOnly()) {
+                if (exchange.isRollbackOnly()) {
+                    // rollback only so wrap an exception so we can rethrow the exception to cause rollback
+                    rce = wrapRuntimeCamelException(new RollbackExchangeException(exchange));
+                } else if (exchange.getException() != null) {
+                    // an exception occurred while processing
+                    if (endpoint.isTransferException()) {
+                        // send the exception as reply, so null body and set the exception as the cause
+                        body = null;
+                        cause = exchange.getException();
+                    } else {
+                        // only throw exception if endpoint is not configured to transfer exceptions back to caller
+                        // do not send a reply but wrap and rethrow the exception
+                        rce = wrapRuntimeCamelException(exchange.getException());
+                    }
+                }
+            } else {
+                // process OK so get the reply body if we are InOut and has a body
+                // If the ppl don't want to send the message back, he should use the InOnly
+                if (sendReply && exchange.getPattern().isOutCapable()) {
+                    if (exchange.hasOut()) {
+                        body = exchange.getOut();
+                    } else {
+                        body = exchange.getIn();
+                    }
+                    cause = null;
+                }
+            }
+
+            // send back reply if there was no error and we are supposed to send back a reply
+            if (rce == null && sendReply && (body != null || cause != null)) {
+                LOG.trace("onMessage.sendReply START");
+                if (replyDestination instanceof Destination) {
+                    sendReply(session, (Destination) replyDestination, message, exchange, body, cause);
+                } else {
+                    sendReply(session, (String) replyDestination, message, exchange, body, cause);
+                }
+                LOG.trace("onMessage.sendReply END");
+            }
+
+            // if an exception occurred
+            if (rce != null) {
+                if (doneSync) {
+                    // we were done sync, so put exception on exchange, so we can grab it in the onMessage
+                    // method and rethrow it
+                    exchange.setException(rce);
+                } else {
+                    // we were done async, so use the exception handler
+                    if (endpoint.getExceptionHandler() != null) {
+                        endpoint.getExceptionHandler().handleException(rce);
+                    }
+                }
+            }
+        }
+    }
+
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
index 11676aa..0d9db8c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOnlyMessageHandler.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.Synchronization;
 /**
  * An InOnly {@link AbstractMessageHandler}
  */
+@Deprecated
 public class InOnlyMessageHandler extends AbstractMessageHandler {
 
     public InOnlyMessageHandler(SjmsEndpoint endpoint, ExecutorService executor) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
index c965874..d964be1 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
@@ -38,6 +38,7 @@ import org.apache.camel.spi.Synchronization;
 /**
  * cache manager to store and purge unused cashed producers or we will have a memory leak
  */
+@Deprecated
 public class InOutMessageHandler extends AbstractMessageHandler {
 
     private Map<String, MessageProducer> producerCache = new TreeMap<>();
@@ -163,8 +164,7 @@ public class InOutMessageHandler extends AbstractMessageHandler {
         public void done(boolean sync) {
             try {
                 org.apache.camel.Message msg = exchange.getMessage();
-                Message response = getEndpoint().getBinding().makeJmsMessage(exchange, msg.getBody(), msg.getHeaders(),
-                        getSession(), null);
+                Message response = getEndpoint().getBinding().makeJmsMessage(exchange, msg, getSession(), null);
                 response.setJMSCorrelationID(exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class));
                 localProducer.send(response);
             } catch (Exception e) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
new file mode 100644
index 0000000..e5ee4b2
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.consumer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.sjms.SessionMessageListener;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.backoff.BackOff;
+import org.apache.camel.util.backoff.BackOffTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.sjms.SjmsHelper.*;
+
+public class SimpleMessageListenerContainer extends ServiceSupport
+        implements org.apache.camel.component.sjms.MessageListenerContainer, ExceptionListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageListenerContainer.class);
+
+    private final SjmsEndpoint endpoint;
+    private SessionMessageListener messageListener;
+
+    // TODO: transaction
+    // TODO: Add concurrency later
+    private int concurrentConsumers = 1;
+
+    private final Object connectionLock = new Object();
+    private Connection connection;
+    private volatile boolean connectionStarted;
+    private final Object consumerLock = new Object();
+    private Set<MessageConsumer> consumers;
+    private Set<Session> sessions;
+
+    private BackOffTimer.Task recoverTask;
+    private ScheduledExecutorService scheduler;
+
+    public SimpleMessageListenerContainer(SjmsEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public void setMessageListener(SessionMessageListener messageListener) {
+        this.messageListener = messageListener;
+    }
+
+    @Override
+    public void afterPropertiesConfigured(CamelContext camelContext) {
+        // noop
+    }
+
+    @Override
+    public ConnectionFactory getConnectionFactory() {
+        return endpoint.getConnectionFactory();
+    }
+
+    protected void configureConsumer(MessageConsumer consumer, Session session) throws Exception {
+        consumer.setMessageListener(new SimpleMessageListener(messageListener, session));
+    }
+
+    private class SimpleMessageListener implements MessageListener {
+
+        private final SessionMessageListener messageListener;
+        private final Session session;
+
+        public SimpleMessageListener(SessionMessageListener messageListener, Session session) {
+            this.messageListener = messageListener;
+            this.session = session;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                doOnMessage(message);
+            } catch (Throwable e) {
+                if (e instanceof JMSException) {
+                    if (endpoint.getExceptionListener() != null) {
+                        endpoint.getExceptionListener().onException((JMSException) e);
+                    }
+                } else {
+                    LOG.warn("Execution of JMS message listener failed. This exception is ignored.", e);
+                }
+            }
+        }
+
+        protected void doOnMessage(Message message) throws Exception {
+            try {
+                messageListener.onMessage(message, session);
+            } catch (Exception e) {
+                // unexpected error so rollback
+                rollbackIfNeeded(session);
+                throw e;
+            }
+            // success then commit if we need to
+            commitIfNeeded(session, message);
+        }
+
+        protected void commitIfNeeded(Session session, Message message) throws Exception {
+            if (session.getTransacted()) {
+                SjmsHelper.commitIfNecessary(session);
+            } else if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+                message.acknowledge();
+            }
+        }
+
+        protected void rollbackIfNeeded(Session session) throws JMSException {
+            if (session.getTransacted()) {
+                SjmsHelper.rollbackIfNecessary(session);
+            } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+                session.recover();
+            }
+        }
+
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        if (endpoint.getExceptionListener() != null) {
+            try {
+                endpoint.getExceptionListener().onException(exception);
+            } catch (Throwable e) {
+                // ignore
+            }
+        }
+
+        synchronized (this.connectionLock) {
+            this.sessions = null;
+            this.consumers = null;
+        }
+        scheduleConnectionRecovery();
+    }
+
+    protected boolean recoverConnection(BackOffTimer.Task task) throws Exception {
+        LOG.debug("Recovering from JMS Connection exception (attempt: {})", task.getCurrentAttempts());
+        try {
+            refreshConnection();
+            initConsumers();
+            LOG.debug("Successfully recovered JMS Connection (attempt: {})", task.getCurrentAttempts());
+            // success so do not try again
+            return false;
+        } catch (Throwable e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to recover JMS Connection. Will try again in " + task.getCurrentDelay() + " millis", e);
+            }
+            // try again
+            return true;
+        }
+    }
+
+    protected void scheduleConnectionRecovery() {
+        if (scheduler == null) {
+            this.scheduler = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
+                    "SimpleMessageListenerContainer");
+        }
+
+        // we need to recover using a background task
+        if (recoverTask == null || recoverTask.getStatus() != BackOffTimer.Task.Status.Active) {
+            BackOff backOff = BackOff.builder().delay(endpoint.getRecoveryInterval()).build();
+            recoverTask = new BackOffTimer(scheduler).schedule(backOff, this::recoverConnection);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        createConnection();
+        initConsumers();
+
+        startConnection();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (recoverTask != null) {
+            recoverTask.cancel();
+        }
+        stopConnection();
+        stopConsumers();
+        if (scheduler != null) {
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(scheduler);
+            scheduler = null;
+        }
+    }
+
+    protected void initConsumers() throws Exception {
+        synchronized (this.consumerLock) {
+            if (consumers == null) {
+                sessions = new HashSet<>(concurrentConsumers);
+                consumers = new HashSet<>(concurrentConsumers);
+                for (int i = 0; i < this.concurrentConsumers; i++) {
+                    Session session
+                            = connection.createSession(endpoint.isTransacted(), endpoint.getAcknowledgementMode().intValue());
+                    MessageConsumer consumer = endpoint.getJmsObjectFactory().createMessageConsumer(session, endpoint);
+                    configureConsumer(consumer, session);
+                    sessions.add(session);
+                    consumers.add(consumer);
+                }
+            }
+        }
+    }
+
+    protected void stopConsumers() throws Exception {
+        synchronized (this.consumerLock) {
+            if (consumers != null) {
+                LOG.debug("Stopping JMS MessageConsumers");
+                for (MessageConsumer consumer : this.consumers) {
+                    closeConsumer(consumer);
+                }
+                if (this.sessions != null) {
+                    LOG.debug("Stopping JMS Sessions");
+                    for (Session session : this.sessions) {
+                        closeSession(session);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void createConnection() throws Exception {
+        synchronized (this.connectionLock) {
+            if (this.connection == null) {
+                Connection con = null;
+                try {
+                    con = endpoint.getConnectionFactory().createConnection();
+                    if (endpoint.getComponent().getConnectionClientId() != null) {
+                        con.setClientID(endpoint.getComponent().getConnectionClientId());
+                    }
+                    con.setExceptionListener(this);
+                } catch (JMSException e) {
+                    closeConnection(con);
+                    throw e;
+                }
+                this.connection = con;
+                LOG.debug("Created JMS Connection");
+            }
+        }
+    }
+
+    protected final void refreshConnection() throws Exception {
+        synchronized (this.connectionLock) {
+            closeConnection(connection);
+            this.connection = null;
+            createConnection();
+            if (this.connectionStarted) {
+                startConnection();
+            }
+        }
+    }
+
+    protected void startConnection() throws Exception {
+        synchronized (this.connectionLock) {
+            this.connectionStarted = true;
+            if (this.connection != null) {
+                try {
+                    this.connection.start();
+                } catch (javax.jms.IllegalStateException e) {
+                    // ignore as it may already be started
+                }
+            }
+        }
+    }
+
+    protected void stopConnection() {
+        synchronized (this.connectionLock) {
+            this.connectionStarted = false;
+            if (this.connection != null) {
+                try {
+                    this.connection.stop();
+                } catch (Throwable e) {
+                    LOG.debug("Error stopping connection. This exception is ignored.", e);
+                }
+            }
+        }
+    }
+
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/Jms11ObjectFactory.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/Jms11ObjectFactory.java
index 4e7c4fb..699db13 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/Jms11ObjectFactory.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/Jms11ObjectFactory.java
@@ -111,6 +111,18 @@ public class Jms11ObjectFactory implements JmsObjectFactory {
     }
 
     @Override
+    public MessageProducer createMessageProducer(Session session, Endpoint endpoint, Destination destination) throws Exception {
+        SjmsEndpoint sjmsEndpoint = (SjmsEndpoint) endpoint;
+
+        boolean persistent = sjmsEndpoint.isDeliveryPersistent();
+        if (sjmsEndpoint.getDeliveryMode() != null) {
+            persistent = DeliveryMode.PERSISTENT == sjmsEndpoint.getDeliveryMode();
+        }
+
+        return createMessageProducer(session, destination, persistent, sjmsEndpoint.getTimeToLive());
+    }
+
+    @Override
     public MessageProducer createMessageProducer(
             Session session,
             Destination destination,
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
index 23f2d84..ed88c67 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
@@ -223,7 +223,7 @@ public class JmsBinding {
      * @throws JMSException if the message could not be created
      */
     public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
-        Message answer = makeJmsMessage(exchange, exchange.getIn().getBody(), exchange.getIn().getHeaders(), session, null);
+        Message answer = makeJmsMessage(exchange, exchange.getIn(), session, null);
         if (answer != null && messageCreatedStrategy != null) {
             messageCreatedStrategy.onMessageCreated(answer, session, exchange, null);
         }
@@ -234,14 +234,13 @@ public class JmsBinding {
      * Creates a JMS message from the Camel exchange and message
      *
      * @param  exchange     the current exchange
-     * @param  body         the message body
-     * @param  headers      the message headers
+     * @param  camelMessage the body to make a javax.jms.Message as
      * @param  session      the JMS session used to create the message
      * @param  cause        optional exception occurred that should be sent as reply instead of a regular body
      * @return              a newly created JMS Message instance containing the
      * @throws JMSException if the message could not be created
      */
-    public Message makeJmsMessage(Exchange exchange, Object body, Map headers, Session session, Exception cause)
+    public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session, Exception cause)
             throws JMSException {
         Message answer = null;
 
@@ -283,8 +282,9 @@ public class JmsBinding {
                 answer = createJmsMessage(cause, session);
             } else {
                 // create regular jms message using the camel message body
-                answer = createJmsMessage(exchange, body, headers, session, exchange.getContext());
-                appendJmsProperties(answer, exchange, headers);
+                answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session,
+                        exchange.getContext());
+                appendJmsProperties(answer, exchange, camelMessage.getHeaders());
             }
         }
 
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index b3494ea..0871be7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -400,4 +400,20 @@ public final class JmsMessageHelper {
         return null;
     }
 
+    /**
+     * Gets the JMSDestination from the message.
+     *
+     * @param  message the message
+     * @return         the JMSDestination, or <tt>null</tt> if not able to get
+     */
+    public static Destination getJMSDestination(Message message) {
+        try {
+            return message.getJMSDestination();
+        } catch (Exception e) {
+            // ignore if JMS broker do not support this
+        }
+
+        return null;
+    }
+
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java
index ab4b56f..056f676 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java
@@ -42,6 +42,8 @@ public interface JmsObjectFactory {
 
     MessageProducer createMessageProducer(Session session, Endpoint endpoint, String destinationName) throws Exception;
 
+    MessageProducer createMessageProducer(Session session, Endpoint endpoint, Destination destination) throws Exception;
+
     MessageProducer createMessageProducer(
             Session session, Destination destination,
             boolean persistent, long ttl)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index e1b23d1..b21ce9e 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -32,6 +32,7 @@ import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.sjms.SjmsHelper.closeProducer;
 import static org.apache.camel.component.sjms.jms.JmsMessageHelper.isTopicPrefix;
 
 /**
@@ -125,7 +126,7 @@ public class InOnlyProducer extends SjmsProducer {
                     producer = getEndpoint().getJmsObjectFactory().createMessageProducer(sc, getEndpoint(), destinationName);
                     template.send(producer, answer);
                 } finally {
-                    close(producer);
+                    closeProducer(producer);
                 }
                 return null;
             });
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 5fadf0d..426ae90 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -49,11 +49,15 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.sjms.SjmsHelper.closeProducer;
+
 /**
  * A Camel Producer that provides the InOut Exchange pattern.
  */
 public class InOutProducer extends SjmsProducer {
 
+    // TODO: reply manager
+
     private static final Logger LOG = LoggerFactory.getLogger(InOutProducer.class);
 
     private static final Map<String, Exchanger<Object>> EXCHANGERS = new ConcurrentHashMap<>();
@@ -275,7 +279,7 @@ public class InOutProducer extends SjmsProducer {
                     }
 
                 } finally {
-                    close(producer);
+                    closeProducer(producer);
                 }
                 return null;
             });
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java
index 0b955ca..a888b50 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointConnectionSettingsTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.sjms;
 
-import java.util.Random;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
@@ -55,17 +53,6 @@ public class SjmsEndpointConnectionSettingsTest extends CamelTestSupport {
         assertEquals(connectionResource, qe.getConnectionResource());
     }
 
-    @Test
-    public void testConnectionCount() {
-        Random random = new Random();
-        int poolSize = random.nextInt(100);
-        Endpoint endpoint = context.getEndpoint("sjms:queue:test?connectionCount=" + poolSize);
-        assertNotNull(endpoint);
-        assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
-        assertEquals(poolSize, qe.getConnectionCount());
-    }
-
     @Override
     protected CamelContext createCamelContext() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java
index a585e0c..0192df4 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/AsyncConsumerInOutTest.java
@@ -65,17 +65,17 @@ public class AsyncConsumerInOutTest extends CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 // enable async in only mode on the consumer
-                from("sjms:queue:start?synchronous=false")
+                from("sjms:queue:start?asyncConsumer=true")
                         .choice()
                         .when(body().contains("Camel"))
                         .to("async:camel?delay=2000")
-                        .to(ExchangePattern.InOut, "sjms:queue:in.out.test?replyTo=response.queue&synchronous=false")
+                        .to(ExchangePattern.InOut, "sjms:queue:in.out.test?replyTo=response.queue")
                         .to("mock:result")
                         .otherwise()
                         .to("log:other")
                         .to("mock:result");
 
-                from("sjms:queue:in.out.test?exchangePattern=InOut&synchronous=false")
+                from("sjms:queue:in.out.test?asyncConsumer=true")
                         .to("log:camel")
                         .transform(constant("Bye Camel"));
             }
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerAsyncTrueTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerAsyncTrueTest.java
index 83a4081..7d9259f 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerAsyncTrueTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerAsyncTrueTest.java
@@ -45,7 +45,7 @@ public class InOnlyConsumerAsyncTrueTest extends JmsTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sjms:queue:in.only.consumer.async?synchronous=false").to("log:before")
+                from("sjms:queue:in.only.consumer.async?asyncConsumer=true").to("log:before")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 if (exchange.getIn().getBody(String.class).equals("Hello Camel")) {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueSynchronousTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueSynchronousTest.java
index f7f523c..75649e3 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueSynchronousTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueSynchronousTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.Test;
 
 public class InOnlyConsumerQueueSynchronousTest extends JmsTestSupport {
 
-    private static final String SJMS_QUEUE_NAME = "sjms:queue:in.only.consumer.queue?synchronous=true";
+    private static final String SJMS_QUEUE_NAME = "sjms:queue:in.only.consumer.queue";
     private static final String MOCK_RESULT = "mock:result";
 
     @Test
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerQueueAsyncTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerQueueAsyncTest.java
index 9d962ab..0b89f96 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerQueueAsyncTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerQueueAsyncTest.java
@@ -38,11 +38,11 @@ public class InOutConsumerQueueAsyncTest extends JmsTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("sjms:queue:start?synchronous=false")
-                        .to("sjms:queue:in.out.queue?exchangePattern=InOut&synchronous=false&replyTo=in.out.queue.response")
+                from("sjms:queue:start?asyncConsumer=true")
+                        .to("sjms:queue:in.out.queue?replyTo=in.out.queue.response")
                         .to("mock:result");
 
-                from("sjms:queue:in.out.queue?exchangePattern=InOut&synchronous=false").to("log:before")
+                from("sjms:queue:in.out.queue?asyncConsumer=true").to("log:before")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 String body = (String) exchange.getIn().getBody();
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerTempQueueAsyncTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerTempQueueAsyncTest.java
index c60236c..a6c8478 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerTempQueueAsyncTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOutConsumerTempQueueAsyncTest.java
@@ -28,8 +28,8 @@ public class InOutConsumerTempQueueAsyncTest extends JmsTestSupport {
     public void testAynchronous() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello Camel");
 
-        template.sendBody("sjms:start", "Hello Camel");
-        template.sendBody("sjms:start", "Hello World");
+        template.requestBody("sjms:start", "Hello Camel");
+        template.requestBody("sjms:start", "Hello World");
         Thread.sleep(5000);
         assertMockEndpointsSatisfied();
     }
@@ -38,11 +38,11 @@ public class InOutConsumerTempQueueAsyncTest extends JmsTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("sjms:queue:start?synchronous=false")
-                        .to("sjms:queue:in.out.temp.queue?exchangePattern=InOut&synchronous=false")
+                from("sjms:queue:start?asyncConsumer=true")
+                        .to("sjms:queue:in.out.temp.queue")
                         .to("mock:result");
 
-                from("sjms:queue:in.out.temp.queue?exchangePattern=InOut&synchronous=false").to("log:before")
+                from("sjms:queue:in.out.temp.queue?asyncConsumer=true").to("log:before")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 String body = (String) exchange.getIn().getBody();
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
index 9a6658c..2288380 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutIT.java
@@ -57,10 +57,10 @@ public class AsyncJmsInOutIT extends JmsTestSupport {
             public void configure() throws Exception {
 
                 from("seda:start")
-                        .to("sjms:queue:in.foo?synchronous=false&replyTo=out.bar&exchangePattern=InOut")
+                        .to("sjms:queue:in.foo?asyncConsumer=true&replyTo=out.bar&exchangePattern=InOut")
                         .to("mock:result");
 
-                from("sjms:queue:in.foo?synchronous=false&exchangePattern=InOut")
+                from("sjms:queue:in.foo?asyncConsumer=true")
                         .log("Using ${threadName} to process ${body}")
                         .transform(body().prepend("Bye "));
             }
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
index 580eefb..0e37e78 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/it/AsyncJmsInOutTempDestIT.java
@@ -57,10 +57,10 @@ public class AsyncJmsInOutTempDestIT extends JmsTestSupport {
             public void configure() throws Exception {
 
                 from("seda:start")
-                        .to("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut")
+                        .to("sjms:in.foo.tempQ?exchangePattern=InOut")
                         .to("mock:result");
 
-                from("sjms:in.foo.tempQ?synchronous=false&exchangePattern=InOut")
+                from("sjms:in.foo.tempQ?asyncConsumer=true")
                         .log("Using ${threadName} to process ${body}")
                         .transform(body().prepend("Bye "));
             }
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java
index 5d72413..ab3a132 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncQueueProducerTest.java
@@ -89,9 +89,9 @@ public class AsyncQueueProducerTest extends CamelTestSupport {
                                 afterThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("sjms:queue:foo?synchronous=false");
+                        .to("sjms:queue:foo");
 
-                from("sjms:queue:foo?synchronous=false")
+                from("sjms:queue:foo?asyncConsumer=true")
                         .to("mock:after")
                         .to("log:after")
                         .delay(1000)
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java
index 19f3d24..bf04b87 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/AsyncTopicProducerTest.java
@@ -89,7 +89,7 @@ public class AsyncTopicProducerTest extends CamelTestSupport {
                                 afterThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("sjms:topic:foo?synchronous=false");
+                        .to("sjms:topic:foo");
 
                 from("sjms:topic:foo")
                         .to("mock:after")
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
index aaa2a31..4a618d0 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
@@ -119,7 +119,7 @@ public class InOutQueueProducerAsyncLoadTest extends JmsTestSupport {
                         .to("log:" + TEST_DESTINATION_NAME + ".in.log?showBody=true")
                         .to(ExchangePattern.InOut, "sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?replyTo="
                                                    + TEST_DESTINATION_NAME
-                                                   + ".response&consumerCount=10&synchronous=false")
+                                                   + ".response&consumerCount=10")
                         .threads(20)
                         .to("log:" + TEST_DESTINATION_NAME + ".out.log?showBody=true");
             }
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
index 445cc01..d575040 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
@@ -94,7 +94,7 @@ public class InOutQueueProducerSyncLoadTest extends JmsTestSupport {
                         assertNotNull(response);
                         assertEquals(responseText, response);
                     } catch (Exception e) {
-                        log.error("TODO Auto-generated catch block", e);
+                        log.warn("Error", e);
                     }
                 }
             };
@@ -117,7 +117,7 @@ public class InOutQueueProducerSyncLoadTest extends JmsTestSupport {
                         .to("log:" + TEST_DESTINATION_NAME + ".in.log?showBody=true")
                         .to(ExchangePattern.InOut, "sjms:queue:" + TEST_DESTINATION_NAME + ".request" + "?replyTo="
                                                    + TEST_DESTINATION_NAME
-                                                   + ".response&consumerCount=20&synchronous=true")
+                                                   + ".response&consumerCount=20")
                         .to("log:" + TEST_DESTINATION_NAME + ".out.log?showBody=true");
             }
         };
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/NoConnectionFactoryTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/NoConnectionFactoryTest.java
deleted file mode 100644
index a784b8f..0000000
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/NoConnectionFactoryTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms.producer;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.FailedToStartRouteException;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
-
-/**
- * A unit test to ensure getting a meaningful error message when neither of ConnectionResource nor ConnectionFactory is
- * configured.
- */
-public class NoConnectionFactoryTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NoConnectionFactoryTest.class);
-
-    @Test
-    public void testConsumerInOnly() throws Exception {
-        CamelContext context = new DefaultCamelContext();
-        context.addRoutes(createConsumerInOnlyRouteBuilder());
-        try {
-            context.start();
-        } catch (Throwable t) {
-            assertEquals(IllegalArgumentException.class, t.getClass());
-            LOG.info("Expected exception was thrown", t);
-            return;
-        }
-        fail("No exception was thrown");
-    }
-
-    @Test
-    public void testConsumerInOut() throws Exception {
-        CamelContext context = new DefaultCamelContext();
-        context.addRoutes(createConsumerInOutRouteBuilder());
-        try {
-            context.start();
-        } catch (Throwable t) {
-            assertEquals(IllegalArgumentException.class, t.getClass());
-            LOG.info("Expected exception was thrown", t);
-            return;
-        }
-        fail("No exception was thrown");
-    }
-
-    @Test
-    public void testProducerInOnly() throws Exception {
-        CamelContext context = new DefaultCamelContext();
-        context.addRoutes(createProducerInOnlyRouteBuilder());
-        try {
-            context.start();
-        } catch (Throwable t) {
-            assertEquals(FailedToStartRouteException.class, t.getClass());
-            assertEquals(IllegalArgumentException.class, t.getCause().getCause().getClass());
-            LOG.info("Expected exception was thrown", t);
-            return;
-        }
-        fail("No exception was thrown");
-    }
-
-    @Test
-    public void testProducerInOut() throws Exception {
-        CamelContext context = new DefaultCamelContext();
-        context.addRoutes(createProducerInOutRouteBuilder());
-        try {
-            context.start();
-        } catch (Throwable t) {
-            assertEquals(FailedToStartRouteException.class, t.getClass());
-            assertEquals(IllegalArgumentException.class, t.getCause().getCause().getClass());
-            LOG.info("Expected exception was thrown", t);
-            return;
-        }
-        fail("No exception was thrown");
-    }
-
-    protected RouteBuilder createConsumerInOnlyRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("sjms:queue:test-in?exchangePattern=InOnly")
-                        .to("mock:result");
-            }
-        };
-    }
-
-    protected RouteBuilder createConsumerInOutRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("sjms:queue:test-in?exchangePattern=InOut")
-                        .to("mock:result");
-            }
-        };
-    }
-
-    protected RouteBuilder createProducerInOnlyRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:inonly")
-                        .to("sjms:queue:test-out?exchangePattern=InOnly")
-                        .to("mock:result");
-            }
-        };
-    }
-
-    protected RouteBuilder createProducerInOutRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:inout")
-                        .to("sjms:queue:test-out?exchangePattern=InOut")
-                        .to("mock:result");
-            }
-        };
-    }
-}
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java
index a20f612..730c6ba 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java
@@ -107,36 +107,6 @@ public interface SjmsComponentBuilderFactory {
             return this;
         }
         /**
-         * Backoff in millis on consumer pool reconnection attempts.
-         * 
-         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
-         * 
-         * Default: 5000
-         * Group: consumer
-         * 
-         * @param reconnectBackOff the value to set
-         * @return the dsl builder
-         */
-        default SjmsComponentBuilder reconnectBackOff(long reconnectBackOff) {
-            doSetProperty("reconnectBackOff", reconnectBackOff);
-            return this;
-        }
-        /**
-         * Try to apply reconnection logic on consumer pool.
-         * 
-         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
-         * 
-         * Default: true
-         * Group: consumer
-         * 
-         * @param reconnectOnError the value to set
-         * @return the dsl builder
-         */
-        default SjmsComponentBuilder reconnectOnError(boolean reconnectOnError) {
-            doSetProperty("reconnectOnError", reconnectOnError);
-            return this;
-        }
-        /**
          * Whether the producer should be started lazy (on the first message).
          * By starting lazy you can use this to allow CamelContext and routes to
          * startup in situations where a producer may otherwise fail during
@@ -307,6 +277,23 @@ public interface SjmsComponentBuilderFactory {
             return this;
         }
         /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default SjmsComponentBuilder recoveryInterval(long recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
          * To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter
          * header to and from Camel message.
          * 
@@ -395,8 +382,6 @@ public interface SjmsComponentBuilderFactory {
             case "connectionCount": ((SjmsComponent) component).setConnectionCount((java.lang.Integer) value); return true;
             case "connectionFactory": ((SjmsComponent) component).setConnectionFactory((javax.jms.ConnectionFactory) value); return true;
             case "bridgeErrorHandler": ((SjmsComponent) component).setBridgeErrorHandler((boolean) value); return true;
-            case "reconnectBackOff": ((SjmsComponent) component).setReconnectBackOff((long) value); return true;
-            case "reconnectOnError": ((SjmsComponent) component).setReconnectOnError((boolean) value); return true;
             case "lazyStartProducer": ((SjmsComponent) component).setLazyStartProducer((boolean) value); return true;
             case "autowiredEnabled": ((SjmsComponent) component).setAutowiredEnabled((boolean) value); return true;
             case "connectionClientId": ((SjmsComponent) component).setConnectionClientId((java.lang.String) value); return true;
@@ -406,6 +391,7 @@ public interface SjmsComponentBuilderFactory {
             case "destinationCreationStrategy": ((SjmsComponent) component).setDestinationCreationStrategy((org.apache.camel.component.sjms.jms.DestinationCreationStrategy) value); return true;
             case "jmsKeyFormatStrategy": ((SjmsComponent) component).setJmsKeyFormatStrategy((org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy) value); return true;
             case "messageCreatedStrategy": ((SjmsComponent) component).setMessageCreatedStrategy((org.apache.camel.component.sjms.jms.MessageCreatedStrategy) value); return true;
+            case "recoveryInterval": ((SjmsComponent) component).setRecoveryInterval((long) value); return true;
             case "headerFilterStrategy": ((SjmsComponent) component).setHeaderFilterStrategy((org.apache.camel.spi.HeaderFilterStrategy) value); return true;
             case "connectionPassword": ((SjmsComponent) component).setConnectionPassword((java.lang.String) value); return true;
             case "connectionUsername": ((SjmsComponent) component).setConnectionUsername((java.lang.String) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java
index 13587e2..31743a6 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java
@@ -142,212 +142,234 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
-         * Allows for bridging the consumer to the Camel routing Error Handler,
-         * which mean any exceptions occurred while the consumer is trying to
-         * pickup incoming messages, or the likes, will now be processed as a
-         * message and handled by the routing Error Handler. By default the
-         * consumer will use the org.apache.camel.spi.ExceptionHandler to deal
-         * with exceptions, that will be logged at WARN or ERROR level and
-         * ignored.
+         * Specifies whether to test the connection on startup. This ensures
+         * that when Camel starts that all the JMS consumers have a valid
+         * connection to the JMS broker. If a connection cannot be granted then
+         * Camel throws an exception on startup. This ensures that Camel is not
+         * started with failed connections. The JMS producers is tested as well.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
          * Default: false
-         * Group: consumer
+         * Group: common
          * 
-         * @param bridgeErrorHandler the value to set
+         * @param testConnectionOnStartup the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder bridgeErrorHandler(
-                boolean bridgeErrorHandler) {
-            doSetProperty("bridgeErrorHandler", bridgeErrorHandler);
+        default SjmsEndpointConsumerBuilder testConnectionOnStartup(
+                boolean testConnectionOnStartup) {
+            doSetProperty("testConnectionOnStartup", testConnectionOnStartup);
             return this;
         }
         /**
-         * Allows for bridging the consumer to the Camel routing Error Handler,
-         * which mean any exceptions occurred while the consumer is trying to
-         * pickup incoming messages, or the likes, will now be processed as a
-         * message and handled by the routing Error Handler. By default the
-         * consumer will use the org.apache.camel.spi.ExceptionHandler to deal
-         * with exceptions, that will be logged at WARN or ERROR level and
-         * ignored.
+         * Specifies whether to test the connection on startup. This ensures
+         * that when Camel starts that all the JMS consumers have a valid
+         * connection to the JMS broker. If a connection cannot be granted then
+         * Camel throws an exception on startup. This ensures that Camel is not
+         * started with failed connections. The JMS producers is tested as well.
          * 
          * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
          * type.
          * 
          * Default: false
-         * Group: consumer
+         * Group: common
          * 
-         * @param bridgeErrorHandler the value to set
+         * @param testConnectionOnStartup the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder bridgeErrorHandler(
-                String bridgeErrorHandler) {
-            doSetProperty("bridgeErrorHandler", bridgeErrorHandler);
+        default SjmsEndpointConsumerBuilder testConnectionOnStartup(
+                String testConnectionOnStartup) {
+            doSetProperty("testConnectionOnStartup", testConnectionOnStartup);
             return this;
         }
         /**
-         * Sets the number of consumer listeners used for this endpoint.
+         * Whether the JmsConsumer processes the Exchange asynchronously. If
+         * enabled then the JmsConsumer may pickup the next message from the JMS
+         * queue, while the previous message is being processed asynchronously
+         * (by the Asynchronous Routing Engine). This means that messages may be
+         * processed not 100% strictly in order. If disabled (as default) then
+         * the Exchange is fully processed before the JmsConsumer will pickup
+         * the next message from the JMS queue. Note if transacted has been
+         * enabled, then asyncConsumer=true does not run asynchronously, as
+         * transaction must be executed synchronously (Camel 3.0 may support
+         * async transactions).
          * 
-         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
-         * Default: 1
+         * Default: false
          * Group: consumer
          * 
-         * @param consumerCount the value to set
+         * @param asyncConsumer the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder consumerCount(int consumerCount) {
-            doSetProperty("consumerCount", consumerCount);
+        default SjmsEndpointConsumerBuilder asyncConsumer(boolean asyncConsumer) {
+            doSetProperty("asyncConsumer", asyncConsumer);
             return this;
         }
         /**
-         * Sets the number of consumer listeners used for this endpoint.
+         * Whether the JmsConsumer processes the Exchange asynchronously. If
+         * enabled then the JmsConsumer may pickup the next message from the JMS
+         * queue, while the previous message is being processed asynchronously
+         * (by the Asynchronous Routing Engine). This means that messages may be
+         * processed not 100% strictly in order. If disabled (as default) then
+         * the Exchange is fully processed before the JmsConsumer will pickup
+         * the next message from the JMS queue. Note if transacted has been
+         * enabled, then asyncConsumer=true does not run asynchronously, as
+         * transaction must be executed synchronously (Camel 3.0 may support
+         * async transactions).
          * 
-         * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
          * 
-         * Default: 1
+         * Default: false
          * Group: consumer
          * 
-         * @param consumerCount the value to set
+         * @param asyncConsumer the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder consumerCount(String consumerCount) {
-            doSetProperty("consumerCount", consumerCount);
+        default SjmsEndpointConsumerBuilder asyncConsumer(String asyncConsumer) {
+            doSetProperty("asyncConsumer", asyncConsumer);
             return this;
         }
         /**
-         * Sets the durable subscription Id required for durable topics.
+         * Specifies whether the consumer container should auto-startup.
          * 
-         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
+         * Default: true
          * Group: consumer
          * 
-         * @param durableSubscriptionId the value to set
+         * @param autoStartup the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder durableSubscriptionId(
-                String durableSubscriptionId) {
-            doSetProperty("durableSubscriptionId", durableSubscriptionId);
+        default SjmsEndpointConsumerBuilder autoStartup(boolean autoStartup) {
+            doSetProperty("autoStartup", autoStartup);
             return this;
         }
         /**
-         * Backoff in millis on consumer pool reconnection attempts.
+         * Specifies whether the consumer container should auto-startup.
          * 
-         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
          * 
-         * Default: 5000
+         * Default: true
          * Group: consumer
          * 
-         * @param reconnectBackOff the value to set
+         * @param autoStartup the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder reconnectBackOff(
-                long reconnectBackOff) {
-            doSetProperty("reconnectBackOff", reconnectBackOff);
+        default SjmsEndpointConsumerBuilder autoStartup(String autoStartup) {
+            doSetProperty("autoStartup", autoStartup);
             return this;
         }
         /**
-         * Backoff in millis on consumer pool reconnection attempts.
+         * Allows for bridging the consumer to the Camel routing Error Handler,
+         * which mean any exceptions occurred while the consumer is trying to
+         * pickup incoming messages, or the likes, will now be processed as a
+         * message and handled by the routing Error Handler. By default the
+         * consumer will use the org.apache.camel.spi.ExceptionHandler to deal
+         * with exceptions, that will be logged at WARN or ERROR level and
+         * ignored.
          * 
-         * The option will be converted to a &lt;code&gt;long&lt;/code&gt; type.
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
-         * Default: 5000
+         * Default: false
          * Group: consumer
          * 
-         * @param reconnectBackOff the value to set
+         * @param bridgeErrorHandler the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder reconnectBackOff(
-                String reconnectBackOff) {
-            doSetProperty("reconnectBackOff", reconnectBackOff);
+        default SjmsEndpointConsumerBuilder bridgeErrorHandler(
+                boolean bridgeErrorHandler) {
+            doSetProperty("bridgeErrorHandler", bridgeErrorHandler);
             return this;
         }
         /**
-         * Try to apply reconnection logic on consumer pool.
+         * Allows for bridging the consumer to the Camel routing Error Handler,
+         * which mean any exceptions occurred while the consumer is trying to
+         * pickup incoming messages, or the likes, will now be processed as a
+         * message and handled by the routing Error Handler. By default the
+         * consumer will use the org.apache.camel.spi.ExceptionHandler to deal
+         * with exceptions, that will be logged at WARN or ERROR level and
+         * ignored.
          * 
-         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
          * 
-         * Default: true
+         * Default: false
          * Group: consumer
          * 
-         * @param reconnectOnError the value to set
+         * @param bridgeErrorHandler the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder reconnectOnError(
-                boolean reconnectOnError) {
-            doSetProperty("reconnectOnError", reconnectOnError);
+        default SjmsEndpointConsumerBuilder bridgeErrorHandler(
+                String bridgeErrorHandler) {
+            doSetProperty("bridgeErrorHandler", bridgeErrorHandler);
             return this;
         }
         /**
-         * Try to apply reconnection logic on consumer pool.
+         * Sets the number of consumer listeners used for this endpoint.
          * 
-         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
-         * type.
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
          * 
-         * Default: true
+         * Default: 1
          * Group: consumer
          * 
-         * @param reconnectOnError the value to set
+         * @param consumerCount the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder reconnectOnError(
-                String reconnectOnError) {
-            doSetProperty("reconnectOnError", reconnectOnError);
+        default SjmsEndpointConsumerBuilder consumerCount(int consumerCount) {
+            doSetProperty("consumerCount", consumerCount);
             return this;
         }
         /**
-         * Specifies whether to use persistent delivery by default for replies.
+         * Sets the number of consumer listeners used for this endpoint.
          * 
-         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
          * 
-         * Default: true
+         * Default: 1
          * Group: consumer
          * 
-         * @param replyToDeliveryPersistent the value to set
+         * @param consumerCount the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder replyToDeliveryPersistent(
-                boolean replyToDeliveryPersistent) {
-            doSetProperty("replyToDeliveryPersistent", replyToDeliveryPersistent);
+        default SjmsEndpointConsumerBuilder consumerCount(String consumerCount) {
+            doSetProperty("consumerCount", consumerCount);
             return this;
         }
         /**
-         * Specifies whether to use persistent delivery by default for replies.
+         * Sets the durable subscription Id required for durable topics.
          * 
-         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
-         * type.
+         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
-         * Default: true
          * Group: consumer
          * 
-         * @param replyToDeliveryPersistent the value to set
+         * @param durableSubscriptionId the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder replyToDeliveryPersistent(
-                String replyToDeliveryPersistent) {
-            doSetProperty("replyToDeliveryPersistent", replyToDeliveryPersistent);
+        default SjmsEndpointConsumerBuilder durableSubscriptionId(
+                String durableSubscriptionId) {
+            doSetProperty("durableSubscriptionId", durableSubscriptionId);
             return this;
         }
         /**
-         * Sets whether synchronous processing should be strictly used or Camel
-         * is allowed to use asynchronous processing (if supported).
+         * Specifies whether to use persistent delivery by default for replies.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
          * 
          * Default: true
          * Group: consumer
          * 
-         * @param synchronous the value to set
+         * @param replyToDeliveryPersistent the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder synchronous(boolean synchronous) {
-            doSetProperty("synchronous", synchronous);
+        default SjmsEndpointConsumerBuilder replyToDeliveryPersistent(
+                boolean replyToDeliveryPersistent) {
+            doSetProperty("replyToDeliveryPersistent", replyToDeliveryPersistent);
             return this;
         }
         /**
-         * Sets whether synchronous processing should be strictly used or Camel
-         * is allowed to use asynchronous processing (if supported).
+         * Specifies whether to use persistent delivery by default for replies.
          * 
          * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
          * type.
@@ -355,11 +377,12 @@ public interface SjmsEndpointBuilderFactory {
          * Default: true
          * Group: consumer
          * 
-         * @param synchronous the value to set
+         * @param replyToDeliveryPersistent the value to set
          * @return the dsl builder
          */
-        default SjmsEndpointConsumerBuilder synchronous(String synchronous) {
-            doSetProperty("synchronous", synchronous);
+        default SjmsEndpointConsumerBuilder replyToDeliveryPersistent(
+                String replyToDeliveryPersistent) {
+            doSetProperty("replyToDeliveryPersistent", replyToDeliveryPersistent);
             return this;
         }
         /**
@@ -508,6 +531,68 @@ public interface SjmsEndpointBuilderFactory {
             return (SjmsEndpointConsumerBuilder) this;
         }
         /**
+         * Enables eager loading of JMS properties and payload as soon as a
+         * message is loaded which generally is inefficient as the JMS
+         * properties may not be required but sometimes can catch early any
+         * issues with the underlying JMS provider and the use of JMS
+         * properties. See also the option eagerPoisonBody.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: consumer (advanced)
+         * 
+         * @param eagerLoadingOfProperties the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder eagerLoadingOfProperties(
+                boolean eagerLoadingOfProperties) {
+            doSetProperty("eagerLoadingOfProperties", eagerLoadingOfProperties);
+            return this;
+        }
+        /**
+         * Enables eager loading of JMS properties and payload as soon as a
+         * message is loaded which generally is inefficient as the JMS
+         * properties may not be required but sometimes can catch early any
+         * issues with the underlying JMS provider and the use of JMS
+         * properties. See also the option eagerPoisonBody.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: consumer (advanced)
+         * 
+         * @param eagerLoadingOfProperties the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder eagerLoadingOfProperties(
+                String eagerLoadingOfProperties) {
+            doSetProperty("eagerLoadingOfProperties", eagerLoadingOfProperties);
+            return this;
+        }
+        /**
+         * If eagerLoadingOfProperties is enabled and the JMS message payload
+         * (JMS body or JMS properties) is poison (cannot be read/mapped), then
+         * set this text as the message body instead so the message can be
+         * processed (the cause of the poison are already stored as exception on
+         * the Exchange). This can be turned off by setting
+         * eagerPoisonBody=false. See also the option eagerLoadingOfProperties.
+         * 
+         * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
+         * 
+         * Default: Poison JMS message due to ${exception.message}
+         * Group: consumer (advanced)
+         * 
+         * @param eagerPoisonBody the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder eagerPoisonBody(
+                String eagerPoisonBody) {
+            doSetProperty("eagerPoisonBody", eagerPoisonBody);
+            return this;
+        }
+        /**
          * To let the consumer use a custom ExceptionHandler. Notice if the
          * option bridgeErrorHandler is enabled then this option is not in use.
          * By default the consumer will deal with exceptions, that will be
@@ -593,6 +678,45 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Whether a JMS consumer is allowed to send a reply message to the same
+         * destination that the consumer is using to consume from. This prevents
+         * an endless loop by consuming and sending back the same message to
+         * itself.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: consumer (advanced)
+         * 
+         * @param replyToSameDestinationAllowed the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder replyToSameDestinationAllowed(
+                boolean replyToSameDestinationAllowed) {
+            doSetProperty("replyToSameDestinationAllowed", replyToSameDestinationAllowed);
+            return this;
+        }
+        /**
+         * Whether a JMS consumer is allowed to send a reply message to the same
+         * destination that the consumer is using to consume from. This prevents
+         * an endless loop by consuming and sending back the same message to
+         * itself.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: consumer (advanced)
+         * 
+         * @param replyToSameDestinationAllowed the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder replyToSameDestinationAllowed(
+                String replyToSameDestinationAllowed) {
+            doSetProperty("replyToSameDestinationAllowed", replyToSameDestinationAllowed);
+            return this;
+        }
+        /**
          * Whether to startup the consumer message listener asynchronously, when
          * starting a route. For example if a JmsConsumer cannot get a
          * connection to a remote JMS broker, then it may block while retrying
@@ -1042,6 +1166,136 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder recoveryInterval(
+                long recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option will be converted to a &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder recoveryInterval(
+                String recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param synchronous the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder synchronous(
+                boolean synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param synchronous the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder synchronous(
+                String synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * If enabled and you are using Request Reply messaging (InOut) and an
+         * Exchange failed on the consumer side, then the caused Exception will
+         * be send back in response as a javax.jms.ObjectMessage. If the client
+         * is Camel, the returned Exception is rethrown. This allows you to use
+         * Camel JMS as a bridge in your routing - for example, using persistent
+         * queues to enable robust routing. Notice that if you also have
+         * transferExchange enabled, this option takes precedence. The caught
+         * exception is required to be serializable. The original Exception on
+         * the consumer side can be wrapped in an outer exception such as
+         * org.apache.camel.RuntimeCamelException when returned to the producer.
+         * Use this with caution as the data is using Java Object serialization
+         * and requires the received to be able to deserialize the data at Class
+         * level, which forces a strong coupling between the producers and
+         * consumer!.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param transferException the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder transferException(
+                boolean transferException) {
+            doSetProperty("transferException", transferException);
+            return this;
+        }
+        /**
+         * If enabled and you are using Request Reply messaging (InOut) and an
+         * Exchange failed on the consumer side, then the caused Exception will
+         * be send back in response as a javax.jms.ObjectMessage. If the client
+         * is Camel, the returned Exception is rethrown. This allows you to use
+         * Camel JMS as a bridge in your routing - for example, using persistent
+         * queues to enable robust routing. Notice that if you also have
+         * transferExchange enabled, this option takes precedence. The caught
+         * exception is required to be serializable. The original Exception on
+         * the consumer side can be wrapped in an outer exception such as
+         * org.apache.camel.RuntimeCamelException when returned to the producer.
+         * Use this with caution as the data is using Java Object serialization
+         * and requires the received to be able to deserialize the data at Class
+         * level, which forces a strong coupling between the producers and
+         * consumer!.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param transferException the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointConsumerBuilder transferException(
+                String transferException) {
+            doSetProperty("transferException", transferException);
+            return this;
+        }
+        /**
          * Specifies whether to share JMS session with other SJMS endpoints.
          * Turn this off if your route is accessing to multiple JMS providers.
          * If you need transaction against multiple JMS providers, use jms
@@ -1189,6 +1443,47 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Specifies whether to test the connection on startup. This ensures
+         * that when Camel starts that all the JMS consumers have a valid
+         * connection to the JMS broker. If a connection cannot be granted then
+         * Camel throws an exception on startup. This ensures that Camel is not
+         * started with failed connections. The JMS producers is tested as well.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param testConnectionOnStartup the value to set
+         * @return the dsl builder
+         */
+        default SjmsEndpointProducerBuilder testConnectionOnStartup(
+                boolean testConnectionOnStartup) {
+            doSetProperty("testConnectionOnStartup", testConnectionOnStartup);
+            return this;
+        }
+        /**
+         * Specifies whether to test the connection on startup. This ensures
+         * that when Camel starts that all the JMS consumers have a valid
+         * connection to the JMS broker. If a connection cannot be granted then
+         * Camel throws an exception on startup. This ensures that Camel is not
+         * started with failed connections. The JMS producers is tested as well.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param testConnectionOnStartup the value to set
+         * @return the dsl builder
+         */
+        default SjmsEndpointProducerBuilder testConnectionOnStartup(
+                String testConnectionOnStartup) {
+            doSetProperty("testConnectionOnStartup", testConnectionOnStartup);
+            return this;
+        }
+        /**
          * Specifies the delivery mode to be used. Possible values are those
          * defined by javax.jms.DeliveryMode. NON_PERSISTENT = 1 and PERSISTENT
          * = 2.
@@ -1629,6 +1924,55 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Use this option to force disabling time to live. For example when you
+         * do request/reply over JMS, then Camel will by default use the
+         * requestTimeout value as time to live on the message being sent. The
+         * problem is that the sender and receiver systems have to have their
+         * clocks synchronized, so they are in sync. This is not always so easy
+         * to archive. So you can use disableTimeToLive=true to not set a time
+         * to live value on the sent message. Then the message will not expire
+         * on the receiver system. See below in section About time to live for
+         * more details.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: producer (advanced)
+         * 
+         * @param disableTimeToLive the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder disableTimeToLive(
+                boolean disableTimeToLive) {
+            doSetProperty("disableTimeToLive", disableTimeToLive);
+            return this;
+        }
+        /**
+         * Use this option to force disabling time to live. For example when you
+         * do request/reply over JMS, then Camel will by default use the
+         * requestTimeout value as time to live on the message being sent. The
+         * problem is that the sender and receiver systems have to have their
+         * clocks synchronized, so they are in sync. This is not always so easy
+         * to archive. So you can use disableTimeToLive=true to not set a time
+         * to live value on the sent message. Then the message will not expire
+         * on the receiver system. See below in section About time to live for
+         * more details.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: producer (advanced)
+         * 
+         * @param disableTimeToLive the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder disableTimeToLive(
+                String disableTimeToLive) {
+            doSetProperty("disableTimeToLive", disableTimeToLive);
+            return this;
+        }
+        /**
          * Whether to startup the consumer message listener asynchronously, when
          * starting a route. For example if a JmsConsumer cannot get a
          * connection to a remote JMS broker, then it may block while retrying
@@ -2078,6 +2422,136 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder recoveryInterval(
+                long recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option will be converted to a &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder recoveryInterval(
+                String recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param synchronous the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder synchronous(
+                boolean synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param synchronous the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder synchronous(
+                String synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * If enabled and you are using Request Reply messaging (InOut) and an
+         * Exchange failed on the consumer side, then the caused Exception will
+         * be send back in response as a javax.jms.ObjectMessage. If the client
+         * is Camel, the returned Exception is rethrown. This allows you to use
+         * Camel JMS as a bridge in your routing - for example, using persistent
+         * queues to enable robust routing. Notice that if you also have
+         * transferExchange enabled, this option takes precedence. The caught
+         * exception is required to be serializable. The original Exception on
+         * the consumer side can be wrapped in an outer exception such as
+         * org.apache.camel.RuntimeCamelException when returned to the producer.
+         * Use this with caution as the data is using Java Object serialization
+         * and requires the received to be able to deserialize the data at Class
+         * level, which forces a strong coupling between the producers and
+         * consumer!.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param transferException the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder transferException(
+                boolean transferException) {
+            doSetProperty("transferException", transferException);
+            return this;
+        }
+        /**
+         * If enabled and you are using Request Reply messaging (InOut) and an
+         * Exchange failed on the consumer side, then the caused Exception will
+         * be send back in response as a javax.jms.ObjectMessage. If the client
+         * is Camel, the returned Exception is rethrown. This allows you to use
+         * Camel JMS as a bridge in your routing - for example, using persistent
+         * queues to enable robust routing. Notice that if you also have
+         * transferExchange enabled, this option takes precedence. The caught
+         * exception is required to be serializable. The original Exception on
+         * the consumer side can be wrapped in an outer exception such as
+         * org.apache.camel.RuntimeCamelException when returned to the producer.
+         * Use this with caution as the data is using Java Object serialization
+         * and requires the received to be able to deserialize the data at Class
+         * level, which forces a strong coupling between the producers and
+         * consumer!.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param transferException the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointProducerBuilder transferException(
+                String transferException) {
+            doSetProperty("transferException", transferException);
+            return this;
+        }
+        /**
          * Specifies whether to share JMS session with other SJMS endpoints.
          * Turn this off if your route is accessing to multiple JMS providers.
          * If you need transaction against multiple JMS providers, use jms
@@ -2225,6 +2699,47 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Specifies whether to test the connection on startup. This ensures
+         * that when Camel starts that all the JMS consumers have a valid
+         * connection to the JMS broker. If a connection cannot be granted then
+         * Camel throws an exception on startup. This ensures that Camel is not
+         * started with failed connections. The JMS producers is tested as well.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param testConnectionOnStartup the value to set
+         * @return the dsl builder
+         */
+        default SjmsEndpointBuilder testConnectionOnStartup(
+                boolean testConnectionOnStartup) {
+            doSetProperty("testConnectionOnStartup", testConnectionOnStartup);
+            return this;
+        }
+        /**
+         * Specifies whether to test the connection on startup. This ensures
+         * that when Camel starts that all the JMS consumers have a valid
+         * connection to the JMS broker. If a connection cannot be granted then
+         * Camel throws an exception on startup. This ensures that Camel is not
+         * started with failed connections. The JMS producers is tested as well.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: common
+         * 
+         * @param testConnectionOnStartup the value to set
+         * @return the dsl builder
+         */
+        default SjmsEndpointBuilder testConnectionOnStartup(
+                String testConnectionOnStartup) {
+            doSetProperty("testConnectionOnStartup", testConnectionOnStartup);
+            return this;
+        }
+        /**
          * Specifies whether to use transacted mode.
          * 
          * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
@@ -2747,6 +3262,134 @@ public interface SjmsEndpointBuilderFactory {
             return this;
         }
         /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option is a: &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointBuilder recoveryInterval(
+                long recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
+         * Specifies the interval between recovery attempts, i.e. when a
+         * connection is being refreshed, in milliseconds. The default is 5000
+         * ms, that is, 5 seconds.
+         * 
+         * The option will be converted to a &lt;code&gt;long&lt;/code&gt; type.
+         * 
+         * Default: 5000
+         * Group: advanced
+         * 
+         * @param recoveryInterval the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointBuilder recoveryInterval(
+                String recoveryInterval) {
+            doSetProperty("recoveryInterval", recoveryInterval);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param synchronous the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointBuilder synchronous(boolean synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param synchronous the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointBuilder synchronous(String synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * If enabled and you are using Request Reply messaging (InOut) and an
+         * Exchange failed on the consumer side, then the caused Exception will
+         * be send back in response as a javax.jms.ObjectMessage. If the client
+         * is Camel, the returned Exception is rethrown. This allows you to use
+         * Camel JMS as a bridge in your routing - for example, using persistent
+         * queues to enable robust routing. Notice that if you also have
+         * transferExchange enabled, this option takes precedence. The caught
+         * exception is required to be serializable. The original Exception on
+         * the consumer side can be wrapped in an outer exception such as
+         * org.apache.camel.RuntimeCamelException when returned to the producer.
+         * Use this with caution as the data is using Java Object serialization
+         * and requires the received to be able to deserialize the data at Class
+         * level, which forces a strong coupling between the producers and
+         * consumer!.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param transferException the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointBuilder transferException(
+                boolean transferException) {
+            doSetProperty("transferException", transferException);
+            return this;
+        }
+        /**
+         * If enabled and you are using Request Reply messaging (InOut) and an
+         * Exchange failed on the consumer side, then the caused Exception will
+         * be send back in response as a javax.jms.ObjectMessage. If the client
+         * is Camel, the returned Exception is rethrown. This allows you to use
+         * Camel JMS as a bridge in your routing - for example, using persistent
+         * queues to enable robust routing. Notice that if you also have
+         * transferExchange enabled, this option takes precedence. The caught
+         * exception is required to be serializable. The original Exception on
+         * the consumer side can be wrapped in an outer exception such as
+         * org.apache.camel.RuntimeCamelException when returned to the producer.
+         * Use this with caution as the data is using Java Object serialization
+         * and requires the received to be able to deserialize the data at Class
+         * level, which forces a strong coupling between the producers and
+         * consumer!.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: advanced
+         * 
+         * @param transferException the value to set
+         * @return the dsl builder
+         */
+        default AdvancedSjmsEndpointBuilder transferException(
+                String transferException) {
+            doSetProperty("transferException", transferException);
+            return this;
+        }
+        /**
          * Specifies whether to share JMS session with other SJMS endpoints.
          * Turn this off if your route is accessing to multiple JMS providers.
          * If you need transaction against multiple JMS providers, use jms
diff --git a/docs/components/modules/ROOT/pages/sjms-component.adoc b/docs/components/modules/ROOT/pages/sjms-component.adoc
index e599043..ca85f3a 100644
--- a/docs/components/modules/ROOT/pages/sjms-component.adoc
+++ b/docs/components/modules/ROOT/pages/sjms-component.adoc
@@ -92,7 +92,7 @@ You append query options to the URI using the following format,
 == Component Options and Configurations
 
 // component options: START
-The Simple JMS component supports 18 options, which are listed below.
+The Simple JMS component supports 17 options, which are listed below.
 
 
 
@@ -102,8 +102,6 @@ The Simple JMS component supports 18 options, which are listed below.
 | *connectionCount* (common) | The maximum number of connections available to endpoints started under this component | 1 | Integer
 | *connectionFactory* (common) | *Autowired* A ConnectionFactory is required to enable the SjmsComponent. It can be set directly or set set as part of a ConnectionResource. |  | ConnectionFactory
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
-| *reconnectBackOff* (consumer) | Backoff in millis on consumer pool reconnection attempts | 5000 | long
-| *reconnectOnError* (consumer) | Try to apply reconnection logic on consumer pool | true | boolean
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *autowiredEnabled* (advanced) | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean
 | *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
@@ -113,6 +111,7 @@ The Simple JMS component supports 18 options, which are listed below.
 | *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. |  | DestinationCreationStrategy
 | *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using  [...]
 | *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
+| *recoveryInterval* (advanced) | Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds. | 5000 | long
 | *headerFilterStrategy* (filter) | To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel message. |  | HeaderFilterStrategy
 | *connectionPassword* (security) | The password to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
 | *connectionUsername* (security) | The username to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. |  | String
@@ -141,7 +140,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (40 parameters):
+=== Query Parameters (47 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -150,16 +149,19 @@ with the following path and query parameters:
 | *acknowledgementMode* (common) | The JMS acknowledgement name, which is one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE. There are 4 enums and the value can be one of: SESSION_TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE | AUTO_ACKNOWLEDGE | SessionAcknowledgementType
 | *disableReplyTo* (common) | Specifies whether Camel ignores the JMSReplyTo header in messages. If true, Camel does not send a reply back to the destination specified in the JMSReplyTo header. You can use this option if you want Camel to consume from a route and you do not want Camel to automatically send back a reply message because another component in your code handles the reply message. You can also use this option if you want to use Camel as a proxy between different message broker [...]
 | *replyTo* (common) | Provides an explicit ReplyTo destination (overrides any incoming value of Message.getJMSReplyTo() in consumer). |  | String
+| *testConnectionOnStartup* (common) | Specifies whether to test the connection on startup. This ensures that when Camel starts that all the JMS consumers have a valid connection to the JMS broker. If a connection cannot be granted then Camel throws an exception on startup. This ensures that Camel is not started with failed connections. The JMS producers is tested as well. | false | boolean
+| *asyncConsumer* (consumer) | Whether the JmsConsumer processes the Exchange asynchronously. If enabled then the JmsConsumer may pickup the next message from the JMS queue, while the previous message is being processed asynchronously (by the Asynchronous Routing Engine). This means that messages may be processed not 100% strictly in order. If disabled (as default) then the Exchange is fully processed before the JmsConsumer will pickup the next message from the JMS queue. Note if transac [...]
+| *autoStartup* (consumer) | Specifies whether the consumer container should auto-startup. | true | boolean
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
 | *consumerCount* (consumer) | Sets the number of consumer listeners used for this endpoint. | 1 | int
 | *durableSubscriptionId* (consumer) | Sets the durable subscription Id required for durable topics. |  | String
-| *reconnectBackOff* (consumer) | Backoff in millis on consumer pool reconnection attempts | 5000 | long
-| *reconnectOnError* (consumer) | Try to apply reconnection logic on consumer pool | true | boolean
 | *replyToDeliveryPersistent* (consumer) | Specifies whether to use persistent delivery by default for replies. | true | boolean
-| *synchronous* (consumer) | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported). | true | boolean
+| *eagerLoadingOfProperties* (consumer) | Enables eager loading of JMS properties and payload as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties. See also the option eagerPoisonBody. | false | boolean
+| *eagerPoisonBody* (consumer) | If eagerLoadingOfProperties is enabled and the JMS message payload (JMS body or JMS properties) is poison (cannot be read/mapped), then set this text as the message body instead so the message can be processed (the cause of the poison are already stored as exception on the Exchange). This can be turned off by setting eagerPoisonBody=false. See also the option eagerLoadingOfProperties. | Poison JMS message due to ${exception.message} | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. There are 3 enums and the value can be one of: InOnly, InOut, InOptionalOut |  | ExchangePattern
 | *messageSelector* (consumer) | Sets the JMS Message selector syntax. |  | String
+| *replyToSameDestinationAllowed* (consumer) | Whether a JMS consumer is allowed to send a reply message to the same destination that the consumer is using to consume from. This prevents an endless loop by consuming and sending back the same message to itself. | false | boolean
 | *deliveryMode* (producer) | Specifies the delivery mode to be used. Possible values are those defined by javax.jms.DeliveryMode. NON_PERSISTENT = 1 and PERSISTENT = 2. There are 2 enums and the value can be one of: 1, 2 |  | Integer
 | *deliveryPersistent* (producer) | Specifies whether persistent delivery is used by default. | true | boolean
 | *explicitQosEnabled* (producer) | Set if the deliveryMode, priority or timeToLive qualities of service should be used when sending messages. This option is based on Spring's JmsTemplate. The deliveryMode, priority and timeToLive options are applied to the current endpoint. This contrasts with the preserveMessageQos option, which operates at message granularity, reading QoS properties exclusively from the Camel In message headers. | false | Boolean
@@ -170,6 +172,7 @@ with the following path and query parameters:
 | *requestTimeout* (producer) | The timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds). The default is 20 seconds. You can include the header CamelJmsRequestTimeout to override this endpoint configured timeout value, and thus have per message individual timeout values. See also the requestTimeoutCheckerInterval option. | 20000 | long
 | *timeToLive* (producer) | When sending messages, specifies the time-to-live of the message (in milliseconds). | -1 | long
 | *allowNullBody* (producer) | Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown. | true | boolean
+| *disableTimeToLive* (producer) | Use this option to force disabling time to live. For example when you do request/reply over JMS, then Camel will by default use the requestTimeout value as time to live on the message being sent. The problem is that the sender and receiver systems have to have their clocks synchronized, so they are in sync. This is not always so easy to archive. So you can use disableTimeToLive=true to not set a time to live value on the sent message. Then the message w [...]
 | *asyncStartListener* (advanced) | Whether to startup the consumer message listener asynchronously, when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS broker, then it may block while retrying and/or failover. This will cause Camel to block while starting routes. By setting this option to true, you will let routes startup, while the JmsConsumer connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is used, then b [...]
 | *asyncStopListener* (advanced) | Whether to stop the consumer message listener asynchronously, when stopping a route. | false | boolean
 | *connectionCount* (advanced) | *Deprecated* The maximum number of connections available to this endpoint |  | Integer
@@ -182,6 +185,9 @@ with the following path and query parameters:
 | *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides two implementations out of the box: default and passthrough. The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache [...]
 | *mapJmsMessage* (advanced) | Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc. See section about how mapping works below for more details. | true | boolean
 | *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. |  | MessageCreatedStrategy
+| *recoveryInterval* (advanced) | Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds. The default is 5000 ms, that is, 5 seconds. | 5000 | long
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *transferException* (advanced) | If enabled and you are using Request Reply messaging (InOut) and an Exchange failed on the consumer side, then the caused Exception will be send back in response as a javax.jms.ObjectMessage. If the client is Camel, the returned Exception is rethrown. This allows you to use Camel JMS as a bridge in your routing - for example, using persistent queues to enable robust routing. Notice that if you also have transferExchange enabled, this option takes preced [...]
 | *errorHandlerLoggingLevel* (logging) | Allows to configure the default errorHandler logging level for logging uncaught exceptions. There are 6 enums and the value can be one of: TRACE, DEBUG, INFO, WARN, ERROR, OFF | WARN | LoggingLevel
 | *errorHandlerLogStackTrace* (logging) | Allows to control whether stacktraces should be logged or not, by the default errorHandler. | true | boolean
 | *transacted* (transaction) | Specifies whether to use transacted mode | false | boolean