You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/07 11:38:52 UTC
[3/5] camel git commit: CAMEL-9116: camel-sjms should use same
binding to/from JMS as camel-jms does.
CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d19e5d74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d19e5d74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d19e5d74
Branch: refs/heads/master
Commit: d19e5d742b2cbed2f84ad2598bbe9c4789789d3b
Parents: 5b1d8da
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Sep 7 11:10:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Sep 7 11:10:01 2015 +0200
----------------------------------------------------------------------
components/camel-sjms/pom.xml | 219 +++---
.../camel/component/sjms/SjmsComponent.java | 27 +-
.../camel/component/sjms/SjmsConsumer.java | 2 +-
.../camel/component/sjms/SjmsEndpoint.java | 127 ++-
.../sjms/SjmsHeaderFilterStrategy.java | 14 +-
.../camel/component/sjms/SjmsMessage.java | 283 +++++++
.../component/sjms/batch/SjmsBatchConsumer.java | 5 +-
.../component/sjms/batch/SjmsBatchEndpoint.java | 152 +++-
.../sjms/consumer/AbstractMessageHandler.java | 22 +-
.../sjms/consumer/InOutMessageHandler.java | 5 +-
.../sjms/jms/DefaultJmsKeyFormatStrategy.java | 2 +-
.../camel/component/sjms/jms/JmsBinding.java | 606 ++++++++++++++
.../sjms/jms/JmsKeyFormatStrategy.java | 41 +
.../component/sjms/jms/JmsMessageHelper.java | 782 ++++++-------------
.../component/sjms/jms/KeyFormatStrategy.java | 41 -
.../sjms/jms/MessageCreatedStrategy.java | 39 +
.../component/sjms/producer/InOnlyProducer.java | 23 +-
.../component/sjms/producer/InOutProducer.java | 41 +-
.../JMSMessageHelperTypeConversionTest.java | 201 -----
19 files changed, 1671 insertions(+), 961 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 6977796..3f8d023 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -15,121 +15,122 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.camel</groupId>
- <artifactId>components</artifactId>
- <version>2.16-SNAPSHOT</version>
- </parent>
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.16-SNAPSHOT</version>
+ </parent>
- <artifactId>camel-sjms</artifactId>
- <packaging>bundle</packaging>
- <name>Camel :: Simple JMS</name>
- <description>A pure Java JMS Camel Component</description>
+ <artifactId>camel-sjms</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: Simple JMS</name>
+ <description>A pure Java JMS Camel Component</description>
- <properties>
- <camel.osgi.export.pkg>
- org.apache.camel.component.sjms,
- org.apache.camel.component.sjms.jms,
- org.apache.camel.component.sjms.batch
- </camel.osgi.export.pkg>
- <camel.osgi.private.pkg>
- org.apache.camel.component.sjms.consumer,
- org.apache.camel.component.sjms.producer,
- org.apache.camel.component.sjms.taskmanager,
- org.apache.camel.component.sjms.tx
- </camel.osgi.private.pkg>
- <camel.osgi.export.service>
- org.apache.camel.spi.ComponentResolver;component=sjms,
- org.apache.camel.spi.ComponentResolver;component=sjms-batch
- </camel.osgi.export.service>
- </properties>
+ <properties>
+ <camel.osgi.export.pkg>
+ org.apache.camel.component.sjms,
+ org.apache.camel.component.sjms.jms,
+ org.apache.camel.component.sjms.batch
+ </camel.osgi.export.pkg>
+ <camel.osgi.private.pkg>
+ org.apache.camel.component.sjms.consumer,
+ org.apache.camel.component.sjms.producer,
+ org.apache.camel.component.sjms.taskmanager,
+ org.apache.camel.component.sjms.tx
+ </camel.osgi.private.pkg>
+ <camel.osgi.export.service>
+ org.apache.camel.spi.ComponentResolver;component=sjms,
+ org.apache.camel.spi.ComponentResolver;component=sjms-batch
+ </camel.osgi.export.service>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-core</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-annotation_1.0_spec</artifactId>
- <version>${geronimo-annotation-spec-version}</version>
- <scope>provided</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-annotation_1.0_spec</artifactId>
+ <version>${geronimo-annotation-spec-version}</version>
+ <scope>provided</scope>
+ </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kahadb-store</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-pool</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.atomikos</groupId>
- <artifactId>transactions-jta</artifactId>
- <version>${atomikos-transactions-version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>${commons-io-version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-pool</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.atomikos</groupId>
+ <artifactId>transactions-jta</artifactId>
+ <version>${atomikos-transactions-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons-io-version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <forkCount>1</forkCount>
- <reuseForks>false</reuseForks>
- <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
- <configuration>
- <filesets>
- <fileset>
- <directory>${basedir}/activemq-data</directory>
- </fileset>
- </filesets>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>${basedir}/activemq-data</directory>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 3433ec9..a7347c7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -27,7 +27,8 @@ import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.spi.HeaderFilterStrategy;
@@ -44,12 +45,13 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
private ConnectionFactory connectionFactory;
private ConnectionResource connectionResource;
private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
- private KeyFormatStrategy keyFormatStrategy = new DefaultJmsKeyFormatStrategy();
+ private JmsKeyFormatStrategy jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
private Integer connectionCount = 1;
private TransactionCommitStrategy transactionCommitStrategy;
private TimedTaskManager timedTaskManager;
private DestinationCreationStrategy destinationCreationStrategy;
private ExecutorService asyncStartStopExecutorService;
+ private MessageCreatedStrategy messageCreatedStrategy;
public SjmsComponent() {
super(SjmsEndpoint.class);
@@ -200,12 +202,12 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
* You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy
* and refer to it using the # notation.
*/
- public void setKeyFormatStrategy(KeyFormatStrategy keyFormatStrategy) {
- this.keyFormatStrategy = keyFormatStrategy;
+ public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
+ this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
}
- public KeyFormatStrategy getKeyFormatStrategy() {
- return keyFormatStrategy;
+ public JmsKeyFormatStrategy getJmsKeyFormatStrategy() {
+ return jmsKeyFormatStrategy;
}
public TransactionCommitStrategy getTransactionCommitStrategy() {
@@ -241,4 +243,17 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
public void setTimedTaskManager(TimedTaskManager timedTaskManager) {
this.timedTaskManager = timedTaskManager;
}
+
+ public MessageCreatedStrategy getMessageCreatedStrategy() {
+ return messageCreatedStrategy;
+ }
+
+ /**
+ * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
+ * objects when Camel is sending a JMS message.
+ */
+ public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
+ this.messageCreatedStrategy = messageCreatedStrategy;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 322ad5c..162e252 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -206,7 +206,7 @@ public class SjmsConsumer extends DefaultConsumer {
*/
protected MessageListener createMessageHandler(Session session) {
- TransactionCommitStrategy commitStrategy = null;
+ TransactionCommitStrategy commitStrategy;
if (getTransactionCommitStrategy() != null) {
commitStrategy = getTransactionCommitStrategy();
} else if (getTransactionBatchCount() > 0) {
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index c7ed9ac..cb5d396 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -16,22 +16,30 @@
*/
package org.apache.camel.component.sjms;
+import javax.jms.Message;
+import javax.jms.Session;
+
import org.apache.camel.Component;
import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.sjms.jms.ConnectionResource;
import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
+import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
import org.apache.camel.component.sjms.jms.DestinationNameParser;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.JmsBinding;
+import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
import org.apache.camel.component.sjms.producer.InOnlyProducer;
import org.apache.camel.component.sjms.producer.InOutProducer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
@@ -43,11 +51,13 @@ import org.slf4j.LoggerFactory;
* A JMS Endpoint
*/
@UriEndpoint(scheme = "sjms", title = "Simple JMS", syntax = "sjms:destinationType:destinationName", consumerClass = SjmsConsumer.class, label = "messaging")
-public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
+public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, HeaderFilterStrategyAware {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private boolean topic;
+ private JmsBinding binding;
+
@UriPath(enums = "queue,topic", defaultValue = "queue", description = "The kind of destination to use")
private String destinationType;
@UriPath @Metadata(required = "true")
@@ -55,6 +65,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
@UriParam(label = "consumer", defaultValue = "true")
private boolean synchronous = true;
@UriParam
+ private HeaderFilterStrategy headerFilterStrategy;
+ @UriParam
+ private boolean includeAllJMSXProperties;
+ @UriParam
private boolean transacted;
@UriParam(label = "producer")
private String namedReplyTo;
@@ -88,10 +102,16 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
private boolean prefillPool = true;
@UriParam(label = "producer", defaultValue = "true")
private boolean allowNullBody = true;
+ @UriParam(defaultValue = "true")
+ private boolean mapJmsMessage = true;
@UriParam
private TransactionCommitStrategy transactionCommitStrategy;
@UriParam
private DestinationCreationStrategy destinationCreationStrategy = new DefaultDestinationCreationStrategy();
+ @UriParam
+ private MessageCreatedStrategy messageCreatedStrategy;
+ @UriParam
+ private JmsKeyFormatStrategy jmsKeyFormatStrategy;
public SjmsEndpoint() {
}
@@ -146,6 +166,34 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
return true;
}
+ public Exchange createExchange(Message message, Session session) {
+ Exchange exchange = createExchange(getExchangePattern());
+ exchange.setIn(new SjmsMessage(message, session, getBinding()));
+ return exchange;
+ }
+
+ public JmsBinding getBinding() {
+ if (binding == null) {
+ binding = createBinding();
+ }
+ return binding;
+ }
+
+ /**
+ * Creates the {@link org.apache.camel.component.sjms.jms.JmsBinding} to use.
+ */
+ protected JmsBinding createBinding() {
+ return new JmsBinding(isMapJmsMessage(), isAllowNullBody(), getHeaderFilterStrategy(), getJmsKeyFormatStrategy(), getMessageCreatedStrategy());
+ }
+
+ /**
+ * Sets the binding used to convert from a Camel message to and from a JMS
+ * message
+ */
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
+ }
+
/**
* DestinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name.
*/
@@ -157,16 +205,35 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
return destinationName;
}
- public ConnectionResource getConnectionResource() {
- return getComponent().getConnectionResource();
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ if (headerFilterStrategy == null) {
+ headerFilterStrategy = new SjmsHeaderFilterStrategy(isIncludeAllJMSXProperties());
+ }
+ return headerFilterStrategy;
+ }
+
+ /**
+ * To use a custom HeaderFilterStrategy to filter header to and from Camel message.
+ */
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ this.headerFilterStrategy = strategy;
}
- public HeaderFilterStrategy getSjmsHeaderFilterStrategy() {
- return getComponent().getHeaderFilterStrategy();
+ public boolean isIncludeAllJMSXProperties() {
+ return includeAllJMSXProperties;
}
- public KeyFormatStrategy getJmsKeyFormatStrategy() {
- return getComponent().getKeyFormatStrategy();
+ /**
+ * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message.
+ * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc.
+ * Note: If you are using a custom headerFilterStrategy then this option does not apply.
+ */
+ public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) {
+ this.includeAllJMSXProperties = includeAllJMSXProperties;
+ }
+
+ public ConnectionResource getConnectionResource() {
+ return getComponent().getConnectionResource();
}
public boolean isSynchronous() {
@@ -417,4 +484,48 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
public void setAllowNullBody(boolean allowNullBody) {
this.allowNullBody = allowNullBody;
}
+
+ public boolean isMapJmsMessage() {
+ return mapJmsMessage;
+ }
+
+ /**
+ * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc.
+ * See section about how mapping works below for more details.
+ */
+ public void setMapJmsMessage(boolean mapJmsMessage) {
+ this.mapJmsMessage = mapJmsMessage;
+ }
+
+ public MessageCreatedStrategy getMessageCreatedStrategy() {
+ return messageCreatedStrategy;
+ }
+
+ /**
+ * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
+ * objects when Camel is sending a JMS message.
+ */
+ public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
+ this.messageCreatedStrategy = messageCreatedStrategy;
+ }
+
+ public JmsKeyFormatStrategy getJmsKeyFormatStrategy() {
+ if (jmsKeyFormatStrategy == null) {
+ jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
+ }
+ return jmsKeyFormatStrategy;
+ }
+
+ /**
+ * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.
+ * Camel provides two implementations out of the box: default and passthrough.
+ * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is.
+ * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters.
+ * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy
+ * and refer to it using the # notation.
+ */
+ public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
+ this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
index 82a8a90..0da77ca 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
@@ -24,14 +24,18 @@ import org.apache.camel.impl.DefaultHeaderFilterStrategy;
public class SjmsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
public SjmsHeaderFilterStrategy() {
- initialize();
+ this(false);
+ }
+
+ public SjmsHeaderFilterStrategy(boolean includeAllJMSXProperties) {
+ if (!includeAllJMSXProperties) {
+ initialize();
+ }
}
protected void initialize() {
- // ignore provider specified JMS extension headers see page 39 of JMS
- // 1.1 specification
- // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in
- // AqjmsMessage
+ // ignore provider specified JMS extension headers see page 39 of JMS 1.1 specification
+ // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in AqjmsMessage
getOutFilter().add("JMSXUserID");
getOutFilter().add("JMSXAppID");
getOutFilter().add("JMSXDeliveryCount");
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
new file mode 100644
index 0000000..92f8531
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.io.File;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.component.sjms.jms.JmsBinding;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a {@link org.apache.camel.Message} for working with JMS
+ *
+ * @version
+ */
+public class SjmsMessage extends DefaultMessage {
+ private static final Logger LOG = LoggerFactory.getLogger(SjmsMessage.class);
+ private Message jmsMessage;
+ private Session jmsSession;
+ private JmsBinding binding;
+
+ public SjmsMessage(Message jmsMessage, Session jmsSession, JmsBinding binding) {
+ setJmsMessage(jmsMessage);
+ setJmsSession(jmsSession);
+ setBinding(binding);
+ }
+
+ @Override
+ public String toString() {
+ // do not print jmsMessage as there could be sensitive details
+ if (jmsMessage != null) {
+ try {
+ return "SjmsMessage[JmsMessageID: " + jmsMessage.getJMSMessageID() + "]";
+ } catch (Throwable e) {
+ // ignore
+ }
+ }
+ return "SjmsMessage@" + ObjectHelper.getIdentityHashCode(this);
+ }
+
+ @Override
+ public void copyFrom(org.apache.camel.Message that) {
+ if (that == this) {
+ // the same instance so do not need to copy
+ return;
+ }
+
+ // must initialize headers before we set the JmsMessage to avoid Camel
+ // populating it before we do the copy
+ getHeaders().clear();
+
+ boolean copyMessageId = true;
+ if (that instanceof SjmsMessage) {
+ SjmsMessage thatMessage = (SjmsMessage) that;
+ this.jmsMessage = thatMessage.jmsMessage;
+ if (this.jmsMessage != null) {
+ // for performance lets not copy the messageID if we are a JMS message
+ copyMessageId = false;
+ }
+ }
+ if (copyMessageId) {
+ setMessageId(that.getMessageId());
+ }
+
+ // copy body and fault flag
+ setBody(that.getBody());
+ setFault(that.isFault());
+
+ // we have already cleared the headers
+ if (that.hasHeaders()) {
+ getHeaders().putAll(that.getHeaders());
+ }
+
+ getAttachments().clear();
+ if (that.hasAttachments()) {
+ getAttachments().putAll(that.getAttachments());
+ }
+ }
+
+ public JmsBinding getBinding() {
+ if (binding == null) {
+ binding = ExchangeHelper.getBinding(getExchange(), JmsBinding.class);
+ }
+ return binding;
+ }
+
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
+ }
+
+ /**
+ * Returns the underlying JMS message
+ */
+ public Message getJmsMessage() {
+ return jmsMessage;
+ }
+
+ public void setJmsMessage(Message jmsMessage) {
+ if (jmsMessage != null) {
+ try {
+ setMessageId(jmsMessage.getJMSMessageID());
+ } catch (JMSException e) {
+ LOG.warn("Unable to retrieve JMSMessageID from JMS Message", e);
+ }
+ }
+ this.jmsMessage = jmsMessage;
+ }
+
+ /**
+ * Returns the underlying JMS session.
+ * <p/>
+ * This may be <tt>null</tt>.
+ */
+ public Session getJmsSession() {
+ return jmsSession;
+ }
+
+ public void setJmsSession(Session jmsSession) {
+ this.jmsSession = jmsSession;
+ }
+
+ @Override
+ public void setBody(Object body) {
+ super.setBody(body);
+ if (body == null) {
+ // preserver headers even if we set body to null
+ ensureInitialHeaders();
+ // remove underlying jmsMessage since we mutated body to null
+ jmsMessage = null;
+ }
+ }
+
+ public Object getHeader(String name) {
+ Object answer = null;
+
+ // we will exclude using JMS-prefixed headers here to avoid strangeness with some JMS providers
+ // e.g. ActiveMQ returns the String not the Destination type for "JMSReplyTo"!
+ // only look in jms message directly if we have not populated headers
+ if (jmsMessage != null && !hasPopulatedHeaders() && !name.startsWith("JMS")) {
+ try {
+ // use binding to do the lookup as it has to consider using encoded keys
+ answer = getBinding().getObjectProperty(jmsMessage, name);
+ } catch (JMSException e) {
+ throw new RuntimeExchangeException("Unable to retrieve header from JMS Message: " + name, getExchange(), e);
+ }
+ }
+ // only look if we have populated headers otherwise there are no headers at all
+ // if we do lookup a header starting with JMS then force a lookup
+ if (answer == null && (hasPopulatedHeaders() || name.startsWith("JMS"))) {
+ answer = super.getHeader(name);
+ }
+ return answer;
+ }
+
+ @Override
+ public Map<String, Object> getHeaders() {
+ ensureInitialHeaders();
+ return super.getHeaders();
+ }
+
+ @Override
+ public Object removeHeader(String name) {
+ ensureInitialHeaders();
+ return super.removeHeader(name);
+ }
+
+ @Override
+ public void setHeaders(Map<String, Object> headers) {
+ ensureInitialHeaders();
+ super.setHeaders(headers);
+ }
+
+ @Override
+ public void setHeader(String name, Object value) {
+ ensureInitialHeaders();
+ super.setHeader(name, value);
+ }
+
+ @Override
+ public SjmsMessage newInstance() {
+ return new SjmsMessage(null, null, binding);
+ }
+
+ /**
+ * Returns true if a new JMS message instance should be created to send to the next component
+ */
+ public boolean shouldCreateNewMessage() {
+ return super.hasPopulatedHeaders();
+ }
+
+ /**
+ * Ensure that the headers have been populated from the underlying JMS message
+ * before we start mutating the headers
+ */
+ protected void ensureInitialHeaders() {
+ if (jmsMessage != null && !hasPopulatedHeaders()) {
+ // we have not populated headers so force this by creating
+ // new headers and set it on super
+ super.setHeaders(createHeaders());
+ }
+ }
+
+ @Override
+ protected Object createBody() {
+ if (jmsMessage != null) {
+ return getBinding().extractBodyFromJms(getExchange(), jmsMessage);
+ }
+ return null;
+ }
+
+ @Override
+ protected void populateInitialHeaders(Map<String, Object> map) {
+ if (jmsMessage != null && map != null) {
+ map.putAll(getBinding().extractHeadersFromJms(jmsMessage, getExchange()));
+ }
+ }
+
+ @Override
+ protected String createMessageId() {
+ if (jmsMessage == null) {
+ LOG.trace("No javax.jms.Message set so generating a new message id");
+ return super.createMessageId();
+ }
+ try {
+ String id = getDestinationAsString(jmsMessage.getJMSDestination()) + jmsMessage.getJMSMessageID();
+ return getSanitizedString(id);
+ } catch (JMSException e) {
+ throw new RuntimeExchangeException("Unable to retrieve JMSMessageID from JMS Message", getExchange(), e);
+ }
+ }
+
+ @Override
+ protected Boolean isTransactedRedelivered() {
+ if (jmsMessage != null) {
+ return JmsMessageHelper.getJMSRedelivered(jmsMessage);
+ } else {
+ return null;
+ }
+ }
+
+ private String getDestinationAsString(Destination destination) throws JMSException {
+ String result;
+ if (destination == null) {
+ result = "null destination!" + File.separator;
+ } else if (destination instanceof Topic) {
+ result = "topic" + File.separator + ((Topic) destination).getTopicName() + File.separator;
+ } else {
+ result = "queue" + File.separator + ((Queue) destination).getQueueName() + File.separator;
+ }
+ return result;
+ }
+
+ private String getSanitizedString(Object value) {
+ return value != null ? value.toString().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_") : "";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index ee2b250..e8ce161 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.sjms.batch;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -41,6 +42,7 @@ import org.apache.camel.Processor;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,7 +228,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
LOG.debug("Message received: {}", messageCount);
if ((message instanceof ObjectMessage)
|| (message instanceof TextMessage)) {
- Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint());
+
+ final Exchange exchange = getEndpoint().createExchange(message, session);
aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
} else {
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index b4c052f..49c74ba 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -16,38 +16,65 @@
*/
package org.apache.camel.component.sjms.batch;
+import javax.jms.Message;
+import javax.jms.Session;
+
import org.apache.camel.Component;
import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.component.sjms.SjmsHeaderFilterStrategy;
+import org.apache.camel.component.sjms.SjmsMessage;
+import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
import org.apache.camel.component.sjms.jms.DestinationNameParser;
+import org.apache.camel.component.sjms.jms.JmsBinding;
+import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
@UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName",
- consumerClass = SjmsBatchComponent.class, label = "messaging")
-public class SjmsBatchEndpoint extends DefaultEndpoint {
+ consumerClass = SjmsBatchComponent.class, label = "messaging", consumerOnly = true)
+public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
- @UriPath(label = "consumer") @Metadata(required = "true")
+ private JmsBinding binding;
+
+ @UriPath @Metadata(required = "true")
private String destinationName;
- @UriParam(label = "consumer", defaultValue = "1")
+ @UriParam(defaultValue = "1")
private int consumerCount = 1;
- @UriParam(label = "consumer", defaultValue = "200")
+ @UriParam(defaultValue = "200")
private int completionSize = DEFAULT_COMPLETION_SIZE;
- @UriParam(label = "consumer", defaultValue = "500")
+ @UriParam(defaultValue = "500")
private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
- @UriParam(label = "consumer", defaultValue = "1000")
+ @UriParam(defaultValue = "1000")
private int pollDuration = 1000;
- @UriParam(label = "consumer") @Metadata(required = "true")
+ @UriParam @Metadata(required = "true")
private AggregationStrategy aggregationStrategy;
+ @UriParam
+ private HeaderFilterStrategy headerFilterStrategy;
+ @UriParam
+ private boolean includeAllJMSXProperties;
+ @UriParam(defaultValue = "true")
+ private boolean allowNullBody = true;
+ @UriParam(defaultValue = "true")
+ private boolean mapJmsMessage = true;
+ @UriParam
+ private MessageCreatedStrategy messageCreatedStrategy;
+ @UriParam
+ private JmsKeyFormatStrategy jmsKeyFormatStrategy;
+
public SjmsBatchEndpoint() {
}
@@ -78,6 +105,34 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
return new SjmsBatchConsumer(this, processor);
}
+ public Exchange createExchange(Message message, Session session) {
+ Exchange exchange = createExchange(getExchangePattern());
+ exchange.setIn(new SjmsMessage(message, session, getBinding()));
+ return exchange;
+ }
+
+ public JmsBinding getBinding() {
+ if (binding == null) {
+ binding = createBinding();
+ }
+ return binding;
+ }
+
+ /**
+ * Creates the {@link org.apache.camel.component.sjms.jms.JmsBinding} to use.
+ */
+ protected JmsBinding createBinding() {
+ return new JmsBinding(isMapJmsMessage(), isAllowNullBody(), getHeaderFilterStrategy(), getJmsKeyFormatStrategy(), getMessageCreatedStrategy());
+ }
+
+ /**
+ * Sets the binding used to convert from a Camel message to and from a JMS
+ * message
+ */
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
+ }
+
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
@@ -141,4 +196,85 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
this.pollDuration = pollDuration;
}
+ public boolean isAllowNullBody() {
+ return allowNullBody;
+ }
+
+ /**
+ * Whether to allow sending messages with no body. If this option is false and the message body is null, then an JMSException is thrown.
+ */
+ public void setAllowNullBody(boolean allowNullBody) {
+ this.allowNullBody = allowNullBody;
+ }
+
+ public boolean isMapJmsMessage() {
+ return mapJmsMessage;
+ }
+
+ /**
+ * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc.
+ * See section about how mapping works below for more details.
+ */
+ public void setMapJmsMessage(boolean mapJmsMessage) {
+ this.mapJmsMessage = mapJmsMessage;
+ }
+
+ public MessageCreatedStrategy getMessageCreatedStrategy() {
+ return messageCreatedStrategy;
+ }
+
+ /**
+ * To use the given MessageCreatedStrategy which are invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
+ * objects when Camel is sending a JMS message.
+ */
+ public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
+ this.messageCreatedStrategy = messageCreatedStrategy;
+ }
+
+ public JmsKeyFormatStrategy getJmsKeyFormatStrategy() {
+ if (jmsKeyFormatStrategy == null) {
+ jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
+ }
+ return jmsKeyFormatStrategy;
+ }
+
+ /**
+ * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.
+ * Camel provides two implementations out of the box: default and passthrough.
+ * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is.
+ * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters.
+ * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy
+ * and refer to it using the # notation.
+ */
+ public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
+ this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
+ }
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ if (headerFilterStrategy == null) {
+ headerFilterStrategy = new SjmsHeaderFilterStrategy(isIncludeAllJMSXProperties());
+ }
+ return headerFilterStrategy;
+ }
+
+ /**
+ * To use a custom HeaderFilterStrategy to filter header to and from Camel message.
+ */
+ public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+ this.headerFilterStrategy = strategy;
+ }
+
+ public boolean isIncludeAllJMSXProperties() {
+ return includeAllJMSXProperties;
+ }
+
+ /**
+ * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message.
+ * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc.
+ * Note: If you are using a custom headerFilterStrategy then this option does not apply.
+ */
+ public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) {
+ this.includeAllJMSXProperties = includeAllJMSXProperties;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index 1598a43..f394008 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -25,8 +25,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.Synchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,8 +68,7 @@ public abstract class AbstractMessageHandler implements MessageListener {
public void onMessage(Message message) {
RuntimeCamelException rce = null;
try {
- SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint();
- final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy());
+ final Exchange exchange = getEndpoint().createExchange(message, getSession());
log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
@@ -80,10 +77,10 @@ public abstract class AbstractMessageHandler implements MessageListener {
}
try {
if (isTransacted() || isSynchronous()) {
- log.debug(" Handling synchronous message: {}", exchange.getIn().getBody());
+ log.debug("Handling synchronous message: {}", exchange.getIn().getBody());
handleMessage(exchange);
} else {
- log.debug(" Handling asynchronous message: {}", exchange.getIn().getBody());
+ log.debug("Handling asynchronous message: {}", exchange.getIn().getBody());
executor.execute(new Runnable() {
@Override
public void run() {
@@ -96,12 +93,10 @@ public abstract class AbstractMessageHandler implements MessageListener {
});
}
} catch (Exception e) {
- if (exchange != null) {
- if (exchange.getException() == null) {
- exchange.setException(e);
- } else {
- throw e;
- }
+ if (exchange.getException() == null) {
+ exchange.setException(e);
+ } else {
+ throw e;
}
}
} catch (Exception e) {
@@ -113,9 +108,6 @@ public abstract class AbstractMessageHandler implements MessageListener {
}
}
- /**
- * @param exchange
- */
public abstract void handleMessage(final Exchange exchange);
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
index 2068da5..97def12 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
@@ -32,7 +32,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.jms.JmsConstants;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.spi.Synchronization;
/**
@@ -157,7 +156,9 @@ public class InOutMessageHandler extends AbstractMessageHandler {
@Override
public void done(boolean sync) {
try {
- Message response = JmsMessageHelper.createMessage(exchange, getSession(), getEndpoint());
+ // the response can either be in OUT or IN
+ org.apache.camel.Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+ Message response = getEndpoint().getBinding().makeJmsMessage(exchange, msg.getBody(), msg.getHeaders(), getSession(), null);
response.setJMSCorrelationID(exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class));
localProducer.send(response);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
index d95d2b5..4fa3308 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
@@ -22,7 +22,7 @@ package org.apache.camel.component.sjms.jms;
* This can be used for sending keys contain package names that is common by
* Java frameworks.
*/
-public class DefaultJmsKeyFormatStrategy implements KeyFormatStrategy {
+public class DefaultJmsKeyFormatStrategy implements JmsKeyFormatStrategy {
public String encodeKey(String key) {
String answer = key.replace(".", "_DOT_");
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
new file mode 100644
index 0000000..8dc2841
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Node;
+
+import static org.apache.camel.component.sjms.jms.JmsMessageHelper.normalizeDestinationName;
+
+/**
+ * A Strategy used to convert between a Camel {@link org.apache.camel.Exchange} and {@link org.apache.camel.Message}
+ * to and from a JMS {@link javax.jms.Message}
+ */
+public class JmsBinding {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsBinding.class);
+ private final boolean mapJmsMessage;
+ private final boolean allowNullBody;
+ private final HeaderFilterStrategy headerFilterStrategy;
+ private final JmsKeyFormatStrategy jmsJmsKeyFormatStrategy;
+ private final MessageCreatedStrategy messageCreatedStrategy;
+
+ public JmsBinding(boolean mapJmsMessage, boolean allowNullBody,
+ HeaderFilterStrategy headerFilterStrategy, JmsKeyFormatStrategy jmsJmsKeyFormatStrategy,
+ MessageCreatedStrategy messageCreatedStrategy) {
+ this.mapJmsMessage = mapJmsMessage;
+ this.allowNullBody = allowNullBody;
+ this.headerFilterStrategy = headerFilterStrategy;
+ this.jmsJmsKeyFormatStrategy = jmsJmsKeyFormatStrategy;
+ this.messageCreatedStrategy = messageCreatedStrategy;
+ }
+
+ /**
+ * Extracts the body from the JMS message
+ *
+ * @param exchange the exchange
+ * @param message the message to extract its body
+ * @return the body, can be <tt>null</tt>
+ */
+ public Object extractBodyFromJms(Exchange exchange, Message message) {
+ try {
+
+ // TODO: new options to support
+
+ // is a custom message converter configured on endpoint then use it instead of doing the extraction
+ // based on message type
+/* if (endpoint != null && endpoint.getMessageConverter() != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Extracting body using a custom MessageConverter: {} from JMS message: {}", endpoint.getMessageConverter(), message);
+ }
+ return endpoint.getMessageConverter().fromMessage(message);
+ }
+*/
+ // if we are configured to not map the jms message then return it as body
+ if (!mapJmsMessage) {
+ LOG.trace("Option map JMS message is false so using JMS message as body: {}", message);
+ return message;
+ }
+
+ if (message instanceof ObjectMessage) {
+ LOG.trace("Extracting body as a ObjectMessage from JMS message: {}", message);
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ Object payload = objectMessage.getObject();
+ if (payload instanceof DefaultExchangeHolder) {
+ DefaultExchangeHolder holder = (DefaultExchangeHolder) payload;
+ DefaultExchangeHolder.unmarshal(exchange, holder);
+ return exchange.getIn().getBody();
+ } else {
+ return objectMessage.getObject();
+ }
+ } else if (message instanceof TextMessage) {
+ LOG.trace("Extracting body as a TextMessage from JMS message: {}", message);
+ TextMessage textMessage = (TextMessage)message;
+ return textMessage.getText();
+ } else if (message instanceof MapMessage) {
+ LOG.trace("Extracting body as a MapMessage from JMS message: {}", message);
+ return createMapFromMapMessage((MapMessage)message);
+ } else if (message instanceof BytesMessage) {
+ LOG.trace("Extracting body as a BytesMessage from JMS message: {}", message);
+ return createByteArrayFromBytesMessage((BytesMessage)message);
+ } else if (message instanceof StreamMessage) {
+ LOG.trace("Extracting body as a StreamMessage from JMS message: {}", message);
+ return message;
+ } else {
+ return null;
+ }
+ } catch (JMSException e) {
+ throw new RuntimeCamelException("Failed to extract body due to: " + e + ". Message: " + message, e);
+ }
+ }
+
+ public Map<String, Object> extractHeadersFromJms(Message jmsMessage, Exchange exchange) {
+ Map<String, Object> map = new HashMap<String, Object>();
+ if (jmsMessage != null) {
+ // lets populate the standard JMS message headers
+ try {
+ map.put("JMSCorrelationID", jmsMessage.getJMSCorrelationID());
+ map.put("JMSCorrelationIDAsBytes", JmsMessageHelper.getJMSCorrelationIDAsBytes(jmsMessage));
+ map.put("JMSDeliveryMode", jmsMessage.getJMSDeliveryMode());
+ map.put("JMSDestination", jmsMessage.getJMSDestination());
+ map.put("JMSExpiration", jmsMessage.getJMSExpiration());
+ map.put("JMSMessageID", jmsMessage.getJMSMessageID());
+ map.put("JMSPriority", jmsMessage.getJMSPriority());
+ map.put("JMSRedelivered", jmsMessage.getJMSRedelivered());
+ map.put("JMSTimestamp", jmsMessage.getJMSTimestamp());
+
+ map.put("JMSReplyTo", JmsMessageHelper.getJMSReplyTo(jmsMessage));
+ map.put("JMSType", JmsMessageHelper.getJMSType(jmsMessage));
+
+ // this works around a bug in the ActiveMQ property handling
+ map.put(JmsConstants.JMSX_GROUP_ID, JmsMessageHelper.getStringProperty(jmsMessage, JmsConstants.JMSX_GROUP_ID));
+ map.put("JMSXUserID", JmsMessageHelper.getStringProperty(jmsMessage, "JMSXUserID"));
+ } catch (JMSException e) {
+ throw new RuntimeCamelException(e);
+ }
+
+ Enumeration<?> names;
+ try {
+ names = jmsMessage.getPropertyNames();
+ } catch (JMSException e) {
+ throw new RuntimeCamelException(e);
+ }
+ while (names.hasMoreElements()) {
+ String name = names.nextElement().toString();
+ try {
+ Object value = JmsMessageHelper.getProperty(jmsMessage, name);
+ if (headerFilterStrategy != null
+ && headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
+ continue;
+ }
+
+ // must decode back from safe JMS header name to original header name
+ // when storing on this Camel JmsMessage object.
+ String key = jmsJmsKeyFormatStrategy.decodeKey(name);
+ map.put(key, value);
+ } catch (JMSException e) {
+ throw new RuntimeCamelException(name, e);
+ }
+ }
+ }
+
+ return map;
+ }
+
+ public Object getObjectProperty(Message jmsMessage, String name) throws JMSException {
+ // try a direct lookup first
+ Object answer = jmsMessage.getObjectProperty(name);
+ if (answer == null) {
+ // then encode the key and do another lookup
+ String key = jmsJmsKeyFormatStrategy.encodeKey(name);
+ answer = jmsMessage.getObjectProperty(key);
+ }
+ return answer;
+ }
+
+ protected byte[] createByteArrayFromBytesMessage(BytesMessage message) throws JMSException {
+ if (message.getBodyLength() > Integer.MAX_VALUE) {
+ LOG.warn("Length of BytesMessage is too long: {}", message.getBodyLength());
+ return null;
+ }
+ byte[] result = new byte[(int)message.getBodyLength()];
+ message.readBytes(result);
+ return result;
+ }
+
+ /**
+ * Creates a JMS message from the Camel exchange and message
+ *
+ * @param exchange the current exchange
+ * @param session the JMS session used to create the message
+ * @return a newly created JMS Message instance containing the
+ * @throws JMSException if the message could not be created
+ */
+ public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
+ Message answer = makeJmsMessage(exchange, exchange.getIn().getBody(), exchange.getIn().getHeaders(), session, null);
+ if (answer != null && messageCreatedStrategy != null) {
+ messageCreatedStrategy.onMessageCreated(answer, session, exchange, null);
+ }
+ return answer;
+ }
+
+ /**
+ * Creates a JMS message from the Camel exchange and message
+ *
+ * @param exchange the current exchange
+ * @param body the message body
+ * @param headers the message headers
+ * @param session the JMS session used to create the message
+ * @param cause optional exception occurred that should be sent as reply instead of a regular body
+ * @return a newly created JMS Message instance containing the
+ * @throws JMSException if the message could not be created
+ */
+ public Message makeJmsMessage(Exchange exchange, Object body, Map headers, Session session, Exception cause) throws JMSException {
+ Message answer = null;
+
+ // TODO: look at supporting some of these options
+
+/* boolean alwaysCopy = endpoint != null && endpoint.getConfiguration().isAlwaysCopyMessage();
+ boolean force = endpoint != null && endpoint.getConfiguration().isForceSendOriginalMessage();
+ if (!alwaysCopy && camelMessage instanceof JmsMessage) {
+ JmsMessage jmsMessage = (JmsMessage)camelMessage;
+ if (!jmsMessage.shouldCreateNewMessage() || force) {
+ answer = jmsMessage.getJmsMessage();
+
+ if (!force) {
+ // answer must match endpoint type
+ JmsMessageType type = endpoint != null ? endpoint.getConfiguration().getJmsMessageType() : null;
+ if (type != null && answer != null) {
+ if (type == JmsMessageType.Text) {
+ answer = answer instanceof TextMessage ? answer : null;
+ } else if (type == JmsMessageType.Bytes) {
+ answer = answer instanceof BytesMessage ? answer : null;
+ } else if (type == JmsMessageType.Map) {
+ answer = answer instanceof MapMessage ? answer : null;
+ } else if (type == JmsMessageType.Object) {
+ answer = answer instanceof ObjectMessage ? answer : null;
+ } else if (type == JmsMessageType.Stream) {
+ answer = answer instanceof StreamMessage ? answer : null;
+ }
+ }
+ }
+ }
+ }
+*/
+
+ if (answer == null) {
+ if (cause != null) {
+ // an exception occurred so send it as response
+ LOG.debug("Will create JmsMessage with caused exception: {}", cause);
+ // create jms message containing the caused exception
+ answer = createJmsMessage(cause, session);
+ } else {
+ // create regular jms message using the camel message body
+ answer = createJmsMessage(exchange, body, headers, session, exchange.getContext());
+ appendJmsProperties(answer, exchange, headers);
+ }
+ }
+
+ if (answer != null && messageCreatedStrategy != null) {
+ messageCreatedStrategy.onMessageCreated(answer, session, exchange, null);
+ }
+ return answer;
+ }
+
+ /**
+ * Appends the JMS headers from the Camel {@link Message}
+ */
+ public void appendJmsProperties(Message jmsMessage, Exchange exchange, Map<String, Object> headers) throws JMSException {
+ if (headers != null) {
+ Set<Map.Entry<String, Object>> entries = headers.entrySet();
+ for (Map.Entry<String, Object> entry : entries) {
+ String headerName = entry.getKey();
+ Object headerValue = entry.getValue();
+ appendJmsProperty(jmsMessage, exchange, headerName, headerValue);
+ }
+ }
+ }
+
+ public void appendJmsProperty(Message jmsMessage, Exchange exchange, String headerName, Object headerValue) throws JMSException {
+ if (isStandardJMSHeader(headerName)) {
+ if (headerName.equals("JMSCorrelationID")) {
+ jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+ } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
+ if (headerValue instanceof String) {
+ // if the value is a String we must normalize it first, and must include the prefix
+ // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+ headerValue = normalizeDestinationName((String) headerValue, true);
+ }
+ Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
+ JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
+ } else if (headerName.equals("JMSType")) {
+ jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+ } else if (headerName.equals("JMSPriority")) {
+ jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
+ } else if (headerName.equals("JMSDeliveryMode")) {
+ JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+ } else if (headerName.equals("JMSExpiration")) {
+ jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
+ } else {
+ // The following properties are set by the MessageProducer:
+ // JMSDestination
+ // The following are set on the underlying JMS provider:
+ // JMSMessageID, JMSTimestamp, JMSRedelivered
+ // log at trace level to not spam log
+ LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+ }
+ } else if (shouldOutputHeader(headerName, headerValue, exchange)) {
+ // only primitive headers and strings is allowed as properties
+ // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
+ Object value = getValidJMSHeaderValue(headerName, headerValue);
+ if (value != null) {
+ // must encode to safe JMS header name before setting property on jmsMessage
+ String key = jmsJmsKeyFormatStrategy.encodeKey(headerName);
+ // set the property
+ JmsMessageHelper.setProperty(jmsMessage, key, value);
+ } else if (LOG.isDebugEnabled()) {
+ // okay the value is not a primitive or string so we cannot sent it over the wire
+ LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}",
+ new Object[]{headerName, headerValue.getClass().getName(), headerValue});
+ }
+ }
+ }
+
+ /**
+ * Is the given header a standard JMS header
+ * @param headerName the header name
+ * @return <tt>true</tt> if its a standard JMS header
+ */
+ protected boolean isStandardJMSHeader(String headerName) {
+ if (!headerName.startsWith("JMS")) {
+ return false;
+ }
+ if (headerName.startsWith("JMSX")) {
+ return false;
+ }
+ // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM)
+ if (headerName.startsWith("JMS_")) {
+ return false;
+ }
+
+ // the 4th char must be a letter to be a standard JMS header
+ if (headerName.length() > 3) {
+ Character fourth = headerName.charAt(3);
+ if (Character.isLetter(fourth)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Strategy to test if the given header is valid according to the JMS spec to be set as a property
+ * on the JMS message.
+ * <p/>
+ * This default implementation will allow:
+ * <ul>
+ * <li>any primitives and their counter Objects (Integer, Double etc.)</li>
+ * <li>String and any other literals, Character, CharSequence</li>
+ * <li>Boolean</li>
+ * <li>Number</li>
+ * <li>java.util.Date</li>
+ * </ul>
+ *
+ * @param headerName the header name
+ * @param headerValue the header value
+ * @return the value to use, <tt>null</tt> to ignore this header
+ */
+ protected Object getValidJMSHeaderValue(String headerName, Object headerValue) {
+ if (headerValue instanceof String) {
+ return headerValue;
+ } else if (headerValue instanceof BigInteger) {
+ return headerValue.toString();
+ } else if (headerValue instanceof BigDecimal) {
+ return headerValue.toString();
+ } else if (headerValue instanceof Number) {
+ return headerValue;
+ } else if (headerValue instanceof Character) {
+ return headerValue;
+ } else if (headerValue instanceof CharSequence) {
+ return headerValue.toString();
+ } else if (headerValue instanceof Boolean) {
+ return headerValue;
+ } else if (headerValue instanceof Date) {
+ return headerValue.toString();
+ }
+ return null;
+ }
+
+ protected Message createJmsMessage(Exception cause, Session session) throws JMSException {
+ LOG.trace("Using JmsMessageType: {}", JmsMessageType.Object);
+ Message answer = session.createObjectMessage(cause);
+ // ensure default delivery mode is used by default
+ answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+ return answer;
+ }
+
+ protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException {
+ JmsMessageType type = null;
+
+ // TODO: support some of these options?
+
+/* // special for transferExchange
+ if (endpoint != null && endpoint.isTransferExchange()) {
+ LOG.trace("Option transferExchange=true so we use JmsMessageType: Object");
+ Serializable holder = DefaultExchangeHolder.marshal(exchange);
+ Message answer = session.createObjectMessage(holder);
+ // ensure default delivery mode is used by default
+ answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+ return answer;
+ }
+
+ // use a custom message converter
+ if (endpoint != null && endpoint.getMessageConverter() != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Creating JmsMessage using a custom MessageConverter: {} with body: {}", endpoint.getMessageConverter(), body);
+ }
+ return endpoint.getMessageConverter().toMessage(body, session);
+ }
+*/
+ // check if header have a type set, if so we force to use it
+/*
+ if (headers.containsKey(JmsConstants.JMS_MESSAGE_TYPE)) {
+ type = context.getTypeConverter().convertTo(JmsMessageType.class, headers.get(JmsConstants.JMS_MESSAGE_TYPE));
+ } else if (endpoint != null && endpoint.getConfiguration().getJmsMessageType() != null) {
+ // force a specific type from the endpoint configuration
+ type = endpoint.getConfiguration().getJmsMessageType();
+ } else {
+*/ type = getJMSMessageTypeForBody(exchange, body, headers, session, context);
+ //}
+
+ // create the JmsMessage based on the type
+ if (type != null) {
+ if (body == null && !allowNullBody) {
+ throw new JMSException("Cannot send message as message body is null, and option allowNullBody is false.");
+ }
+ LOG.trace("Using JmsMessageType: {}", type);
+ Message answer = createJmsMessageForType(exchange, body, headers, session, context, type);
+ // ensure default delivery mode is used by default
+ answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+ return answer;
+ }
+
+ // check for null body
+ if (body == null && !allowNullBody) {
+ throw new JMSException("Cannot send message as message body is null, and option allowNullBody is false.");
+ }
+
+ // warn if the body could not be mapped
+ if (body != null && LOG.isWarnEnabled()) {
+ LOG.warn("Cannot determine specific JmsMessage type to use from body class."
+ + " Will use generic JmsMessage."
+ + " Body class: " + ObjectHelper.classCanonicalName(body)
+ + ". If you want to send a POJO then your class might need to implement java.io.Serializable"
+ + ", or you can force a specific type by setting the jmsMessageType option on the JMS endpoint.");
+ }
+
+ // return a default message
+ Message answer = session.createMessage();
+ // ensure default delivery mode is used by default
+ answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+ return answer;
+ }
+
+ /**
+ * Return the {@link JmsMessageType}
+ *
+ * @return type or null if no mapping was possible
+ */
+ protected JmsMessageType getJMSMessageTypeForBody(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) {
+ JmsMessageType type = null;
+ // let body determine the type
+ if (body instanceof Node || body instanceof String) {
+ type = JmsMessageType.Text;
+ } else if (body instanceof byte[] || body instanceof WrappedFile || body instanceof File || body instanceof Reader
+ || body instanceof InputStream || body instanceof ByteBuffer || body instanceof StreamCache) {
+ type = JmsMessageType.Bytes;
+ } else if (body instanceof Map) {
+ type = JmsMessageType.Map;
+ } else if (body instanceof Serializable) {
+ type = JmsMessageType.Object;
+ } else if (exchange.getContext().getTypeConverter().tryConvertTo(File.class, body) != null
+ || exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) != null) {
+ type = JmsMessageType.Bytes;
+ }
+ return type;
+ }
+
+ /**
+ *
+ * Create the {@link Message}
+ *
+ * @return jmsMessage or null if the mapping was not successfully
+ */
+ protected Message createJmsMessageForType(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context, JmsMessageType type) throws JMSException {
+ switch (type) {
+ case Text: {
+ TextMessage message = session.createTextMessage();
+ if (body != null) {
+ String payload = context.getTypeConverter().convertTo(String.class, exchange, body);
+ message.setText(payload);
+ }
+ return message;
+ }
+ case Bytes: {
+ BytesMessage message = session.createBytesMessage();
+ if (body != null) {
+ byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body);
+ message.writeBytes(payload);
+ }
+ return message;
+ }
+ case Map: {
+ MapMessage message = session.createMapMessage();
+ if (body != null) {
+ Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
+ populateMapMessage(message, payload, context);
+ }
+ return message;
+ }
+ case Object:
+ ObjectMessage message = session.createObjectMessage();
+ if (body != null) {
+ try {
+ Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body);
+ message.setObject(payload);
+ } catch (NoTypeConversionAvailableException e) {
+ // cannot convert to serializable then thrown an exception to avoid sending a null message
+ JMSException cause = new MessageFormatException(e.getMessage());
+ cause.initCause(e);
+ throw cause;
+ }
+ }
+ return message;
+ default:
+ break;
+ }
+ return null;
+ }
+ /**
+ * Populates a {@link MapMessage} from a {@link Map} instance.
+ */
+ protected void populateMapMessage(MapMessage message, Map<?, ?> map, CamelContext context)
+ throws JMSException {
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ String keyString = CamelContextHelper.convertTo(context, String.class, entry.getKey());
+ if (keyString != null) {
+ message.setObject(keyString, entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * Extracts a {@link Map} from a {@link MapMessage}
+ */
+ public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException {
+ Map<String, Object> answer = new HashMap<String, Object>();
+ Enumeration<?> names = message.getMapNames();
+ while (names.hasMoreElements()) {
+ String name = names.nextElement().toString();
+ Object value = message.getObject(name);
+ answer.put(name, value);
+ }
+ return answer;
+ }
+
+ /**
+ * Strategy to allow filtering of headers which are put on the JMS message
+ * <p/>
+ * <b>Note</b>: Currently only supports sending java identifiers as keys
+ */
+ protected boolean shouldOutputHeader(String headerName, Object headerValue, Exchange exchange) {
+ return headerFilterStrategy == null
+ || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java
new file mode 100644
index 0000000..5a0327e
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * Strategy for applying encoding and decoding of JMS headers so they apply to
+ * the JMS spec.
+ */
+public interface JmsKeyFormatStrategy {
+
+ /**
+ * Encodes the key before its sent as a {@link javax.jms.Message} message.
+ *
+ * @param key the original key
+ * @return the encoded key
+ */
+ String encodeKey(String key);
+
+ /**
+ * Decodes the key after its received from a {@link javax.jms.Message}
+ * message.
+ *
+ * @param key the encoded key
+ * @return the decoded key as the original key
+ */
+ String decodeKey(String key);
+}