You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/12/27 11:25:48 UTC
[camel] branch master updated: CAMEL-15987: camel-sjms - Fix
AsyncCallback in producer being used correctly
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 276e570 CAMEL-15987: camel-sjms - Fix AsyncCallback in producer being used correctly
276e570 is described below
commit 276e570c4aeadb60cbc47457e93d9e2f2f6ba2ea
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Dec 27 12:24:21 2020 +0100
CAMEL-15987: camel-sjms - Fix AsyncCallback in producer being used correctly
CAMEL-15987: camel-sjms - Add support for sending to dynamic destination via header (same as in camel-jms)
CAMEL-15987: camel-sjms - Add support for SendDynamicAware for toD
---
.../org/apache/camel/catalog/components/sjms.json | 4 +-
.../org/apache/camel/catalog/components/sjms2.json | 4 +-
.../apache/camel/catalog/docs/sjms-component.adoc | 71 ++++-
.../apache/camel/catalog/docs/sjms2-component.adoc | 2 +-
.../services/org/apache/camel/send-dynamic/sjms | 2 +
.../org/apache/camel/component/sjms/sjms.json | 4 +-
.../camel-sjms/src/main/docs/sjms-component.adoc | 71 ++++-
.../camel/component/sjms/MessageProducerPool.java | 75 +++++
.../apache/camel/component/sjms/SjmsComponent.java | 6 +-
.../apache/camel/component/sjms/SjmsConstants.java | 4 +-
.../apache/camel/component/sjms/SjmsEndpoint.java | 13 +-
.../apache/camel/component/sjms/SjmsProducer.java | 310 ++++++++++-----------
.../camel/component/sjms/SjmsSendDynamicAware.java | 109 ++++++++
.../component/sjms/TransactionCommitStrategy.java | 10 +-
.../component/sjms/batch/SjmsBatchEndpoint.java | 8 +-
.../component/sjms/jms/ConnectionResource.java | 3 +-
.../component/sjms/jms/DestinationNameParser.java | 14 +-
.../component/sjms/producer/InOnlyProducer.java | 34 ++-
.../component/sjms/producer/InOutProducer.java | 184 ++++++++----
.../sjms/jms/DestinationNameParserTest.java | 23 +-
.../sjms/producer/InOnlyQueueProducerTest.java | 32 ++-
.../sjms/producer/InOutQueueProducerTest.java | 16 ++
.../sjms/producer/SjmsToDSendDynamicTest.java | 38 +++
.../SjmsToDSendDynamicTwoDisabledTest.java | 37 +++
.../sjms/producer/SjmsToDSendDynamicTwoTest.java | 38 +++
.../camel/component/sjms/producer/SjmsToDTest.java | 34 +++
.../component/sjms/support/JmsTestSupport.java | 9 -
.../services/org/apache/camel/send-dynamic/sjms2 | 2 +
.../org/apache/camel/component/sjms2/sjms2.json | 4 +-
.../camel-sjms2/src/main/docs/sjms2-component.adoc | 2 +-
.../component/sjms2/Sjms2SendDynamicAware.java} | 10 +-
.../sjms2/producer/Sjms2ToDSendDynamicTest.java | 54 ++++
.../dsl/Sjms2ComponentBuilderFactory.java | 3 +-
.../component/dsl/SjmsComponentBuilderFactory.java | 3 +-
.../endpoint/dsl/Sjms2EndpointBuilderFactory.java | 62 +++++
.../endpoint/dsl/SjmsEndpointBuilderFactory.java | 62 +++++
.../modules/ROOT/pages/sjms-component.adoc | 71 ++++-
.../modules/ROOT/pages/sjms2-component.adoc | 2 +-
38 files changed, 1134 insertions(+), 296 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms.json
index 0476b0a..f504c36 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms.json
@@ -32,7 +32,7 @@
"connectionClientId": { "kind": "property", "displayName": "Connection Client Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionMaxWait": { "kind": "property", "displayName": "Connection Max Wait", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionResource": { "kind": "property", "displayName": "Connection Resource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.ConnectionResource", "deprecated": false, "autowired": false, "secret": false, "description": "A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details." },
- "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool." },
+ "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used." },
"destinationCreationStrategy": { "kind": "property", "displayName": "Destination Creation Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.DestinationCreationStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom DestinationCreationStrategy." },
"jmsKeyFormatStrategy": { "kind": "property", "displayName": "Jms Key Format Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The defau [...]
"messageCreatedStrategy": { "kind": "property", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
@@ -77,7 +77,7 @@
"messageCreatedStrategy": { "kind": "parameter", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
"errorHandlerLoggingLevel": { "kind": "parameter", "displayName": "Error Handler Logging Level", "group": "logging", "label": "consumer,logging", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "WARN", "description": "Allows to configure the default errorHandler logging level for logging uncaught exceptions." },
"errorHandlerLogStackTrace": { "kind": "parameter", "displayName": "Error Handler Log Stack Trace", "group": "logging", "label": "consumer,logging", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Allows to control whether stacktraces should be logged or not, by the default errorHandler." },
- "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
+ "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
"transactionBatchCount": { "kind": "parameter", "displayName": "Transaction Batch Count", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": -1, "description": "If transacted sets the number of messages to process before committing a transaction." },
"transactionBatchTimeout": { "kind": "parameter", "displayName": "Transaction Batch Timeout", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "duration", "javaType": "long", "deprecated": true, "autowired": false, "secret": false, "defaultValue": "5s", "description": "Sets timeout (in millis) for batch transactions, the value should be 1000 or higher." },
"transactionCommitStrategy": { "kind": "parameter", "displayName": "Transaction Commit Strategy", "group": "transaction", "label": "transaction", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.TransactionCommitStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the commit strategy." },
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms2.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms2.json
index ae75e53..09930ab 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms2.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/sjms2.json
@@ -32,7 +32,7 @@
"connectionClientId": { "kind": "property", "displayName": "Connection Client Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionMaxWait": { "kind": "property", "displayName": "Connection Max Wait", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionResource": { "kind": "property", "displayName": "Connection Resource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.ConnectionResource", "deprecated": false, "autowired": false, "secret": false, "description": "A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details." },
- "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool." },
+ "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used." },
"destinationCreationStrategy": { "kind": "property", "displayName": "Destination Creation Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.DestinationCreationStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom DestinationCreationStrategy." },
"jmsKeyFormatStrategy": { "kind": "property", "displayName": "Jms Key Format Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The defau [...]
"messageCreatedStrategy": { "kind": "property", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
@@ -80,7 +80,7 @@
"messageCreatedStrategy": { "kind": "parameter", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
"errorHandlerLoggingLevel": { "kind": "parameter", "displayName": "Error Handler Logging Level", "group": "logging", "label": "consumer,logging", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "WARN", "description": "Allows to configure the default errorHandler logging level for logging uncaught exceptions." },
"errorHandlerLogStackTrace": { "kind": "parameter", "displayName": "Error Handler Log Stack Trace", "group": "logging", "label": "consumer,logging", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Allows to control whether stacktraces should be logged or not, by the default errorHandler." },
- "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
+ "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
"transactionBatchCount": { "kind": "parameter", "displayName": "Transaction Batch Count", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": -1, "description": "If transacted sets the number of messages to process before committing a transaction." },
"transactionBatchTimeout": { "kind": "parameter", "displayName": "Transaction Batch Timeout", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "duration", "javaType": "long", "deprecated": true, "autowired": false, "secret": false, "defaultValue": "5s", "description": "Sets timeout (in millis) for batch transactions, the value should be 1000 or higher." },
"transactionCommitStrategy": { "kind": "parameter", "displayName": "Transaction Commit Strategy", "group": "transaction", "label": "transaction", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.TransactionCommitStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the commit strategy." },
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc
index 175d43e..931c813 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms-component.adoc
@@ -107,7 +107,7 @@ The Simple JMS component supports 19 options, which are listed below.
| *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | | String
| *connectionMaxWait* (advanced) | The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | 5000 | long
| *connectionResource* (advanced) | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. | | ConnectionResource
-| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool. | true | boolean
+| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used. | true | boolean
| *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. | | DestinationCreationStrategy
| *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using [...]
| *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy
@@ -273,6 +273,75 @@ from("sjms:queue:in.out.test?exchangePattern=InOut")
.transform(constant("Bye Camel"));
----
+
+== Reuse endpoint and send to different destinations computed at runtime
+
+If you need to send messages to a lot of different JMS destinations, it
+makes sense to reuse a SJMS endpoint and specify the real destination in
+a message header. This allows Camel to reuse the same endpoint, but send
+to different destinations. This greatly reduces the number of endpoints
+created and economizes on memory and thread resources.
+
+TIP: Using xref:{eip-vc}:eips:toD-eip.adoc[toD] is easier than specifying the dynamic destination with a header
+
+You can specify the destination in the following header:
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=====================================================================
+|Header |Type |Description
+|`CamelJmsDestinationName` |`String` |The destination name.
+|=====================================================================
+
+For example, the following route shows how you can compute a destination
+at run time and use it to override the destination appearing in the JMS
+URL:
+
+[source,java]
+--------------------------------
+from("file://inbox")
+ .to("bean:computeDestination")
+ .to("sjms:queue:dummy");
+--------------------------------
+
+The queue name, `dummy`, is just a placeholder. It must be provided as
+part of the JMS endpoint URL, but it will be ignored in this example.
+
+In the `computeDestination` bean, specify the real destination by
+setting the `CamelJmsDestinationName` header as follows:
+
+[source,java]
+-------------------------------------------------------------------------
+public void setJmsHeader(Exchange exchange) {
+ String id = ....
+ exchange.getIn().setHeader("CamelJmsDestinationName", "order:" + id);
+}
+-------------------------------------------------------------------------
+
+Then Camel will read this header and use it as the destination instead
+of the one configured on the endpoint. So, in this example Camel sends
+the message to `sjms:queue:order:2`, assuming the `id` value was 2.
+
+Keep in mind that the JMS producer removes the `CamelJmsDestinationName`
+header from the exchange and does not propagate it to the created JMS
+message, in order to avoid accidental loops
+in the routes (in scenarios where the message is forwarded to
+another JMS endpoint).
+
+== Using toD
+
+If you need to send messages to a lot of different JMS destinations, it
+makes sense to reuse a SJMS endpoint and specify the dynamic destinations
+with simple language using xref:{eip-vc}:eips:toD-eip.adoc[toD].
+
+For example, suppose you need to send messages to queues named after order types.
+Using toD, this can be done as follows:
+
+[source,java]
+--------------------------------
+from("direct:order")
+ .toD("sjms:order-${header.orderType}");
+--------------------------------
+
== Advanced Usage Notes
=== Plugable Connection Resource Management [[SJMS-connectionresource]]
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms2-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms2-component.adoc
index 3b9795f..342919f 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms2-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/sjms2-component.adoc
@@ -110,7 +110,7 @@ The Simple JMS2 component supports 19 options, which are listed below.
| *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | | String
| *connectionMaxWait* (advanced) | The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | 5000 | long
| *connectionResource* (advanced) | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. | | ConnectionResource
-| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool. | true | boolean
+| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used. | true | boolean
| *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. | | DestinationCreationStrategy
| *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using [...]
| *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy
diff --git a/components/camel-sjms/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/sjms b/components/camel-sjms/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/sjms
new file mode 100644
index 0000000..05ec05e
--- /dev/null
+++ b/components/camel-sjms/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/sjms
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.sjms.SjmsSendDynamicAware
diff --git a/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json b/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json
index 0476b0a..f504c36 100644
--- a/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json
+++ b/components/camel-sjms/src/generated/resources/org/apache/camel/component/sjms/sjms.json
@@ -32,7 +32,7 @@
"connectionClientId": { "kind": "property", "displayName": "Connection Client Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionMaxWait": { "kind": "property", "displayName": "Connection Max Wait", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionResource": { "kind": "property", "displayName": "Connection Resource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.ConnectionResource", "deprecated": false, "autowired": false, "secret": false, "description": "A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details." },
- "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool." },
+ "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used." },
"destinationCreationStrategy": { "kind": "property", "displayName": "Destination Creation Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.DestinationCreationStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom DestinationCreationStrategy." },
"jmsKeyFormatStrategy": { "kind": "property", "displayName": "Jms Key Format Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The defau [...]
"messageCreatedStrategy": { "kind": "property", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
@@ -77,7 +77,7 @@
"messageCreatedStrategy": { "kind": "parameter", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
"errorHandlerLoggingLevel": { "kind": "parameter", "displayName": "Error Handler Logging Level", "group": "logging", "label": "consumer,logging", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "WARN", "description": "Allows to configure the default errorHandler logging level for logging uncaught exceptions." },
"errorHandlerLogStackTrace": { "kind": "parameter", "displayName": "Error Handler Log Stack Trace", "group": "logging", "label": "consumer,logging", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Allows to control whether stacktraces should be logged or not, by the default errorHandler." },
- "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
+ "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
"transactionBatchCount": { "kind": "parameter", "displayName": "Transaction Batch Count", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": -1, "description": "If transacted sets the number of messages to process before committing a transaction." },
"transactionBatchTimeout": { "kind": "parameter", "displayName": "Transaction Batch Timeout", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "duration", "javaType": "long", "deprecated": true, "autowired": false, "secret": false, "defaultValue": "5s", "description": "Sets timeout (in millis) for batch transactions, the value should be 1000 or higher." },
"transactionCommitStrategy": { "kind": "parameter", "displayName": "Transaction Commit Strategy", "group": "transaction", "label": "transaction", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.TransactionCommitStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the commit strategy." },
diff --git a/components/camel-sjms/src/main/docs/sjms-component.adoc b/components/camel-sjms/src/main/docs/sjms-component.adoc
index 175d43e..931c813 100644
--- a/components/camel-sjms/src/main/docs/sjms-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-component.adoc
@@ -107,7 +107,7 @@ The Simple JMS component supports 19 options, which are listed below.
| *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | | String
| *connectionMaxWait* (advanced) | The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | 5000 | long
| *connectionResource* (advanced) | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. | | ConnectionResource
-| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool. | true | boolean
+| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used. | true | boolean
| *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. | | DestinationCreationStrategy
| *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using [...]
| *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy
@@ -273,6 +273,75 @@ from("sjms:queue:in.out.test?exchangePattern=InOut")
.transform(constant("Bye Camel"));
----
+
+== Reuse endpoint and send to different destinations computed at runtime
+
+If you need to send messages to a lot of different JMS destinations, it
+makes sense to reuse a SJMS endpoint and specify the real destination in
+a message header. This allows Camel to reuse the same endpoint, but send
+to different destinations. This greatly reduces the number of endpoints
+created and economizes on memory and thread resources.
+
+TIP: Using xref:{eip-vc}:eips:toD-eip.adoc[toD] is easier than specifying the dynamic destination with a header
+
+You can specify the destination in the following headers:
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=====================================================================
+|Header |Type |Description
+|`CamelJmsDestinationName` |`String` |The destination name.
+|=====================================================================
+
+For example, the following route shows how you can compute a destination
+at run time and use it to override the destination appearing in the JMS
+URL:
+
+[source,java]
+--------------------------------
+from("file://inbox")
+ .to("bean:computeDestination")
+ .to("sjms:queue:dummy");
+--------------------------------
+
+The queue name, `dummy`, is just a placeholder. It must be provided as
+part of the JMS endpoint URL, but it will be ignored in this example.
+
+In the `computeDestination` bean, specify the real destination by
+setting the `CamelJmsDestinationName` header as follows:
+
+[source,java]
+-------------------------------------------------------------------------
+public void setJmsHeader(Exchange exchange) {
+ String id = ....
+ exchange.getIn().setHeader("CamelJmsDestinationName", "order:" + id);
+}
+-------------------------------------------------------------------------
+
+Then Camel will read this header and use it as the destination instead
+of the one configured on the endpoint. So, in this example Camel sends
+the message to `sjms:queue:order:2`, assuming the `id` value was 2.
+
+Keep in mind that the JMS producer removes the `CamelJmsDestinationName`
+header from the exchange and does not propagate it to the created JMS
+message, in order to avoid accidental loops
+in the routes (in scenarios where the message is forwarded to
+another JMS endpoint).
+
+== Using toD
+
+If you need to send messages to a lot of different JMS destinations, it
+makes sense to reuse a SJMS endpoint and specify the dynamic destinations
+with simple language using xref:{eip-vc}:eips:toD-eip.adoc[toD].
+
+For example, suppose you need to send messages to queues named after order types;
+using toD this could be done as follows:
+
+[source,java]
+--------------------------------
+from("direct:order")
+ .toD("sjms:order-${header.orderType}");
+--------------------------------
+
== Advanced Usage Notes
=== Plugable Connection Resource Management [[SJMS-connectionresource]]
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerPool.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerPool.java
new file mode 100644
index 0000000..682b21d
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/MessageProducerPool.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import javax.jms.JMSException;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pool object factory that creates, validates and destroys the {@link MessageProducerResources} of a {@link SjmsProducer}.
+ */
+class MessageProducerPool extends BasePoolableObjectFactory<MessageProducerResources> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MessageProducerPool.class);
+
+ private final SjmsProducer sjmsProducer;
+
+ public MessageProducerPool(SjmsProducer sjmsProducer) {
+ this.sjmsProducer = sjmsProducer;
+ }
+
+ @Override
+ public MessageProducerResources makeObject() throws Exception {
+ return sjmsProducer.doCreateProducerModel(sjmsProducer.createSession());
+ }
+
+ @Override
+ public boolean validateObject(MessageProducerResources obj) {
+ try {
+ obj.getSession().getAcknowledgeMode(); // result is discarded; the call is only made to see whether it throws
+ return true;
+ } catch (JMSException ex) {
+ LOG.warn("Cannot validate JMS session", ex);
+ }
+ return false;
+ }
+
+ @Override
+ public void destroyObject(MessageProducerResources model) throws Exception {
+ if (model.getMessageProducer() != null) {
+ model.getMessageProducer().close(); // NOTE(review): an exception here propagates and skips closing the session below
+ }
+
+ if (model.getSession() != null) {
+ try {
+ if (model.getSession().getTransacted()) {
+ try {
+ model.getSession().rollback();
+ } catch (Exception e) {
+ // best effort rollback; ignore the failure so the session is still closed right after
+ }
+ }
+ model.getSession().close();
+ } catch (Exception e) {
+ // ignore: a failure to inspect or close the session must not fail destroying the pooled object
+ }
+ }
+ }
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 9667de6..4d32269 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -47,7 +47,7 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
private ConnectionFactory connectionFactory;
@Metadata(label = "advanced",
description = "A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory."
- + " * See Plugable Connection Resource Management for further details.")
+ + " See Plugable Connection Resource Management for further details.")
private ConnectionResource connectionResource;
@Metadata(label = "advanced",
description = "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification."
@@ -71,7 +71,7 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
private MessageCreatedStrategy messageCreatedStrategy;
@Metadata(label = "advanced", defaultValue = "true",
description = "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource"
- + " then should each javax.jms.Connection be tested (calling start) before returned from the pool.")
+ + " then should each javax.jms.Connection be tested before being used.")
private boolean connectionTestOnBorrow = true;
@Metadata(label = "security", secret = true,
description = "The username to use when creating javax.jms.Connection when using the"
@@ -286,7 +286,7 @@ public class SjmsComponent extends HeaderFilterStrategyComponent {
/**
* When using the default {@link org.apache.camel.component.sjms.jms.ConnectionFactoryResource} then should each
- * {@link javax.jms.Connection} be tested (calling start) before returned from the pool.
+ * {@link javax.jms.Connection} be tested before being used.
*/
public void setConnectionTestOnBorrow(boolean connectionTestOnBorrow) {
this.connectionTestOnBorrow = connectionTestOnBorrow;
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
index 54e9f9d..9a40c1f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
@@ -18,8 +18,8 @@ package org.apache.camel.component.sjms;
public interface SjmsConstants {
- String JMS_MESSAGE_TYPE = "JmsMessageType";
-
+ String JMS_MESSAGE_TYPE = "CamelJmsMessageType";
String JMS_SESSION = "CamelJMSSession";
+ String JMS_DESTINATION_NAME = "CamelJMSDestinationName";
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index a0db9b6..b432547 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -54,6 +54,7 @@ import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.EndpointHelper;
+import org.apache.camel.support.LanguageSupport;
import org.apache.camel.support.LoggingExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,7 @@ public class SjmsEndpoint extends DefaultEndpoint
description = "Whether to include all JMSXxxx properties when mapping from JMS to Camel Message."
+ " Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc. Note: If you are using a custom headerFilterStrategy then this option does not apply.")
private boolean includeAllJMSXProperties;
- @UriParam(label = "consumer,transaction",
+ @UriParam(label = "transaction",
description = "Specifies whether to use transacted mode")
private boolean transacted;
@UriParam(label = "transaction,advanced", defaultValue = "true",
@@ -205,9 +206,9 @@ public class SjmsEndpoint extends DefaultEndpoint
public SjmsEndpoint(String uri, Component component, String remaining) {
super(uri, component);
- DestinationNameParser parser = new DestinationNameParser();
- this.topic = parser.isTopic(remaining);
- this.destinationName = parser.getShortName(remaining);
+ // TODO: optimize for dynamic destination name via toD (eg ${ } somewhere)
+ this.topic = DestinationNameParser.isTopic(remaining);
+ this.destinationName = DestinationNameParser.getShortName(remaining);
}
@Override
@@ -253,6 +254,10 @@ public class SjmsEndpoint extends DefaultEndpoint
} else {
producer = new InOutProducer(this);
}
+ // are we using dynamic destinations?
+ if (LanguageSupport.hasSimpleFunction(getDestinationName())) {
+ producer.disableProducers();
+ }
return producer;
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index 2044df6..217ad4f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -20,18 +20,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.jms.Connection;
-import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,51 +40,8 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(SjmsProducer.class);
- /**
- * The {@link MessageProducerResources} pool for all {@link SjmsProducer} classes.
- */
- protected class MessageProducerResourcesFactory extends BasePoolableObjectFactory<MessageProducerResources> {
-
- @Override
- public MessageProducerResources makeObject() throws Exception {
- return doCreateProducerModel(createSession());
- }
-
- @Override
- public boolean validateObject(MessageProducerResources obj) {
- try {
- obj.getSession().getAcknowledgeMode();
- return true;
- } catch (JMSException ex) {
- LOG.error("Cannot validate session", ex);
- }
- return false;
- }
-
- @Override
- public void destroyObject(MessageProducerResources model) throws Exception {
- if (model.getMessageProducer() != null) {
- model.getMessageProducer().close();
- }
-
- if (model.getSession() != null) {
- try {
- if (model.getSession().getTransacted()) {
- try {
- model.getSession().rollback();
- } catch (Exception e) {
- // Do nothing. Just make sure we are cleaned up
- }
- }
- model.getSession().close();
- } catch (Exception e) {
- // TODO why is the session closed already?
- }
- }
- }
- }
-
private GenericObjectPool<MessageProducerResources> producers;
+ private boolean useProducers = true;
private ExecutorService executor;
private Future<?> asyncStart;
@@ -95,39 +49,52 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
super(endpoint);
}
+ /**
+ * Turns off the internal producer pool; the endpoint calls this when the destination name contains a simple function (toD)
+ */
+ protected void disableProducers() {
+ useProducers = false;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
- this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer");
- if (getProducers() == null) {
- GenericObjectPool<MessageProducerResources> producers
- = new GenericObjectPool<>(new MessageProducerResourcesFactory());
- setProducers(producers);
- producers.setMaxActive(getProducerCount());
- producers.setMaxIdle(getProducerCount());
- producers.setTestOnBorrow(getEndpoint().getComponent().isConnectionTestOnBorrow());
- producers.setLifo(false);
- if (getEndpoint().isPrefillPool()) {
- if (getEndpoint().isAsyncStartListener()) {
- asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
- @Override
- public void run() {
- try {
- fillProducersPool();
- } catch (Throwable e) {
- LOG.warn("Error filling producer pool for destination: {}. This exception will be ignored.",
- getDestinationName(), e);
+ if (useProducers) {
+ if (!isSynchronous()) {
+ this.executor
+ = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this,
+ "SjmsProducer");
+ }
+ if (getProducers() == null) {
+ GenericObjectPool<MessageProducerResources> producers
+ = new GenericObjectPool<>(new MessageProducerPool(this));
+ setProducers(producers);
+ producers.setMaxActive(getProducerCount());
+ producers.setMaxIdle(getProducerCount());
+ producers.setTestOnBorrow(getEndpoint().getComponent().isConnectionTestOnBorrow());
+ producers.setLifo(false);
+ if (getEndpoint().isPrefillPool()) {
+ if (getEndpoint().isAsyncStartListener()) {
+ asyncStart = getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fillProducersPool();
+ } catch (Throwable e) {
+ LOG.warn("Error filling producer pool for destination: {}. This exception will be ignored.",
+ getDestinationName(), e);
+ }
}
- }
- @Override
- public String toString() {
- return "AsyncStartListenerTask[" + getDestinationName() + "]";
- }
- });
- } else {
- fillProducersPool();
+ @Override
+ public String toString() {
+ return "AsyncStartListenerTask[" + getDestinationName() + "]";
+ }
+ });
+ } else {
+ fillProducersPool();
+ }
}
}
}
@@ -180,17 +147,8 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
}
protected MessageProducerResources doCreateProducerModel(Session session) throws Exception {
- MessageProducerResources answer;
- try {
- MessageProducer messageProducer = getEndpoint().getJmsObjectFactory().createMessageProducer(session, getEndpoint());
-
- answer = new MessageProducerResources(session, messageProducer, getCommitStrategy());
-
- } catch (Exception e) {
- LOG.error("Unable to create the MessageProducer", e);
- throw e;
- }
- return answer;
+ MessageProducer messageProducer = getEndpoint().getJmsObjectFactory().createMessageProducer(session, getEndpoint());
+ return new MessageProducerResources(session, messageProducer, getCommitStrategy());
}
protected Session createSession() throws Exception {
@@ -198,39 +156,116 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
Connection conn = connectionResource.borrowConnection();
try {
return conn.createSession(isEndpointTransacted(), getAcknowledgeMode());
- } catch (Exception e) {
- LOG.error("Unable to create the Session", e);
- throw e;
} finally {
connectionResource.returnConnection(conn);
}
}
protected interface ReleaseProducerCallback {
- void release(MessageProducerResources producer) throws Exception;
+ void release(MessageProducerResources producer);
}
- protected class CloseProducerCallback implements ReleaseProducerCallback {
+ private static class CloseProducerCallback implements ReleaseProducerCallback {
@Override
- public void release(MessageProducerResources producer) throws Exception {
- producer.getMessageProducer().close();
+ public void release(MessageProducerResources producer) {
+ try {
+ producer.getMessageProducer().close();
+ } catch (Exception e) {
+ // ignore
+ }
}
}
- protected class ReturnProducerCallback implements ReleaseProducerCallback {
+ private class ReturnProducerCallback implements ReleaseProducerCallback {
@Override
- public void release(MessageProducerResources producer) throws Exception {
- getProducers().returnObject(producer);
+ public void release(MessageProducerResources producer) {
+ try {
+ getProducers().returnObject(producer);
+ } catch (Exception e) {
+ // ignore
+ }
}
}
public abstract void sendMessage(
Exchange exchange, AsyncCallback callback, MessageProducerResources producer,
- ReleaseProducerCallback releaseProducerCallback)
- throws Exception;
+ ReleaseProducerCallback releaseProducerCallback);
+
+ public abstract void sendMessage(
+ Exchange exchange, AsyncCallback callback, Session session, String destinationName);
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ String destinationName = exchange.getMessage().getHeader(SjmsConstants.JMS_DESTINATION_NAME, String.class);
+ if (destinationName != null) {
+ // remove the header so it won't be propagated
+ exchange.getMessage().removeHeader(SjmsConstants.JMS_DESTINATION_NAME);
+ }
+ if (destinationName != null) { // a destination name given as header takes the dynamic destination path
+ return doProcess(exchange, callback, destinationName);
+ } else {
+ return doProcess(exchange, callback);
+ }
+ }
+
+ protected boolean doProcess(final Exchange exchange, AsyncCallback callback, final String destinationName) { // variant used when process() found a destination name header
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing Exchange.id:{}", exchange.getExchangeId());
+ }
+
+ try {
+ AsyncCallback ac = callback;
+ Session session;
+ if (isEndpointTransacted() && isSharedJMSSession()) {
+ session = exchange.getIn().getHeader(SjmsConstants.JMS_SESSION, Session.class);
+ } else {
+ // TODO: Either rewrite to have connection factory externally pooled
+ // or we have some built in session pooling
+ session = createSession();
+ ac = doneSync -> {
+ try {
+ // ensure session is closed after use
+ session.close();
+ } catch (Exception e) {
+ // ignore: a failure to close must not prevent the original callback from being invoked
+ }
+ callback.done(doneSync);
+ };
+ }
+ final AsyncCallback asyncCallback = ac;
+
+ if (!isSynchronous()) { // NOTE(review): confirm getExecutor() is non-null here when the producer pool is disabled
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Sending message asynchronously: {}", exchange.getExchangeId());
+ }
+ getExecutor().execute(() -> sendMessage(exchange, asyncCallback, session, destinationName));
+ return false; // false: the send was handed off to the executor
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Sending message synchronously: {}", exchange.getExchangeId());
+ }
+ sendMessage(exchange, asyncCallback, session, destinationName);
+ }
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exception: {}", e.getMessage(), e);
+ }
+ // an error occurred so set on exchange and call the callback; NOTE(review): this is the original callback, not the session-closing wrapper - confirm a session created above is not leaked
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
+ }
+ return true;
+ }
+
+ protected boolean doProcess(final Exchange exchange, final AsyncCallback callback) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing Exchange.id:{}", exchange.getExchangeId());
}
@@ -262,42 +297,35 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
}
}
- if (producer == null) {
- exchange.setException(new Exception("Unable to send message: connection not available"));
+ if (!isSynchronous()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Sending message asynchronously: {}", exchange.getExchangeId());
+ }
+ getExecutor().execute(() -> sendMessage(exchange, callback, producer, releaseProducerCallback));
+ return false;
} else {
- if (!isSynchronous()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" Sending message asynchronously: {}", exchange.getIn().getBody());
- }
- getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessage(exchange, callback, producer, releaseProducerCallback);
- } catch (Exception e) {
- RuntimeCamelException.wrapRuntimeCamelException(e);
- }
- }
- });
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" Sending message synchronously: {}", exchange.getIn().getBody());
- }
- sendMessage(exchange, callback, producer, releaseProducerCallback);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(" Sending message synchronously: {}", exchange.getExchangeId());
}
+ sendMessage(exchange, callback, producer, releaseProducerCallback);
}
} catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Exchange.id:{}", exchange.getExchangeId() + " - FAILED");
}
- if (LOG.isDebugEnabled()) {
- LOG.trace("Exception: {}", e.getLocalizedMessage(), e);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exception: {}", e.getMessage(), e);
}
+ // an error occurred so set on exchange and call the callback
exchange.setException(e);
+ callback.done(true);
+ return true;
}
- LOG.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
- return isSynchronous();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
+ }
+ return true;
}
/**
@@ -318,8 +346,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the acknowledgment mode for this instance of DestinationProducer.
- *
- * @return int
*/
public int getAcknowledgeMode() {
return getEndpoint().getAcknowledgementMode().intValue();
@@ -327,8 +353,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the synchronous value for this instance of DestinationProducer.
- *
- * @return true if synchronous, otherwise false
*/
public boolean isSynchronous() {
return getEndpoint().isSynchronous();
@@ -336,8 +360,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the replyTo for this instance of DestinationProducer.
- *
- * @return String
*/
public String getReplyTo() {
return getEndpoint().getNamedReplyTo();
@@ -345,8 +367,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the destinationName for this instance of DestinationProducer.
- *
- * @return String
*/
public String getDestinationName() {
return getEndpoint().getDestinationName();
@@ -354,8 +374,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Sets the producer pool for this instance of SjmsProducer.
- *
- * @param producers A MessageProducerPool
*/
public void setProducers(GenericObjectPool<MessageProducerResources> producers) {
this.producers = producers;
@@ -363,8 +381,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the MessageProducerPool value of producers for this instance of SjmsProducer.
- *
- * @return the producers
*/
public GenericObjectPool<MessageProducerResources> getProducers() {
return producers;
@@ -372,8 +388,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Test to verify if this endpoint is a JMS Topic or Queue.
- *
- * @return true if it is a Topic, otherwise it is a Queue
*/
public boolean isTopic() {
return getEndpoint().isTopic();
@@ -381,8 +395,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Test to determine if this endpoint should use a JMS Transaction.
- *
- * @return true if transacted, otherwise false
*/
public boolean isEndpointTransacted() {
return getEndpoint().isTransacted();
@@ -390,8 +402,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Test to determine if this endpoint should share a JMS Session with other SJMS endpoints.
- *
- * @return true if shared, otherwise false
*/
public boolean isSharedJMSSession() {
return getEndpoint().isSharedJMSSession();
@@ -399,8 +409,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Returns the named reply to value for this producer
- *
- * @return true if it is a Topic, otherwise it is a Queue
*/
public String getNamedReplyTo() {
return getEndpoint().getNamedReplyTo();
@@ -408,8 +416,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the producerCount for this instance of SjmsProducer.
- *
- * @return int
*/
public int getProducerCount() {
return getEndpoint().getProducerCount();
@@ -417,8 +423,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets consumerCount for this instance of SjmsProducer.
- *
- * @return int
*/
public int getConsumerCount() {
return getEndpoint().getConsumerCount();
@@ -426,8 +430,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the executor for this instance of SjmsProducer.
- *
- * @return ExecutorService
*/
public ExecutorService getExecutor() {
return executor;
@@ -435,8 +437,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the ttl for this instance of SjmsProducer.
- *
- * @return long
*/
public long getTtl() {
return getEndpoint().getTtl();
@@ -444,8 +444,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets the boolean value of persistent for this instance of SjmsProducer.
- *
- * @return true if persistent, otherwise false
*/
public boolean isPersistent() {
return getEndpoint().isPersistent();
@@ -453,8 +451,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets responseTimeOut for this instance of SjmsProducer.
- *
- * @return long
*/
public long getResponseTimeOut() {
return getEndpoint().getResponseTimeOut();
@@ -462,8 +458,6 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
/**
* Gets commitStrategy for this instance of SjmsProducer.
- *
- * @return TransactionCommitStrategy
*/
protected TransactionCommitStrategy getCommitStrategy() {
if (isEndpointTransacted()) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
new file mode 100644
index 0000000..7d10ac0
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
@@ -0,0 +1,109 @@
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.SendDynamicAware;
+import org.apache.camel.spi.annotations.SendDynamic;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.StringHelper;
+
+/**
+ * JMS based {@link org.apache.camel.spi.SendDynamicAware} which optimises the Simple JMS components when they are
+ * used with the toD (dynamic to) DSL in Camel. The dynamic destination name is provided via the
+ * {@link SjmsConstants#JMS_DESTINATION_NAME} header instead of the endpoint uri, which allows a static endpoint and
+ * its producer to service the dynamic requests.
+ */
+@SendDynamic("sjms")
+public class SjmsSendDynamicAware extends ServiceSupport implements SendDynamicAware {
+
+    private CamelContext camelContext;
+    private String scheme;
+
+    @Override
+    public String getScheme() {
+        return scheme;
+    }
+
+    @Override
+    public void setScheme(String scheme) {
+        this.scheme = scheme;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isOnlyDynamicQueryParameters() {
+        // always reports true for this component
+        return true;
+    }
+
+    @Override
+    public boolean isLenientProperties() {
+        // always reports false for this component
+        return false;
+    }
+
+    /**
+     * Wraps the resolved and the original (unresolved) uri in a {@link DynamicAwareEntry}. The two remaining
+     * constructor arguments are passed as <tt>null</tt>.
+     */
+    @Override
+    public DynamicAwareEntry prepare(Exchange exchange, String uri, String originalUri) throws Exception {
+        return new DynamicAwareEntry(uri, originalUri, null, null);
+    }
+
+    /**
+     * Computes the static endpoint uri to use in place of the dynamically resolved uri.
+     * <p/>
+     * When the destination name in the resolved uri differs from the one in the original uri, the resolved uri is
+     * returned with its destination name swapped back to the original one.
+     *
+     * @return the static uri, or <tt>null</tt> if the destination was not dynamic or could not be determined
+     */
+    @Override
+    public String resolveStaticUri(Exchange exchange, DynamicAwareEntry entry) throws Exception {
+        String uri = entry.getUri();
+        int start = indexOfDestinationName(uri);
+        if (start == -1) {
+            return null;
+        }
+        String destination = parseDestinationName(uri);
+        String originalDestination = parseDestinationName(entry.getOriginalUri());
+        if (originalDestination == null || destination.equals(originalDestination)) {
+            // unknown original name, or the destination was not dynamic
+            return null;
+        }
+        // okay the destination was dynamic, so use the original as endpoint name.
+        // Only replace within the part of the uri that begins at the destination name, because the same text may
+        // also occur earlier in the uri (for example a queue named "queue" in sjms:queue:queue)
+        return uri.substring(0, start)
+               + StringHelper.replaceFirst(uri.substring(start), destination, originalDestination);
+    }
+
+    /**
+     * Creates a processor which stores the destination name parsed from the resolved uri in the
+     * {@link SjmsConstants#JMS_DESTINATION_NAME} header.
+     *
+     * @return the processor, or <tt>null</tt> if the header is already present on the message, in which case the
+     *         existing header value is left untouched
+     */
+    @Override
+    public Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception {
+        if (exchange.getMessage().getHeader(SjmsConstants.JMS_DESTINATION_NAME) != null) {
+            return null;
+        }
+
+        final String destinationName = parseDestinationName(entry.getUri());
+        return e -> e.getMessage().setHeader(SjmsConstants.JMS_DESTINATION_NAME, destinationName);
+    }
+
+    @Override
+    public Processor createPostProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception {
+        // no post processor is needed
+        return null;
+    }
+
+    /**
+     * Returns the index where the path part of the uri ends, which is the position of the first question mark, or
+     * the length of the uri when there is no query.
+     */
+    private static int endOfPath(String uri) {
+        int pos = uri.indexOf('?');
+        return pos != -1 ? pos : uri.length();
+    }
+
+    /**
+     * Returns the index of the first character of the destination name, which is located right after the last colon
+     * that precedes the query, or <tt>-1</tt> if there is no such colon.
+     */
+    private static int indexOfDestinationName(String uri) {
+        int pos = uri.lastIndexOf(':', endOfPath(uri) - 1);
+        return pos != -1 ? pos + 1 : -1;
+    }
+
+    /**
+     * Extracts the destination name from the uri: the text after the last colon, with any query stripped.
+     *
+     * @return the destination name, or <tt>null</tt> if the uri contains no colon before the query
+     */
+    private String parseDestinationName(String uri) {
+        int start = indexOfDestinationName(uri);
+        return start != -1 ? uri.substring(start, endOfPath(uri)) : null;
+    }
+
+}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
index 2c2e2ee..055fd91 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/TransactionCommitStrategy.java
@@ -28,9 +28,8 @@ public interface TransactionCommitStrategy {
* Should return true to allow the commit to proceed. If false, the commit will be skipped. The default should
* always be true to avoid messages remaining uncommitted.
*
- * @param exchange {@link org.apache.camel.Exchange}
- * @return true if the {@link javax.jms.Session} should be committed, otherwise false
- * @throws Exception
+ * @param exchange {@link org.apache.camel.Exchange}
+ * @return true if the {@link javax.jms.Session} should be committed, otherwise false
*/
boolean commit(Exchange exchange) throws Exception;
@@ -38,9 +37,8 @@ public interface TransactionCommitStrategy {
* Should return true to allow the rollback to proceed. If false, the rollback will be skipped. The default should
* always be true to avoid messages remaining uncommitted.
*
- * @param exchange {@link org.apache.camel.Exchange}
- * @return true if the {@link javax.jms.Session} should be committed, otherwise false
- * @throws Exception
+ * @param exchange {@link org.apache.camel.Exchange}
+ * @return true if the {@link javax.jms.Session} should be committed, otherwise false
*/
boolean rollback(Exchange exchange) throws Exception;
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 1967860..3a29049 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -22,6 +22,7 @@ import javax.jms.Message;
import javax.jms.Session;
import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
@@ -47,7 +48,7 @@ import org.apache.camel.support.DefaultEndpoint;
* Highly performant and transactional batch consumption of messages from a JMS queue.
*/
@UriEndpoint(firstVersion = "2.16.0", scheme = "sjms-batch", title = "Simple JMS Batch", syntax = "sjms-batch:destinationName",
- label = "messaging", consumerOnly = true)
+ category = { Category.MESSAGING }, consumerOnly = true)
public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
@@ -104,13 +105,12 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
public SjmsBatchEndpoint(String endpointUri, Component component, String remaining) {
super(endpointUri, component);
- DestinationNameParser parser = new DestinationNameParser();
- if (parser.isTopic(remaining)) {
+ if (DestinationNameParser.isTopic(remaining)) {
throw new IllegalArgumentException(
"Only batch consumption from queues is supported. For topics you "
+ "should use a regular JMS consumer with an aggregator.");
}
- this.destinationName = parser.getShortName(remaining);
+ this.destinationName = DestinationNameParser.getShortName(remaining);
}
@Override
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java
index 6eb95ed..9041bff 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/ConnectionResource.java
@@ -43,8 +43,7 @@ public interface ConnectionResource {
/**
* Returns the {@link Connection} to the connection pool.
*
- * @param connection the borrowed {@link Connection}
- * @throws Exception
+ * @param connection the borrowed {@link Connection}
*/
void returnConnection(Connection connection) throws Exception;
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
index 1054d04..b37e87f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DestinationNameParser.java
@@ -16,19 +16,19 @@
*/
package org.apache.camel.component.sjms.jms;
-/**
- * @author jkorab
- */
-public class DestinationNameParser {
+public final class DestinationNameParser {
+
+ private DestinationNameParser() {
+ }
- public boolean isTopic(String destinationName) {
+ public static boolean isTopic(String destinationName) {
if (destinationName == null) {
throw new IllegalArgumentException("destinationName is null");
}
return destinationName.startsWith("topic:");
}
- public boolean isNamedReplyToTopic(String namedReplyTo, boolean isDestinationTopic) {
+ public static boolean isNamedReplyToTopic(String namedReplyTo, boolean isDestinationTopic) {
if (namedReplyTo == null) {
throw new IllegalArgumentException("namedReplyTo is null");
}
@@ -41,7 +41,7 @@ public class DestinationNameParser {
}
}
- public String getShortName(String destinationName) {
+ public static String getShortName(String destinationName) {
if (destinationName == null) {
throw new IllegalArgumentException("destinationName is null");
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 3e718a0..42d5fb3 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -16,7 +16,11 @@
*/
package org.apache.camel.component.sjms.producer;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
@@ -47,8 +51,7 @@ public class InOnlyProducer extends SjmsProducer {
@Override
public void sendMessage(
final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer,
- final ReleaseProducerCallback releaseProducerCallback)
- throws Exception {
+ final ReleaseProducerCallback releaseProducerCallback) {
try {
Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
producer.getMessageProducer().send(message);
@@ -60,4 +63,31 @@ public class InOnlyProducer extends SjmsProducer {
}
}
+    /**
+     * Sends the exchange as a JMS message to the named destination, using a {@link MessageProducer} that is created
+     * on the given session for this single send and closed again afterwards. Any failure is stored on the exchange
+     * as a {@link CamelExchangeException}; the callback is always signalled when done.
+     */
+    @Override
+    public void sendMessage(Exchange exchange, AsyncCallback callback, Session session, String destinationName) {
+        MessageProducer messageProducer = null;
+        try {
+            final Destination target = resolveDestinationName(session, destinationName);
+            final Message jmsMessage = getEndpoint().getBinding().makeJmsMessage(exchange, session);
+            messageProducer = session.createProducer(target);
+            messageProducer.send(jmsMessage);
+        } catch (Exception cause) {
+            exchange.setException(
+                    new CamelExchangeException("Unable to complete sending the JMS message", exchange, cause));
+        } finally {
+            if (messageProducer != null) {
+                try {
+                    messageProducer.close();
+                } catch (Throwable ignored) {
+                    // ignore
+                }
+            }
+            callback.done(isSynchronous());
+        }
+    }
+
+ /**
+ * Resolves the given destination name into a JMS {@link Destination} by delegating to the endpoint's destination
+ * creation strategy, passing along the endpoint's {@code isTopic()} flag.
+ *
+ * @param session the JMS session handed to the destination creation strategy
+ * @param destinationName the name of the destination to create
+ * @return the destination returned by the strategy
+ */
+ protected Destination resolveDestinationName(Session session, String destinationName) throws JMSException {
+ return getEndpoint().getDestinationCreationStrategy().createDestination(session,
+ destinationName, getEndpoint().isTopic());
+ }
+
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index ce2fd10..6940ba5 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -28,10 +28,12 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.MessageConsumerResources;
import org.apache.camel.component.sjms.MessageProducerResources;
@@ -99,8 +101,7 @@ public class InOutProducer extends SjmsProducer {
replyToDestination = getEndpoint().getDestinationCreationStrategy().createTemporaryDestination(session,
isReplyToTopic);
} else {
- DestinationNameParser parser = new DestinationNameParser();
- isReplyToTopic = parser.isNamedReplyToTopic(getNamedReplyTo(), isTopic());
+ isReplyToTopic = DestinationNameParser.isNamedReplyToTopic(getNamedReplyTo(), isTopic());
replyToDestination = getEndpoint().getDestinationCreationStrategy().createDestination(session,
getNamedReplyTo(), isReplyToTopic);
}
@@ -125,13 +126,13 @@ public class InOutProducer extends SjmsProducer {
correlationID, replyToDestination, EXCHANGERS.size(), message);
}
} catch (Exception e) {
- LOG.warn("Unable to exchange message: {}. This exception is ignored.", message, e);
+ LOG.warn("Unable to match correlated exchange message: {}. This exception is ignored.", message, e);
}
}
});
answer = new MessageConsumerResources(session, messageConsumer, replyToDestination);
} catch (Exception e) {
- LOG.error("Unable to create the MessageConsumerResource: {}", e.getLocalizedMessage());
+ LOG.error("Unable to create the MessageConsumerResource: {}", e.getMessage());
throw new CamelException(e);
} finally {
connectionResource.returnConnection(conn);
@@ -145,7 +146,7 @@ public class InOutProducer extends SjmsProducer {
obj.getSession().getAcknowledgeMode();
return true;
} catch (JMSException ex) {
- LOG.error("Cannot validate session", ex);
+ LOG.debug("Cannot validate session. This exception is ignored.", ex);
}
return false;
}
@@ -171,7 +172,6 @@ public class InOutProducer extends SjmsProducer {
@Override
protected void doStart() throws Exception {
-
if (isEndpointTransacted()) {
throw new IllegalArgumentException(
"InOut exchange pattern is incompatible with transacted=true as it cause a deadlock. Please use transacted=false or InOnly exchange pattern.");
@@ -202,11 +202,81 @@ public class InOutProducer extends SjmsProducer {
protected void doStop() throws Exception {
super.doStop();
if (consumers != null) {
- consumers.close();
+ try {
+ consumers.close();
+ } catch (Exception e) {
+ // ignore
+ }
consumers = null;
}
}
+    /**
+     * Sends the exchange as a JMS request to the named destination and waits up to {@link #getResponseTimeOut()}
+     * milliseconds for the correlated reply, which is then set as the OUT message of the exchange.
+     * <p/>
+     * A {@link MessageProducer} is created on the given session for this single request and closed afterwards. Any
+     * failure is stored on the exchange; the callback is always signalled when done.
+     */
+    @Override
+    public void sendMessage(Exchange exchange, AsyncCallback callback, Session session, String destinationName) {
+        MessageProducer producer = null;
+        // only set once the exchanger has been registered, so the finally block knows there is an entry to remove
+        String registeredCorrelationId = null;
+        try {
+            Destination destination = resolveDestinationName(session, destinationName);
+            Message request = getEndpoint().getBinding().makeJmsMessage(exchange, session);
+
+            String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
+            if (correlationId == null) {
+                // we append the 'Camel-' prefix to know it was generated by us
+                correlationId = GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid();
+            }
+
+            Object responseObject = null;
+            Exchanger<Object> messageExchanger = new Exchanger<>();
+            JmsMessageHelper.setCorrelationId(request, correlationId);
+            EXCHANGERS.put(correlationId, messageExchanger);
+            registeredCorrelationId = correlationId;
+
+            MessageConsumerResources consumer = consumers.borrowObject();
+            try {
+                JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+            } finally {
+                // always hand the consumer back, also if setting the reply-to failed
+                consumers.returnObject(consumer);
+            }
+
+            producer = session.createProducer(destination);
+            producer.send(request);
+
+            try {
+                responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // restore the interrupt status so code further up the call stack can still observe it
+                Thread.currentThread().interrupt();
+                LOG.debug("Exchanger was interrupted while waiting on response", e);
+                exchange.setException(e);
+            } catch (TimeoutException e) {
+                LOG.debug("Exchanger timed out while waiting on response", e);
+                exchange.setException(e);
+            }
+
+            if (exchange.getException() == null) {
+                if (responseObject instanceof Throwable) {
+                    exchange.setException((Throwable) responseObject);
+                } else if (responseObject instanceof Message) {
+                    Message message = (Message) responseObject;
+
+                    SjmsMessage response
+                            = new SjmsMessage(exchange, message, consumer.getSession(), getEndpoint().getBinding());
+                    // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand
+                    // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access
+                    // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218).
+                    exchange.setOut(response);
+                } else {
+                    exchange.setException(new CamelException("Unknown response type: " + responseObject));
+                }
+            }
+        } catch (Exception e) {
+            exchange.setException(new CamelExchangeException("Unable to complete sending the JMS message", exchange, e));
+        } finally {
+            // remove the exchanger on every outcome (reply, timeout, interrupt or failure) so no entry is left behind
+            if (registeredCorrelationId != null) {
+                EXCHANGERS.remove(registeredCorrelationId);
+            }
+            try {
+                if (producer != null) {
+                    producer.close();
+                }
+            } catch (Throwable e) {
+                // ignore
+            }
+            callback.done(isSynchronous());
+        }
+    }
+
+
/**
* TODO time out is actually double as it waits for the producer and then waits for the response. Use an atomic long
* to manage the countdown
@@ -214,63 +284,67 @@ public class InOutProducer extends SjmsProducer {
@Override
public void sendMessage(
final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer,
- final ReleaseProducerCallback releaseProducerCallback)
- throws Exception {
- Message request = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
-
- String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
- if (correlationId == null) {
- // we append the 'Camel-' prefix to know it was generated by us
- correlationId = GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid();
- }
+ final ReleaseProducerCallback releaseProducerCallback) {
+ try {
+ Message request = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
- Object responseObject = null;
- Exchanger<Object> messageExchanger = new Exchanger<>();
- JmsMessageHelper.setCorrelationId(request, correlationId);
- EXCHANGERS.put(correlationId, messageExchanger);
+ String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
+ if (correlationId == null) {
+ // we append the 'Camel-' prefix to know it was generated by us
+ correlationId = GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid();
+ }
- MessageConsumerResources consumer = consumers.borrowObject();
- JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
- consumers.returnObject(consumer);
- producer.getMessageProducer().send(request);
+ Object responseObject = null;
+ Exchanger<Object> messageExchanger = new Exchanger<>();
+ JmsMessageHelper.setCorrelationId(request, correlationId);
+ EXCHANGERS.put(correlationId, messageExchanger);
- // Return the producer to the pool so another waiting producer
- // can move forward
- // without waiting on us to complete the exchange
- try {
+ MessageConsumerResources consumer = consumers.borrowObject();
+ JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+ consumers.returnObject(consumer);
+ producer.getMessageProducer().send(request);
+
+ // Return the producer to the pool so another waiting producer can move forward
+ // without waiting on us to complete the exchange
releaseProducerCallback.release(producer);
- } catch (Exception exception) {
- // thrown if the pool is full. safe to ignore.
- }
- try {
- responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
- EXCHANGERS.remove(correlationId);
- } catch (InterruptedException e) {
- LOG.debug("Exchanger was interrupted while waiting on response", e);
- exchange.setException(e);
- } catch (TimeoutException e) {
- LOG.debug("Exchanger timed out while waiting on response", e);
- exchange.setException(e);
- }
+ try {
+ responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+ EXCHANGERS.remove(correlationId);
+ } catch (InterruptedException e) {
+ LOG.debug("Exchanger was interrupted while waiting on response", e);
+ exchange.setException(e);
+ } catch (TimeoutException e) {
+ LOG.debug("Exchanger timed out while waiting on response", e);
+ exchange.setException(e);
+ }
- if (exchange.getException() == null) {
- if (responseObject instanceof Throwable) {
- exchange.setException((Throwable) responseObject);
- } else if (responseObject instanceof Message) {
- Message message = (Message) responseObject;
-
- SjmsMessage response = new SjmsMessage(exchange, message, consumer.getSession(), getEndpoint().getBinding());
- // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand
- // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access
- // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218).
- exchange.setOut(response);
- } else {
- exchange.setException(new CamelException("Unknown response type: " + responseObject));
+ if (exchange.getException() == null) {
+ if (responseObject instanceof Throwable) {
+ exchange.setException((Throwable) responseObject);
+ } else if (responseObject instanceof Message) {
+ Message message = (Message) responseObject;
+
+ SjmsMessage response
+ = new SjmsMessage(exchange, message, consumer.getSession(), getEndpoint().getBinding());
+ // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand
+ // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access
+ // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218).
+ exchange.setOut(response);
+ } else {
+ exchange.setException(new CamelException("Unknown response type: " + responseObject));
+ }
}
+ } catch (Exception e) {
+ exchange.setException(new CamelExchangeException("Unable to complete sending the JMS message", exchange, e));
+ } finally {
+ callback.done(isSynchronous());
}
+ }
- callback.done(isSynchronous());
+ /**
+ * Resolves the given destination name into a JMS {@link Destination} by delegating to the endpoint's destination
+ * creation strategy, passing along the endpoint's {@code isTopic()} flag.
+ *
+ * @param session the JMS session handed to the destination creation strategy
+ * @param destinationName the name of the destination to create
+ * @return the destination returned by the strategy
+ */
+ protected Destination resolveDestinationName(Session session, String destinationName) throws JMSException {
+ return getEndpoint().getDestinationCreationStrategy().createDestination(session,
+ destinationName, getEndpoint().isTopic());
}
}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
index 2c40b26..ec6e7b8 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/DestinationNameParserTest.java
@@ -23,39 +23,32 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * @author jkorab
- */
public class DestinationNameParserTest {
@Test
public void testIsTopic() throws Exception {
- DestinationNameParser parser = new DestinationNameParser();
- assertTrue(parser.isTopic("topic:foo"));
- assertFalse(parser.isTopic("queue:bar"));
- assertFalse(parser.isTopic("bar"));
+ assertTrue(DestinationNameParser.isTopic("topic:foo"));
+ assertFalse(DestinationNameParser.isTopic("queue:bar"));
+ assertFalse(DestinationNameParser.isTopic("bar"));
}
@Test
public void testIsTopicNullDestinationName() throws Exception {
- DestinationNameParser parser = new DestinationNameParser();
assertThrows(IllegalArgumentException.class,
- () -> parser.isTopic(null));
+ () -> DestinationNameParser.isTopic(null));
}
@Test
public void testGetShortName() throws Exception {
- DestinationNameParser parser = new DestinationNameParser();
- assertEquals("foo", parser.getShortName("topic:foo"));
- assertFalse(parser.isTopic("queue:bar"), "bar");
- assertFalse(parser.isTopic("bar"), "bar");
+ assertEquals("foo", DestinationNameParser.getShortName("topic:foo"));
+ assertFalse(DestinationNameParser.isTopic("queue:bar"), "bar");
+ assertFalse(DestinationNameParser.isTopic("bar"), "bar");
}
@Test
public void testGetShortNameNullDestinationName() throws Exception {
- DestinationNameParser parser = new DestinationNameParser();
assertThrows(IllegalArgumentException.class,
- () -> parser.getShortName(null));
+ () -> DestinationNameParser.getShortName(null));
}
}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
index 042ff91..2abaa0a 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
@@ -22,6 +22,7 @@ import javax.jms.TextMessage;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.SjmsConstants;
import org.apache.camel.component.sjms.support.JmsTestSupport;
import org.junit.jupiter.api.Test;
@@ -63,15 +64,34 @@ public class InOnlyQueueProducerTest extends JmsTestSupport {
mock.assertIsSatisfied();
mc.close();
+ }
+
+    /**
+     * Sends to direct:start with the destination name header set to "foo" and verifies that the message can be
+     * received from a consumer on queue "foo", before forwarding its text to the mock via direct:finish.
+     */
+    @Test
+    public void testInOnlyQueueProducerHeader() throws Exception {
+        MessageConsumer queueConsumer = createQueueConsumer("foo");
+        assertNotNull(queueConsumer);
+
+        final String expectedBody = "Hello World!";
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.expectedBodiesReceived(expectedBody);
+
+        template.sendBodyAndHeader("direct:start", expectedBody, SjmsConstants.JMS_DESTINATION_NAME, "foo");
+
+        Message received = queueConsumer.receive(5000);
+        assertNotNull(received);
+        assertTrue(received instanceof TextMessage);
+        String receivedText = ((TextMessage) received).getText();
+        assertNotNull(receivedText);
+
+        template.sendBody("direct:finish", receivedText);
+        resultEndpoint.assertIsSatisfied();
+
+        queueConsumer.close();
     }
- /**
- * @see org.apache.camel.test.junit5.CamelTestSupport#createRouteBuilder()
- *
- * @return
- * @throws Exception
- */
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
index 5ca91e3..df2dbcc 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
@@ -28,6 +28,7 @@ import javax.jms.TextMessage;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.SjmsConstants;
import org.apache.camel.component.sjms.support.JmsTestSupport;
import org.junit.jupiter.api.Test;
@@ -60,7 +61,22 @@ public class InOutQueueProducerTest extends JmsTestSupport {
assertTrue(responseObject instanceof String);
assertEquals(responseText, responseObject);
mc.close();
+ }
+    /**
+     * Requests via direct:start with the destination name header set to "foo" and verifies that the reply produced
+     * by the listener on queue "foo" is returned as the response body.
+     */
+    @Test
+    public void testInOutQueueProducerHeader() throws Exception {
+        final String requestText = "Hello World!";
+        final String responseText = "How are you";
+
+        MessageConsumer queueConsumer = createQueueConsumer("foo");
+        assertNotNull(queueConsumer);
+        queueConsumer.setMessageListener(new MyMessageListener(requestText, responseText));
+
+        Object reply
+                = template.requestBodyAndHeader("direct:start", requestText, SjmsConstants.JMS_DESTINATION_NAME, "foo");
+
+        assertNotNull(reply);
+        assertTrue(reply instanceof String);
+        assertEquals(responseText, reply);
+
+        queueConsumer.close();
     }
@Test
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTest.java
new file mode 100644
index 0000000..51b6120
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTest.java
@@ -0,0 +1,38 @@
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Verifies that a toD with a dynamic sjms destination results in a single sjms endpoint while the messages still
+ * arrive on their respective queues.
+ */
+public class SjmsToDSendDynamicTest extends JmsTestSupport {
+
+    @Test
+    public void testToD() throws Exception {
+        template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+        template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+        // there should only be one sjms endpoint
+        assertEquals(1, countSjmsEndpoints(), "There should only be 1 sjms endpoint");
+
+        // and the messages should be in the queues
+        String fromBar = consumer.receiveBody("sjms:queue:bar", 2000, String.class);
+        assertEquals("Hello bar", fromBar);
+        String fromBeer = consumer.receiveBody("sjms:queue:beer", 2000, String.class);
+        assertEquals("Hello beer", fromBeer);
+    }
+
+    /**
+     * Counts the endpoints in the context whose uri starts with the sjms scheme.
+     */
+    private long countSjmsEndpoints() {
+        return context.getEndpoints().stream()
+                .map(endpoint -> endpoint.getEndpointUri())
+                .filter(uri -> uri.startsWith("sjms:"))
+                .count();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // route message dynamic using toD
+                from("direct:start")
+                        .toD("sjms:queue:${header.where}");
+            }
+        };
+    }
+
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTwoDisabledTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTwoDisabledTest.java
new file mode 100644
index 0000000..cda29a2
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTwoDisabledTest.java
@@ -0,0 +1,37 @@
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SjmsToDSendDynamicTwoDisabledTest extends JmsTestSupport {
+
+ @Test
+ public void testToD() throws Exception {
+ template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+ template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+ template.sendBodyAndHeader("direct:start", "Hello gin", "where", "gin");
+
+ template.sendBodyAndHeader("direct:start2", "Hello beer", "where2", "beer");
+ template.sendBodyAndHeader("direct:start2", "Hello whiskey", "where2", "whiskey");
+
+ // there should be 4 sjms endpoint
+ long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("sjms:")).count();
+ assertEquals(4, count, "There should be 4 sjms endpoint");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // route message dynamic using toD but turn off send dynamic aware
+ from("direct:start").toD().allowOptimisedComponents(false).uri("sjms:queue:${header.where}");
+ from("direct:start2").toD().allowOptimisedComponents(false).uri("sjms:queue:${header.where2}");
+ }
+ };
+ }
+
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTwoTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTwoTest.java
new file mode 100644
index 0000000..c86b68e
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDSendDynamicTwoTest.java
@@ -0,0 +1,38 @@
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SjmsToDSendDynamicTwoTest extends JmsTestSupport {
+
+ @Test
+ public void testToD() throws Exception {
+ template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+ template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+ template.sendBodyAndHeader("direct:start", "Hello gin", "where", "gin");
+
+ template.sendBodyAndHeader("direct:start2", "Hello beer", "where2", "beer");
+ template.sendBodyAndHeader("direct:start2", "Hello whiskey", "where2", "whiskey");
+
+ // there should be 2 sjms endpoint
+ long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("sjms:")).count();
+ assertEquals(2, count, "There should only be 2 sjms endpoint");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // route message dynamic using toD
+ from("direct:start").toD("sjms:queue:${header.where}");
+
+ from("direct:start2").toD("sjms:queue:${header.where2}");
+ }
+ };
+ }
+
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDTest.java
new file mode 100644
index 0000000..17f379c
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/SjmsToDTest.java
@@ -0,0 +1,34 @@
+package org.apache.camel.component.sjms.producer;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class SjmsToDTest extends JmsTestSupport {
+
+ @Test
+ public void testToD() throws Exception {
+ getMockEndpoint("mock:bar").expectedBodiesReceived("Hello bar");
+ getMockEndpoint("mock:beer").expectedBodiesReceived("Hello beer");
+
+ template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+ template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // route message dynamic using toD
+ from("direct:start").toD("sjms:queue:${header.where}");
+
+ from("sjms:queue:bar").to("mock:bar");
+ from("sjms:queue:beer").to("mock:beer");
+ }
+ };
+ }
+
+}
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
index b97df84..e1b6fbb 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/JmsTestSupport.java
@@ -68,10 +68,6 @@ public class JmsTestSupport extends CamelTestSupport {
/**
* Set up the Broker
- *
- * @see org.apache.camel.test.junit5.CamelTestSupport#doPreSetup()
- *
- * @throws Exception
*/
@Override
protected void doPreSetup() throws Exception {
@@ -145,11 +141,6 @@ public class JmsTestSupport extends CamelTestSupport {
}
}
- /*
- * @see org.apache.camel.test.junit5.CamelTestSupport#createCamelContext()
- * @return
- * @throws Exception
- */
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
diff --git a/components/camel-sjms2/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/sjms2 b/components/camel-sjms2/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/sjms2
new file mode 100644
index 0000000..668bc4c
--- /dev/null
+++ b/components/camel-sjms2/src/generated/resources/META-INF/services/org/apache/camel/send-dynamic/sjms2
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.sjms2.Sjms2SendDynamicAware
diff --git a/components/camel-sjms2/src/generated/resources/org/apache/camel/component/sjms2/sjms2.json b/components/camel-sjms2/src/generated/resources/org/apache/camel/component/sjms2/sjms2.json
index ae75e53..09930ab 100644
--- a/components/camel-sjms2/src/generated/resources/org/apache/camel/component/sjms2/sjms2.json
+++ b/components/camel-sjms2/src/generated/resources/org/apache/camel/component/sjms2/sjms2.json
@@ -32,7 +32,7 @@
"connectionClientId": { "kind": "property", "displayName": "Connection Client Id", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionMaxWait": { "kind": "property", "displayName": "Connection Max Wait", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource." },
"connectionResource": { "kind": "property", "displayName": "Connection Resource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.ConnectionResource", "deprecated": false, "autowired": false, "secret": false, "description": "A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details." },
- "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool." },
+ "connectionTestOnBorrow": { "kind": "property", "displayName": "Connection Test On Borrow", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used." },
"destinationCreationStrategy": { "kind": "property", "displayName": "Destination Creation Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.DestinationCreationStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom DestinationCreationStrategy." },
"jmsKeyFormatStrategy": { "kind": "property", "displayName": "Jms Key Format Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The defau [...]
"messageCreatedStrategy": { "kind": "property", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
@@ -80,7 +80,7 @@
"messageCreatedStrategy": { "kind": "parameter", "displayName": "Message Created Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.jms.MessageCreatedStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message." },
"errorHandlerLoggingLevel": { "kind": "parameter", "displayName": "Error Handler Logging Level", "group": "logging", "label": "consumer,logging", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "WARN", "description": "Allows to configure the default errorHandler logging level for logging uncaught exceptions." },
"errorHandlerLogStackTrace": { "kind": "parameter", "displayName": "Error Handler Log Stack Trace", "group": "logging", "label": "consumer,logging", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Allows to control whether stacktraces should be logged or not, by the default errorHandler." },
- "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
+ "transacted": { "kind": "parameter", "displayName": "Transacted", "group": "transaction", "label": "transaction", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Specifies whether to use transacted mode" },
"transactionBatchCount": { "kind": "parameter", "displayName": "Transaction Batch Count", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "integer", "javaType": "int", "deprecated": true, "autowired": false, "secret": false, "defaultValue": -1, "description": "If transacted sets the number of messages to process before committing a transaction." },
"transactionBatchTimeout": { "kind": "parameter", "displayName": "Transaction Batch Timeout", "group": "transaction", "label": "consumer,transaction", "required": false, "type": "duration", "javaType": "long", "deprecated": true, "autowired": false, "secret": false, "defaultValue": "5s", "description": "Sets timeout (in millis) for batch transactions, the value should be 1000 or higher." },
"transactionCommitStrategy": { "kind": "parameter", "displayName": "Transaction Commit Strategy", "group": "transaction", "label": "transaction", "required": false, "type": "object", "javaType": "org.apache.camel.component.sjms.TransactionCommitStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the commit strategy." },
diff --git a/components/camel-sjms2/src/main/docs/sjms2-component.adoc b/components/camel-sjms2/src/main/docs/sjms2-component.adoc
index 3b9795f..342919f 100644
--- a/components/camel-sjms2/src/main/docs/sjms2-component.adoc
+++ b/components/camel-sjms2/src/main/docs/sjms2-component.adoc
@@ -110,7 +110,7 @@ The Simple JMS2 component supports 19 options, which are listed below.
| *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | | String
| *connectionMaxWait* (advanced) | The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | 5000 | long
| *connectionResource* (advanced) | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. | | ConnectionResource
-| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool. | true | boolean
+| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used. | true | boolean
| *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. | | DestinationCreationStrategy
| *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using [...]
| *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java b/components/camel-sjms2/src/main/java/org/apache/camel/component/sjms2/Sjms2SendDynamicAware.java
similarity index 76%
copy from components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
copy to components/camel-sjms2/src/main/java/org/apache/camel/component/sjms2/Sjms2SendDynamicAware.java
index 54e9f9d..5d4627d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConstants.java
+++ b/components/camel-sjms2/src/main/java/org/apache/camel/component/sjms2/Sjms2SendDynamicAware.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.sjms;
+package org.apache.camel.component.sjms2;
-public interface SjmsConstants {
+import org.apache.camel.component.sjms.SjmsSendDynamicAware;
+import org.apache.camel.spi.annotations.SendDynamic;
- String JMS_MESSAGE_TYPE = "JmsMessageType";
-
- String JMS_SESSION = "CamelJMSSession";
+@SendDynamic("sjms2")
+public class Sjms2SendDynamicAware extends SjmsSendDynamicAware {
}
diff --git a/components/camel-sjms2/src/test/java/org/apache/camel/component/sjms2/producer/Sjms2ToDSendDynamicTest.java b/components/camel-sjms2/src/test/java/org/apache/camel/component/sjms2/producer/Sjms2ToDSendDynamicTest.java
new file mode 100644
index 0000000..8499cdc
--- /dev/null
+++ b/components/camel-sjms2/src/test/java/org/apache/camel/component/sjms2/producer/Sjms2ToDSendDynamicTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms2.producer;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.sjms2.support.Jms2TestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class Sjms2ToDSendDynamicTest extends Jms2TestSupport {
+
+ @Test
+ public void testToD() throws Exception {
+ template.sendBodyAndHeader("direct:start", "Hello bar", "where", "bar");
+ template.sendBodyAndHeader("direct:start", "Hello beer", "where", "beer");
+
+ // there should only be one sjms2 endpoint
+ long count = context.getEndpoints().stream().filter(e -> e.getEndpointUri().startsWith("sjms2:")).count();
+ assertEquals(1, count, "There should only be 1 sjms2 endpoint");
+
+ // and the messages should be in the queues
+ String out = consumer.receiveBody("sjms2:queue:bar", 2000, String.class);
+ assertEquals("Hello bar", out);
+ out = consumer.receiveBody("sjms2:queue:beer", 2000, String.class);
+ assertEquals("Hello beer", out);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // route message dynamic using toD
+ from("direct:start").toD("sjms2:queue:${header.where}");
+ }
+ };
+ }
+
+}
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Sjms2ComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Sjms2ComponentBuilderFactory.java
index 5433a41..5ddffe8 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Sjms2ComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Sjms2ComponentBuilderFactory.java
@@ -235,8 +235,7 @@ public interface Sjms2ComponentBuilderFactory {
/**
* When using the default
* org.apache.camel.component.sjms.jms.ConnectionFactoryResource then
- * should each javax.jms.Connection be tested (calling start) before
- * returned from the pool.
+ * should each javax.jms.Connection be tested before being used.
*
* The option is a: <code>boolean</code> type.
*
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java
index 6fe3878..7e75d72 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/SjmsComponentBuilderFactory.java
@@ -234,8 +234,7 @@ public interface SjmsComponentBuilderFactory {
/**
* When using the default
* org.apache.camel.component.sjms.jms.ConnectionFactoryResource then
- * should each javax.jms.Connection be tested (calling start) before
- * returned from the pool.
+ * should each javax.jms.Connection be tested before being used.
*
* The option is a: <code>boolean</code> type.
*
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Sjms2EndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Sjms2EndpointBuilderFactory.java
index 58b3bc9..b48e79a 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Sjms2EndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Sjms2EndpointBuilderFactory.java
@@ -1337,6 +1337,37 @@ public interface Sjms2EndpointBuilderFactory {
return this;
}
/**
+ * Specifies whether to use transacted mode.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default Sjms2EndpointProducerBuilder transacted(boolean transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
+ * Specifies whether to use transacted mode.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default Sjms2EndpointProducerBuilder transacted(String transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
* Sets the commit strategy.
*
* The option is a:
@@ -2018,6 +2049,37 @@ public interface Sjms2EndpointBuilderFactory {
return this;
}
/**
+ * Specifies whether to use transacted mode.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default Sjms2EndpointBuilder transacted(boolean transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
+ * Specifies whether to use transacted mode.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default Sjms2EndpointBuilder transacted(String transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
* Sets the commit strategy.
*
* The option is a:
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java
index 31a969d..37925ef 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SjmsEndpointBuilderFactory.java
@@ -1260,6 +1260,37 @@ public interface SjmsEndpointBuilderFactory {
return this;
}
/**
+ * Specifies whether to use transacted mode.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default SjmsEndpointProducerBuilder transacted(boolean transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
+ * Specifies whether to use transacted mode.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default SjmsEndpointProducerBuilder transacted(String transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
* Sets the commit strategy.
*
* The option is a:
@@ -1941,6 +1972,37 @@ public interface SjmsEndpointBuilderFactory {
return this;
}
/**
+ * Specifies whether to use transacted mode.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default SjmsEndpointBuilder transacted(boolean transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
+ * Specifies whether to use transacted mode.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: transaction
+ *
+ * @param transacted the value to set
+ * @return the dsl builder
+ */
+ default SjmsEndpointBuilder transacted(String transacted) {
+ doSetProperty("transacted", transacted);
+ return this;
+ }
+ /**
* Sets the commit strategy.
*
* The option is a:
diff --git a/docs/components/modules/ROOT/pages/sjms-component.adoc b/docs/components/modules/ROOT/pages/sjms-component.adoc
index 63e46fe..575717b 100644
--- a/docs/components/modules/ROOT/pages/sjms-component.adoc
+++ b/docs/components/modules/ROOT/pages/sjms-component.adoc
@@ -109,7 +109,7 @@ The Simple JMS component supports 19 options, which are listed below.
| *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | | String
| *connectionMaxWait* (advanced) | The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | 5000 | long
| *connectionResource* (advanced) | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. | | ConnectionResource
-| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool. | true | boolean
+| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used. | true | boolean
| *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. | | DestinationCreationStrategy
| *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using [...]
| *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy
@@ -275,6 +275,75 @@ from("sjms:queue:in.out.test?exchangePattern=InOut")
.transform(constant("Bye Camel"));
----
+
+== Reuse endpoint and send to different destinations computed at runtime
+
+If you need to send messages to a lot of different JMS destinations, it
+makes sense to reuse a SJMS endpoint and specify the real destination in
+a message header. This allows Camel to reuse the same endpoint, but send
+to different destinations. This greatly reduces the number of endpoints
+created and economizes on memory and thread resources.
+
+TIP: Using xref:{eip-vc}:eips:toD-eip.adoc[toD] is easier than specifying the dynamic destination with a header
+
+You can specify the destination in the following headers:
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=====================================================================
+|Header |Type |Description
+|`CamelJmsDestinationName` |`String` |The destination name.
+|=====================================================================
+
+For example, the following route shows how you can compute a destination
+at run time and use it to override the destination appearing in the JMS
+URL:
+
+[source,java]
+--------------------------------
+from("file://inbox")
+ .to("bean:computeDestination")
+ .to("sjms:queue:dummy");
+--------------------------------
+
+The queue name, `dummy`, is just a placeholder. It must be provided as
+part of the JMS endpoint URL, but it will be ignored in this example.
+
+In the `computeDestination` bean, specify the real destination by
+setting the `CamelJmsDestinationName` header as follows:
+
+[source,java]
+-------------------------------------------------------------------------
+public void setJmsHeader(Exchange exchange) {
+ String id = ....
+ exchange.getIn().setHeader("CamelJmsDestinationName", "order:" + id);
+}
+-------------------------------------------------------------------------
+
+Then Camel will read this header and use it as the destination instead
+of the one configured on the endpoint. So, in this example Camel sends
+the message to `sjms:queue:order:2`, assuming the `id` value was 2.
+
+Keep in mind that the JMS producer removes the `CamelJmsDestinationName`
+header from the exchange and does not propagate it to the created JMS
+message, in order to avoid accidental loops
+in the routes (in scenarios where the message is forwarded to
+another JMS endpoint).
+
+== Using toD
+
+If you need to send messages to a lot of different JMS destinations, it
+makes sense to reuse a SJMS endpoint and specify the dynamic destinations
+with simple language using xref:{eip-vc}:eips:toD-eip.adoc[toD].
+
+For example, suppose you need to send messages to queues named after order
+types. Using toD, this could be done as follows:
+
+[source,java]
+--------------------------------
+from("direct:order")
+ .toD("sjms:order-${header.orderType}");
+--------------------------------
+
== Advanced Usage Notes
=== Plugable Connection Resource Management [[SJMS-connectionresource]]
diff --git a/docs/components/modules/ROOT/pages/sjms2-component.adoc b/docs/components/modules/ROOT/pages/sjms2-component.adoc
index b8084ae..e55c094 100644
--- a/docs/components/modules/ROOT/pages/sjms2-component.adoc
+++ b/docs/components/modules/ROOT/pages/sjms2-component.adoc
@@ -112,7 +112,7 @@ The Simple JMS2 component supports 19 options, which are listed below.
| *connectionClientId* (advanced) | The client ID to use when creating javax.jms.Connection when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | | String
| *connectionMaxWait* (advanced) | The max wait time in millis to block and wait on free connection when the pool is exhausted when using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource. | 5000 | long
| *connectionResource* (advanced) | A ConnectionResource is an interface that allows for customization and container control of the ConnectionFactory. See Plugable Connection Resource Management for further details. | | ConnectionResource
-| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested (calling start) before returned from the pool. | true | boolean
+| *connectionTestOnBorrow* (advanced) | When using the default org.apache.camel.component.sjms.jms.ConnectionFactoryResource then should each javax.jms.Connection be tested before being used. | true | boolean
| *destinationCreationStrategy* (advanced) | To use a custom DestinationCreationStrategy. | | DestinationCreationStrategy
| *jmsKeyFormatStrategy* (advanced) | Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification. Camel provides one implementation out of the box: default. The default strategy will safely marshal dots and hyphens (. and -). Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters. You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to it using [...]
| *messageCreatedStrategy* (advanced) | To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending a JMS message. | | MessageCreatedStrategy