You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/07 11:38:50 UTC

[1/5] camel git commit: CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.

Repository: camel
Updated Branches:
  refs/heads/master eef0c490d -> 25103bf65


CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b1d8da9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b1d8da9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b1d8da9

Branch: refs/heads/master
Commit: 5b1d8da9b9b537615e764ebb4f0f8298e83421a8
Parents: eef0c49
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Sep 7 09:31:30 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Sep 7 09:31:30 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/jms/JmsConstants.java  |   5 +
 .../component/sjms/jms/JmsMessageHelper.java    | 339 ++++++++++++-------
 .../component/sjms/producer/InOnlyProducer.java |   8 +-
 3 files changed, 227 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
index 08e0339..ccf1f96 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
@@ -21,6 +21,11 @@ package org.apache.camel.component.sjms.jms;
  */
 public interface JmsConstants {
 
+    String QUEUE_PREFIX = "queue:";
+    String TOPIC_PREFIX = "topic:";
+    String TEMP_QUEUE_PREFIX = "temp:queue:";
+    String TEMP_TOPIC_PREFIX = "temp:topic:";
+
     /**
      * Set by the publishing Client
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index 79787c9..0a93f16 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -20,10 +20,13 @@ import java.io.File;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -47,16 +50,20 @@ import org.apache.camel.TypeConverter;
 import org.apache.camel.component.sjms.SjmsConstants;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
 /**
  * Utility class for {@link javax.jms.Message}.
  */
 public final class JmsMessageHelper implements JmsConstants {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageHelper.class);
 
     private JmsMessageHelper() {
     }
@@ -98,7 +105,7 @@ public final class JmsMessageHelper implements JmsConstants {
                 case Bytes:
                     BytesMessage bytesMessage = (BytesMessage) message;
                     if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
-                        LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
+                        LOG.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
                         return null;
                     }
                     byte[] result = new byte[(int) bytesMessage.getBodyLength()];
@@ -164,16 +171,17 @@ public final class JmsMessageHelper implements JmsConstants {
             bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders());
         }
 
-        answer = createMessage(session, body, bodyHeaders, endpoint);
+        answer = createMessage(exchange, session, body, bodyHeaders, endpoint);
         return answer;
     }
 
-    public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception {
-        return createMessage(session, payload, messageHeaders, endpoint.isAllowNullBody(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter());
+    public static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception {
+        return createMessage(exchange, session, payload, messageHeaders, endpoint.isAllowNullBody(),
+                endpoint.getSjmsHeaderFilterStrategy(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter());
     }
 
-    private static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody,
-                                         KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception {
+    private static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody,
+                                         HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception {
         Message answer = null;
         JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload);
 
@@ -224,113 +232,134 @@ public final class JmsMessageHelper implements JmsConstants {
             break;
         }
 
-        if (messageHeaders != null && !messageHeaders.isEmpty()) {
-            answer = JmsMessageHelper.setJmsMessageHeaders(answer, messageHeaders, keyFormatStrategy);
-        }
+        appendJmsProperties(answer, exchange, exchange.getIn(), headerFilterStrategy, keyFormatStrategy);
         return answer;
     }
 
     /**
-     * Adds or updates the {@link Message} headers. Header names and values are
-     * checked for JMS 1.1 compliance.
-     *
-     * @param jmsMessage        the {@link Message} to add or update the headers on
-     * @param messageHeaders    a {@link Map} of String/Object pairs
-     * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
-     *                          format keys in a JMS 1.1 compliant manner.
-     * @return {@link Message}
+     * Appends the JMS headers from the Camel {@link Message}
      */
-    private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
-
-        Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
-        for (final Map.Entry<String, Object> entry : headers.entrySet()) {
+    private static void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in,
+                                            HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException {
+        Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
             String headerName = entry.getKey();
             Object headerValue = entry.getValue();
+            appendJmsProperty(jmsMessage, exchange, in, headerName, headerValue, headerFilterStrategy, keyFormatStrategy);
+        }
+    }
 
-            if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) {
-                if (headerValue == null) {
-                    // Value can be null but we can't cast a null to a String
-                    // so pass null to the setter
-                    setCorrelationId(jmsMessage, null);
-                } else if (headerValue instanceof String) {
-                    setCorrelationId(jmsMessage, (String) headerValue);
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_CORRELATION_ID + " must either be a String or null.  Found: " + headerValue.getClass().getName());
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_REPLY_TO)) {
+    private static void appendJmsProperty(Message jmsMessage, Exchange exchange, org.apache.camel.Message in,
+                                  String headerName, Object headerValue,
+                                  HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException {
+        if (isStandardJMSHeader(headerName)) {
+            if (headerName.equals("JMSCorrelationID")) {
+                jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+            } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
                 if (headerValue instanceof String) {
-                    // FIXME Setting the reply to appears broken. walk back
-                    // through it. If the value is a String we must normalize it
-                    // first
-                } else {
-                    // TODO write destination converter
-                    // Destination replyTo =
-                    // ExchangeHelper.convertToType(exchange,
-                    // Destination.class,
-                    // headerValue);
-                    // jmsMessage.setJMSReplyTo(replyTo);
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_TYPE)) {
-                if (headerValue == null) {
-                    // Value can be null but we can't cast a null to a String
-                    // so pass null to the setter
-                    setMessageType(jmsMessage, null);
-                } else if (headerValue instanceof String) {
-                    // Not null but is a String
-                    setMessageType(jmsMessage, (String) headerValue);
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_TYPE + " must either be a String or null.  Found: " + headerValue.getClass().getName());
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_PRIORITY)) {
-                if (headerValue instanceof Integer) {
-                    try {
-                        jmsMessage.setJMSPriority((Integer) headerValue);
-                    } catch (JMSException e) {
-                        throw new IllegalHeaderException("Failed to set the " + JMS_PRIORITY + " header. Cause: " + e.getLocalizedMessage(), e);
-                    }
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_PRIORITY + " must be a Integer.  Type found: " + headerValue.getClass().getName());
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_DELIVERY_MODE)) {
-                try {
-                    JmsMessageHelper.setJMSDeliveryMode(jmsMessage, headerValue);
-                } catch (JMSException e) {
-                    throw new IllegalHeaderException("Failed to set the " + JMS_DELIVERY_MODE + " header. Cause: " + e.getLocalizedMessage(), e);
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_EXPIRATION)) {
-                if (headerValue instanceof Long) {
-                    try {
-                        jmsMessage.setJMSExpiration((Long) headerValue);
-                    } catch (JMSException e) {
-                        throw new IllegalHeaderException("Failed to set the " + JMS_EXPIRATION + " header. Cause: " + e.getLocalizedMessage(), e);
-                    }
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_EXPIRATION + " must be a Long.  Type found: " + headerValue.getClass().getName());
+                    // if the value is a String we must normalize it first, and must include the prefix
+                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+                    headerValue = normalizeDestinationName((String) headerValue, true);
                 }
+                Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
+                JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
+            } else if (headerName.equals("JMSType")) {
+                jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+            } else if (headerName.equals("JMSPriority")) {
+                jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
+            } else if (headerName.equals("JMSDeliveryMode")) {
+                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+            } else if (headerName.equals("JMSExpiration")) {
+                jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
             } else {
-                LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
-                if (headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || headerName.equalsIgnoreCase(JMS_TIMESTAMP)
-                        || headerName.equalsIgnoreCase(JMS_REDELIVERED)) {
-                    // The following properties are set by the
-                    // MessageProducer:
-                    // JMSDestination
-                    // The following are set on the underlying JMS provider:
-                    // JMSMessageID, JMSTimestamp, JMSRedelivered
-                    // log at trace level to not spam log
-                    LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
-                } else {
-                    if (!(headerValue instanceof JmsMessageType)) {
-                        String encodedName = keyFormatStrategy.encodeKey(headerName);
-                        try {
-                            JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
-                        } catch (JMSException e) {
-                            throw new IllegalHeaderException("Failed to set the header " + encodedName + " header. Cause: " + e.getLocalizedMessage(), e);
-                        }
-                    }
-                }
+                // The following properties are set by the MessageProducer:
+                // JMSDestination
+                // The following are set on the underlying JMS provider:
+                // JMSMessageID, JMSTimestamp, JMSRedelivered
+                // log at trace level to not spam log
+                LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+            }
+        } else if (shouldOutputHeader(in, headerName, headerValue, exchange, headerFilterStrategy)) {
+            // only primitive headers and strings are allowed as properties
+            // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
+            Object value = getValidJMSHeaderValue(headerName, headerValue);
+            if (value != null) {
+                // must encode to safe JMS header name before setting property on jmsMessage
+                String key = keyFormatStrategy.encodeKey(headerName);
+                // set the property
+                JmsMessageHelper.setProperty(jmsMessage, key, value);
+            } else if (LOG.isDebugEnabled()) {
+                // okay the value is not a primitive or string so we cannot send it over the wire
+                LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}",
+                        new Object[]{headerName, headerValue.getClass().getName(), headerValue});
+            }
+        }
+    }
+
+    /**
+     * Is the given header a standard JMS header
+     * @param headerName the header name
+     * @return <tt>true</tt> if it's a standard JMS header
+     */
+    protected static boolean isStandardJMSHeader(String headerName) {
+        if (!headerName.startsWith("JMS")) {
+            return false;
+        }
+        if (headerName.startsWith("JMSX")) {
+            return false;
+        }
+        // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM)
+        if (headerName.startsWith("JMS_")) {
+            return false;
+        }
+
+        // the 4th char must be a letter to be a standard JMS header
+        if (headerName.length() > 3) {
+            Character fourth = headerName.charAt(3);
+            if (Character.isLetter(fourth)) {
+                return true;
             }
         }
-        return jmsMessage;
+
+        return false;
+    }
+
+    /**
+     * Strategy to test if the given header is valid according to the JMS spec to be set as a property
+     * on the JMS message.
+     * <p/>
+     * This default implementation will allow:
+     * <ul>
+     *   <li>any primitives and their wrapper objects (Integer, Double etc.)</li>
+     *   <li>String and any other literals, Character, CharSequence</li>
+     *   <li>Boolean</li>
+     *   <li>Number</li>
+     *   <li>java.util.Date</li>
+     * </ul>
+     *
+     * @param headerName   the header name
+     * @param headerValue  the header value
+     * @return  the value to use, <tt>null</tt> to ignore this header
+     */
+    protected static Object getValidJMSHeaderValue(String headerName, Object headerValue) {
+        if (headerValue instanceof String) {
+            return headerValue;
+        } else if (headerValue instanceof BigInteger) {
+            return headerValue.toString();
+        } else if (headerValue instanceof BigDecimal) {
+            return headerValue.toString();
+        } else if (headerValue instanceof Number) {
+            return headerValue;
+        } else if (headerValue instanceof Character) {
+            return headerValue;
+        } else if (headerValue instanceof CharSequence) {
+            return headerValue.toString();
+        } else if (headerValue instanceof Boolean) {
+            return headerValue;
+        } else if (headerValue instanceof Date) {
+            return headerValue.toString();
+        }
+        return null;
     }
 
     @SuppressWarnings("unchecked")
@@ -375,6 +404,17 @@ public final class JmsMessageHelper implements JmsConstants {
     }
 
     /**
+     * Strategy to allow filtering of headers which are put on the JMS message
+     * <p/>
+     * <b>Note</b>: Currently only supports sending java identifiers as keys
+     */
+    protected static boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName,
+                                               Object headerValue, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+        return headerFilterStrategy == null
+                || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange);
+    }
+
+    /**
      * Gets the JMSType from the message.
      *
      * @param message the message
@@ -393,41 +433,41 @@ public final class JmsMessageHelper implements JmsConstants {
     /**
      * Sets the JMSDeliveryMode on the message.
      *
-     * @param message      the message
-     * @param deliveryMode the delivery mode, either as a String or integer
+     * @param exchange the exchange
+     * @param message  the message
+     * @param deliveryMode  the delivery mode, either as a String or integer
      * @throws javax.jms.JMSException is thrown if error setting the delivery mode
      */
-    public static void setJMSDeliveryMode(Message message, Object deliveryMode) throws JMSException {
-        Integer mode;
+    public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException {
+        Integer mode = null;
 
         if (deliveryMode instanceof String) {
             String s = (String) deliveryMode;
-            if (JMS_DELIVERY_MODE_PERSISTENT.equalsIgnoreCase(s)) {
+            if ("PERSISTENT".equalsIgnoreCase(s)) {
                 mode = DeliveryMode.PERSISTENT;
-            } else if (JMS_DELIVERY_MODE_NON_PERSISTENT.equalsIgnoreCase(s)) {
+            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
                 mode = DeliveryMode.NON_PERSISTENT;
             } else {
                 // it may be a number in the String so try that
-                Integer value = null;
-                try {
-                    value = Integer.valueOf(s);
-                } catch (NumberFormatException e) {
-                    // Do nothing. The error handler below is sufficient
-                }
+                Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
                 if (value != null) {
                     mode = value;
                 } else {
                     throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
                 }
             }
-        } else if (deliveryMode instanceof Integer) {
-            // fallback and try to convert to a number
-            mode = (Integer) deliveryMode;
         } else {
-            throw new IllegalArgumentException("Unable to convert the given delivery mode of type " + deliveryMode.getClass().getName() + " with value: " + deliveryMode);
+            // fallback and try to convert to a number
+            Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
+            if (value != null) {
+                mode = value;
+            }
         }
 
-        message.setJMSDeliveryMode(mode);
+        if (mode != null) {
+            message.setJMSDeliveryMode(mode);
+            message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
+        }
     }
 
     /**
@@ -442,7 +482,7 @@ public final class JmsMessageHelper implements JmsConstants {
         try {
             message.setJMSType(type);
         } catch (JMSException e) {
-            LOGGER.debug("Error setting the message type: {}", type, e);
+            LOG.debug("Error setting the message type: {}", type, e);
         }
     }
 
@@ -458,7 +498,7 @@ public final class JmsMessageHelper implements JmsConstants {
         try {
             message.setJMSCorrelationID(correlationId);
         } catch (JMSException e) {
-            LOGGER.debug("Error setting the correlationId: {}", correlationId, e);
+            LOG.debug("Error setting the correlationId: {}", correlationId, e);
         }
     }
 
@@ -472,7 +512,7 @@ public final class JmsMessageHelper implements JmsConstants {
         try {
             message.setJMSReplyTo(replyTo);
         } catch (Exception e) {
-            LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
+            LOG.debug("Error setting the correlationId: {}", replyTo.toString());
         }
     }
 
@@ -589,4 +629,61 @@ public final class JmsMessageHelper implements JmsConstants {
         return key == null || key.isEmpty() || key.contains(".") || key.contains("-");
     }
 
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination) {
+        // do not include prefix which is the current behavior when using this method.
+        return normalizeDestinationName(destination, false);
+    }
+
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination, boolean includePrefix) {
+        if (ObjectHelper.isEmpty(destination)) {
+            return destination;
+        }
+        if (destination.startsWith(QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = TEMP_TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else {
+            return destination;
+        }
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 71f5770..3fa23b0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -93,19 +93,19 @@ public class InOnlyProducer extends SjmsProducer {
                         Message message;
                         if (BatchMessage.class.isInstance(object)) {
                             BatchMessage<?> batchMessage = (BatchMessage<?>) object;
-                            message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
+                            message = JmsMessageHelper.createMessage(exchange, producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
                         } else {
-                            message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint());
+                            message = JmsMessageHelper.createMessage(exchange, producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint());
                         }
                         messages.add(message);
                     }
                 } else {
                     Object payload = exchange.getIn().getBody();
-                    Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint());
+                    Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint());
                     messages.add(message);
                 }
             } else {
-                Message message = JmsMessageHelper.createMessage(producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint());
+                Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint());
                 messages.add(message);
             }
 


[5/5] camel git commit: CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.

Posted by da...@apache.org.
CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/25103bf6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/25103bf6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/25103bf6

Branch: refs/heads/master
Commit: 25103bf65d15da2eafe147299f563385c33ff526
Parents: 93bf668
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Sep 7 11:24:54 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Sep 7 11:24:54 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/jms/JmsBinding.java    | 78 ++++++++++----------
 .../component/sjms/producer/InOutProducer.java  | 41 ++++------
 2 files changed, 55 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/25103bf6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
index 8dc2841..773813c 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
@@ -39,6 +39,8 @@ import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
+import org.w3c.dom.Node;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
@@ -52,7 +54,6 @@ import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.w3c.dom.Node;
 
 import static org.apache.camel.component.sjms.jms.JmsMessageHelper.normalizeDestinationName;
 
@@ -455,7 +456,8 @@ public class JmsBinding {
             // force a specific type from the endpoint configuration
             type = endpoint.getConfiguration().getJmsMessageType();
         } else {
-*/            type = getJMSMessageTypeForBody(exchange, body, headers, session, context);
+*/
+        type = getJMSMessageTypeForBody(exchange, body, headers, session, context);
         //}
 
         // create the JmsMessage based on the type
@@ -523,46 +525,46 @@ public class JmsBinding {
      */
     protected Message createJmsMessageForType(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context, JmsMessageType type) throws JMSException {
         switch (type) {
-            case Text: {
-                TextMessage message = session.createTextMessage();
-                if (body != null) {
-                    String payload = context.getTypeConverter().convertTo(String.class, exchange, body);
-                    message.setText(payload);
-                }
-                return message;
+        case Text: {
+            TextMessage message = session.createTextMessage();
+            if (body != null) {
+                String payload = context.getTypeConverter().convertTo(String.class, exchange, body);
+                message.setText(payload);
             }
-            case Bytes: {
-                BytesMessage message = session.createBytesMessage();
-                if (body != null) {
-                    byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body);
-                    message.writeBytes(payload);
-                }
-                return message;
+            return message;
+        }
+        case Bytes: {
+            BytesMessage message = session.createBytesMessage();
+            if (body != null) {
+                byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body);
+                message.writeBytes(payload);
             }
-            case Map: {
-                MapMessage message = session.createMapMessage();
-                if (body != null) {
-                    Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
-                    populateMapMessage(message, payload, context);
-                }
-                return message;
+            return message;
+        }
+        case Map: {
+            MapMessage message = session.createMapMessage();
+            if (body != null) {
+                Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
+                populateMapMessage(message, payload, context);
             }
-            case Object:
-                ObjectMessage message = session.createObjectMessage();
-                if (body != null) {
-                    try {
-                        Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body);
-                        message.setObject(payload);
-                    } catch (NoTypeConversionAvailableException e) {
-                        // cannot convert to serializable then thrown an exception to avoid sending a null message
-                        JMSException cause = new MessageFormatException(e.getMessage());
-                        cause.initCause(e);
-                        throw cause;
-                    }
+            return message;
+        }
+        case Object:
+            ObjectMessage message = session.createObjectMessage();
+            if (body != null) {
+                try {
+                    Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body);
+                    message.setObject(payload);
+                } catch (NoTypeConversionAvailableException e) {
+                    // cannot convert to serializable then thrown an exception to avoid sending a null message
+                    JMSException cause = new MessageFormatException(e.getMessage());
+                    cause.initCause(e);
+                    throw cause;
                 }
-                return message;
-            default:
-                break;
+            }
+            return message;
+        default:
+            break;
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/25103bf6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 1c535b6..202b429 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.sjms.producer;
 
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +42,6 @@ import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 import org.apache.camel.spi.UuidGenerator;
-import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.pool.BasePoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericObjectPool;
@@ -57,6 +55,11 @@ public class InOutProducer extends SjmsProducer {
 
     private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
     private UuidGenerator uuidGenerator;
+    private GenericObjectPool<MessageConsumerResources> consumers;
+
+    public InOutProducer(final SjmsEndpoint endpoint) {
+        super(endpoint);
+    }
 
     public UuidGenerator getUuidGenerator() {
         return uuidGenerator;
@@ -67,8 +70,7 @@ public class InOutProducer extends SjmsProducer {
     }
 
     /**
-     * A pool of {@link MessageConsumerResources} objects that are the reply
-     * consumers.
+     * A pool of {@link MessageConsumerResources} objects that are the reply consumers.
      */
     protected class MessageConsumerResourcesFactory extends BasePoolableObjectFactory<MessageConsumerResources> {
 
@@ -135,12 +137,6 @@ public class InOutProducer extends SjmsProducer {
         }
     }
 
-    private GenericObjectPool<MessageConsumerResources> consumers;
-
-    public InOutProducer(final SjmsEndpoint endpoint) {
-        super(endpoint);
-    }
-
     @Override
     protected void doStart() throws Exception {
         if (ObjectHelper.isEmpty(getNamedReplyTo())) {
@@ -152,12 +148,12 @@ public class InOutProducer extends SjmsProducer {
             // use the generator configured on the camel context
             uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator();
         }
-        if (getConsumers() == null) {
-            setConsumers(new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory()));
-            getConsumers().setMaxActive(getConsumerCount());
-            getConsumers().setMaxIdle(getConsumerCount());
-            while (getConsumers().getNumIdle() < getConsumers().getMaxIdle()) {
-                getConsumers().addObject();
+        if (consumers == null) {
+            consumers = new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory());
+            consumers.setMaxActive(getConsumerCount());
+            consumers.setMaxIdle(getConsumerCount());
+            while (consumers.getNumIdle() < consumers.getMaxIdle()) {
+                consumers.addObject();
             }
         }
         super.doStart();
@@ -166,9 +162,9 @@ public class InOutProducer extends SjmsProducer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        if (getConsumers() != null) {
-            getConsumers().close();
-            setConsumers(null);
+        if (consumers != null) {
+            consumers.close();
+            consumers = null;
         }
     }
 
@@ -260,11 +256,4 @@ public class InOutProducer extends SjmsProducer {
         callback.done(isSynchronous());
     }
 
-    public void setConsumers(GenericObjectPool<MessageConsumerResources> consumers) {
-        this.consumers = consumers;
-    }
-
-    public GenericObjectPool<MessageConsumerResources> getConsumers() {
-        return consumers;
-    }
 }


[3/5] camel git commit: CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.

Posted by da...@apache.org.
CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d19e5d74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d19e5d74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d19e5d74

Branch: refs/heads/master
Commit: d19e5d742b2cbed2f84ad2598bbe9c4789789d3b
Parents: 5b1d8da
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Sep 7 11:10:01 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Sep 7 11:10:01 2015 +0200

----------------------------------------------------------------------
 components/camel-sjms/pom.xml                   | 219 +++---
 .../camel/component/sjms/SjmsComponent.java     |  27 +-
 .../camel/component/sjms/SjmsConsumer.java      |   2 +-
 .../camel/component/sjms/SjmsEndpoint.java      | 127 ++-
 .../sjms/SjmsHeaderFilterStrategy.java          |  14 +-
 .../camel/component/sjms/SjmsMessage.java       | 283 +++++++
 .../component/sjms/batch/SjmsBatchConsumer.java |   5 +-
 .../component/sjms/batch/SjmsBatchEndpoint.java | 152 +++-
 .../sjms/consumer/AbstractMessageHandler.java   |  22 +-
 .../sjms/consumer/InOutMessageHandler.java      |   5 +-
 .../sjms/jms/DefaultJmsKeyFormatStrategy.java   |   2 +-
 .../camel/component/sjms/jms/JmsBinding.java    | 606 ++++++++++++++
 .../sjms/jms/JmsKeyFormatStrategy.java          |  41 +
 .../component/sjms/jms/JmsMessageHelper.java    | 782 ++++++-------------
 .../component/sjms/jms/KeyFormatStrategy.java   |  41 -
 .../sjms/jms/MessageCreatedStrategy.java        |  39 +
 .../component/sjms/producer/InOnlyProducer.java |  23 +-
 .../component/sjms/producer/InOutProducer.java  |  41 +-
 .../JMSMessageHelperTypeConversionTest.java     | 201 -----
 19 files changed, 1671 insertions(+), 961 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-sjms/pom.xml b/components/camel-sjms/pom.xml
index 6977796..3f8d023 100644
--- a/components/camel-sjms/pom.xml
+++ b/components/camel-sjms/pom.xml
@@ -15,121 +15,122 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-    <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>org.apache.camel</groupId>
-        <artifactId>components</artifactId>
-        <version>2.16-SNAPSHOT</version>
-    </parent>
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.16-SNAPSHOT</version>
+  </parent>
 
-    <artifactId>camel-sjms</artifactId>
-    <packaging>bundle</packaging>
-    <name>Camel :: Simple JMS</name>
-    <description>A pure Java JMS Camel Component</description>
+  <artifactId>camel-sjms</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: Simple JMS</name>
+  <description>A pure Java JMS Camel Component</description>
 
-    <properties>
-        <camel.osgi.export.pkg>
-            org.apache.camel.component.sjms,
-            org.apache.camel.component.sjms.jms,
-            org.apache.camel.component.sjms.batch
-        </camel.osgi.export.pkg>
-        <camel.osgi.private.pkg>
-            org.apache.camel.component.sjms.consumer,
-            org.apache.camel.component.sjms.producer,
-            org.apache.camel.component.sjms.taskmanager,
-            org.apache.camel.component.sjms.tx
-        </camel.osgi.private.pkg>
-        <camel.osgi.export.service>
-            org.apache.camel.spi.ComponentResolver;component=sjms,
-            org.apache.camel.spi.ComponentResolver;component=sjms-batch
-        </camel.osgi.export.service>
-    </properties>
+  <properties>
+    <camel.osgi.export.pkg>
+      org.apache.camel.component.sjms,
+      org.apache.camel.component.sjms.jms,
+      org.apache.camel.component.sjms.batch
+    </camel.osgi.export.pkg>
+    <camel.osgi.private.pkg>
+      org.apache.camel.component.sjms.consumer,
+      org.apache.camel.component.sjms.producer,
+      org.apache.camel.component.sjms.taskmanager,
+      org.apache.camel.component.sjms.tx
+    </camel.osgi.private.pkg>
+    <camel.osgi.export.service>
+      org.apache.camel.spi.ComponentResolver;component=sjms,
+      org.apache.camel.spi.ComponentResolver;component=sjms-batch
+    </camel.osgi.export.service>
+  </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-pool</groupId>
-            <artifactId>commons-pool</artifactId>
-        </dependency>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+    </dependency>
 
-        <dependency>
-            <groupId>org.apache.geronimo.specs</groupId>
-            <artifactId>geronimo-jms_1.1_spec</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.geronimo.specs</groupId>
-            <artifactId>geronimo-annotation_1.0_spec</artifactId>
-            <version>${geronimo-annotation-spec-version}</version>
-            <scope>provided</scope>
-        </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-annotation_1.0_spec</artifactId>
+      <version>${geronimo-annotation-spec-version}</version>
+      <scope>provided</scope>
+    </dependency>
 
-        <dependency>
-            <groupId>org.apache.camel</groupId>
-            <artifactId>camel-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-broker</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-kahadb-store</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-pool</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.atomikos</groupId>
-            <artifactId>transactions-jta</artifactId>
-            <version>${atomikos-transactions-version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <version>${commons-io-version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-kahadb-store</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-pool</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.atomikos</groupId>
+      <artifactId>transactions-jta</artifactId>
+      <version>${atomikos-transactions-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons-io-version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-          	    <forkCount>1</forkCount>
-	  	    <reuseForks>false</reuseForks>
-                    <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-clean-plugin</artifactId>
-                <configuration>
-                    <filesets>
-                        <fileset>
-                            <directory>${basedir}/activemq-data</directory>
-                        </fileset>
-                    </filesets>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkCount>1</forkCount>
+          <reuseForks>false</reuseForks>
+          <forkedProcessTimeoutInSeconds>6000</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-clean-plugin</artifactId>
+        <configuration>
+          <filesets>
+            <fileset>
+              <directory>${basedir}/activemq-data</directory>
+            </fileset>
+          </filesets>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 3433ec9..a7347c7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -27,7 +27,8 @@ import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
 import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -44,12 +45,13 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
     private ConnectionFactory connectionFactory;
     private ConnectionResource connectionResource;
     private HeaderFilterStrategy headerFilterStrategy = new SjmsHeaderFilterStrategy();
-    private KeyFormatStrategy keyFormatStrategy = new DefaultJmsKeyFormatStrategy();
+    private JmsKeyFormatStrategy jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
     private Integer connectionCount = 1;
     private TransactionCommitStrategy transactionCommitStrategy;
     private TimedTaskManager timedTaskManager;
     private DestinationCreationStrategy destinationCreationStrategy;
     private ExecutorService asyncStartStopExecutorService;
+    private MessageCreatedStrategy messageCreatedStrategy;
 
     public SjmsComponent() {
         super(SjmsEndpoint.class);
@@ -200,12 +202,12 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
      * You can provide your own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy
      * and refer to it using the # notation.
      */
-    public void setKeyFormatStrategy(KeyFormatStrategy keyFormatStrategy) {
-        this.keyFormatStrategy = keyFormatStrategy;
+    public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
+        this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
     }
 
-    public KeyFormatStrategy getKeyFormatStrategy() {
-        return keyFormatStrategy;
+    public JmsKeyFormatStrategy getJmsKeyFormatStrategy() {
+        return jmsKeyFormatStrategy;
     }
 
     public TransactionCommitStrategy getTransactionCommitStrategy() {
@@ -241,4 +243,17 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
     public void setTimedTaskManager(TimedTaskManager timedTaskManager) {
         this.timedTaskManager = timedTaskManager;
     }
+
+    public MessageCreatedStrategy getMessageCreatedStrategy() {
+        return messageCreatedStrategy;
+    }
+
+    /**
+     * To use the given MessageCreatedStrategy, which is invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
+     * objects when Camel is sending a JMS message.
+     */
+    public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
+        this.messageCreatedStrategy = messageCreatedStrategy;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 322ad5c..162e252 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -206,7 +206,7 @@ public class SjmsConsumer extends DefaultConsumer {
      */
     protected MessageListener createMessageHandler(Session session) {
 
-        TransactionCommitStrategy commitStrategy = null;
+        TransactionCommitStrategy commitStrategy;
         if (getTransactionCommitStrategy() != null) {
             commitStrategy = getTransactionCommitStrategy();
         } else if (getTransactionBatchCount() > 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index c7ed9ac..cb5d396 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -16,22 +16,30 @@
  */
 package org.apache.camel.component.sjms;
 
+import javax.jms.Message;
+import javax.jms.Session;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.DefaultDestinationCreationStrategy;
+import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
 import org.apache.camel.component.sjms.jms.DestinationNameParser;
-import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.JmsBinding;
+import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
 import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
 import org.apache.camel.component.sjms.producer.InOnlyProducer;
 import org.apache.camel.component.sjms.producer.InOutProducer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -43,11 +51,13 @@ import org.slf4j.LoggerFactory;
  * A JMS Endpoint
  */
 @UriEndpoint(scheme = "sjms", title = "Simple JMS", syntax = "sjms:destinationType:destinationName", consumerClass = SjmsConsumer.class, label = "messaging")
-public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
+public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, HeaderFilterStrategyAware {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
     private boolean topic;
 
+    private JmsBinding binding;
+
     @UriPath(enums = "queue,topic", defaultValue = "queue", description = "The kind of destination to use")
     private String destinationType;
     @UriPath @Metadata(required = "true")
@@ -55,6 +65,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     @UriParam(label = "consumer", defaultValue = "true")
     private boolean synchronous = true;
     @UriParam
+    private HeaderFilterStrategy headerFilterStrategy;
+    @UriParam
+    private boolean includeAllJMSXProperties;
+    @UriParam
     private boolean transacted;
     @UriParam(label = "producer")
     private String namedReplyTo;
@@ -88,10 +102,16 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     private boolean prefillPool = true;
     @UriParam(label = "producer", defaultValue = "true")
     private boolean allowNullBody = true;
+    @UriParam(defaultValue = "true")
+    private boolean mapJmsMessage = true;
     @UriParam
     private TransactionCommitStrategy transactionCommitStrategy;
     @UriParam
     private DestinationCreationStrategy destinationCreationStrategy = new DefaultDestinationCreationStrategy();
+    @UriParam
+    private MessageCreatedStrategy messageCreatedStrategy;
+    @UriParam
+    private JmsKeyFormatStrategy jmsKeyFormatStrategy;
 
     public SjmsEndpoint() {
     }
@@ -146,6 +166,34 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
         return true;
     }
 
+    public Exchange createExchange(Message message, Session session) {
+        Exchange exchange = createExchange(getExchangePattern());
+        exchange.setIn(new SjmsMessage(message, session, getBinding()));
+        return exchange;
+    }
+
+    public JmsBinding getBinding() {
+        if (binding == null) {
+            binding = createBinding();
+        }
+        return binding;
+    }
+
+    /**
+     * Creates the {@link org.apache.camel.component.sjms.jms.JmsBinding} to use.
+     */
+    protected JmsBinding createBinding() {
+        return new JmsBinding(isMapJmsMessage(), isAllowNullBody(), getHeaderFilterStrategy(), getJmsKeyFormatStrategy(), getMessageCreatedStrategy());
+    }
+
+    /**
+     * Sets the binding used to convert a Camel message to and from a JMS
+     * message
+     */
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+
     /**
      * DestinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name.
      */
@@ -157,16 +205,35 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
         return destinationName;
     }
 
-    public ConnectionResource getConnectionResource() {
-        return getComponent().getConnectionResource();
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        if (headerFilterStrategy == null) {
+            headerFilterStrategy = new SjmsHeaderFilterStrategy(isIncludeAllJMSXProperties());
+        }
+        return headerFilterStrategy;
+    }
+
+    /**
+     * To use a custom HeaderFilterStrategy to filter headers to and from the Camel message.
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+        this.headerFilterStrategy = strategy;
     }
 
-    public HeaderFilterStrategy getSjmsHeaderFilterStrategy() {
-        return getComponent().getHeaderFilterStrategy();
+    public boolean isIncludeAllJMSXProperties() {
+        return includeAllJMSXProperties;
     }
 
-    public KeyFormatStrategy getJmsKeyFormatStrategy() {
-        return getComponent().getKeyFormatStrategy();
+    /**
+     * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message.
+     * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc.
+     * Note: If you are using a custom headerFilterStrategy then this option does not apply.
+     */
+    public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) {
+        this.includeAllJMSXProperties = includeAllJMSXProperties;
+    }
+
+    public ConnectionResource getConnectionResource() {
+        return getComponent().getConnectionResource();
     }
 
     public boolean isSynchronous() {
@@ -417,4 +484,48 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     public void setAllowNullBody(boolean allowNullBody) {
         this.allowNullBody = allowNullBody;
     }
+
+    public boolean isMapJmsMessage() {
+        return mapJmsMessage;
+    }
+
+    /**
+     * Specifies whether Camel should auto map the received JMS message to a suitable payload type, such as javax.jms.TextMessage to a String etc.
+     * See section about how mapping works below for more details.
+     */
+    public void setMapJmsMessage(boolean mapJmsMessage) {
+        this.mapJmsMessage = mapJmsMessage;
+    }
+
+    public MessageCreatedStrategy getMessageCreatedStrategy() {
+        return messageCreatedStrategy;
+    }
+
+    /**
+     * To use the given MessageCreatedStrategy, which is invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
+     * objects when Camel is sending a JMS message.
+     */
+    public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
+        this.messageCreatedStrategy = messageCreatedStrategy;
+    }
+
+    public JmsKeyFormatStrategy getJmsKeyFormatStrategy() {
+        if (jmsKeyFormatStrategy == null) {
+            jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
+        }
+        return jmsKeyFormatStrategy;
+    }
+
+    /**
+     * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.
+     * Camel provides two implementations out of the box: default and passthrough.
+     * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is.
+     * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters.
+     * You can provide your own implementation of the org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy
+     * and refer to it using the # notation.
+     */
+    public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
+        this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
index 82a8a90..0da77ca 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsHeaderFilterStrategy.java
@@ -24,14 +24,18 @@ import org.apache.camel.impl.DefaultHeaderFilterStrategy;
 public class SjmsHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
 
     public SjmsHeaderFilterStrategy() {
-        initialize();
+        this(false);
+    }
+
+    public SjmsHeaderFilterStrategy(boolean includeAllJMSXProperties) {
+        if (!includeAllJMSXProperties) {
+            initialize();
+        }
     }
 
     protected void initialize() {
-        // ignore provider specified JMS extension headers see page 39 of JMS
-        // 1.1 specification
-        // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in
-        // AqjmsMessage
+        // ignore provider specified JMS extension headers see page 39 of JMS 1.1 specification
+        // added "JMSXRecvTimestamp" as a workaround for an Oracle bug/typo in AqjmsMessage
         getOutFilter().add("JMSXUserID");
         getOutFilter().add("JMSXAppID");
         getOutFilter().add("JMSXDeliveryCount");

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
new file mode 100644
index 0000000..92f8531
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import java.io.File;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.component.sjms.jms.JmsBinding;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents a {@link org.apache.camel.Message} for working with JMS
+ *
+ * @version
+ */
+public class SjmsMessage extends DefaultMessage {
+    private static final Logger LOG = LoggerFactory.getLogger(SjmsMessage.class);
+    private Message jmsMessage;
+    private Session jmsSession;
+    private JmsBinding binding;
+
+    public SjmsMessage(Message jmsMessage, Session jmsSession, JmsBinding binding) {
+        setJmsMessage(jmsMessage);
+        setJmsSession(jmsSession);
+        setBinding(binding);
+    }
+
+    @Override
+    public String toString() {
+        // do not print jmsMessage as there could be sensitive details
+        if (jmsMessage != null) {
+            try {
+                return "SjmsMessage[JmsMessageID: " + jmsMessage.getJMSMessageID() + "]";
+            } catch (Throwable e) {
+                // ignore
+            }
+        }
+        return "SjmsMessage@" + ObjectHelper.getIdentityHashCode(this);
+    }
+
+    @Override
+    public void copyFrom(org.apache.camel.Message that) {
+        if (that == this) {
+            // the same instance so do not need to copy
+            return;
+        }
+
+        // must initialize headers before we set the JmsMessage to avoid Camel
+        // populating it before we do the copy
+        getHeaders().clear();
+
+        boolean copyMessageId = true;
+        if (that instanceof SjmsMessage) {
+            SjmsMessage thatMessage = (SjmsMessage) that;
+            this.jmsMessage = thatMessage.jmsMessage;
+            if (this.jmsMessage != null) {
+                // for performance let's not copy the messageID if we are a JMS message
+                copyMessageId = false;
+            }
+        }
+        if (copyMessageId) {
+            setMessageId(that.getMessageId());
+        }
+
+        // copy body and fault flag
+        setBody(that.getBody());
+        setFault(that.isFault());
+
+        // we have already cleared the headers
+        if (that.hasHeaders()) {
+            getHeaders().putAll(that.getHeaders());
+        }
+
+        getAttachments().clear();
+        if (that.hasAttachments()) {
+            getAttachments().putAll(that.getAttachments());
+        }
+    }
+
+    public JmsBinding getBinding() {
+        if (binding == null) {
+            binding = ExchangeHelper.getBinding(getExchange(), JmsBinding.class);
+        }
+        return binding;
+    }
+
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+
+    /**
+     * Returns the underlying JMS message
+     */
+    public Message getJmsMessage() {
+        return jmsMessage;
+    }
+
+    public void setJmsMessage(Message jmsMessage) {
+        if (jmsMessage != null) {
+            try {
+                setMessageId(jmsMessage.getJMSMessageID());
+            } catch (JMSException e) {
+                LOG.warn("Unable to retrieve JMSMessageID from JMS Message", e);
+            }
+        }
+        this.jmsMessage = jmsMessage;
+    }
+
+    /**
+     * Returns the underlying JMS session.
+     * <p/>
+     * This may be <tt>null</tt>.
+     */
+    public Session getJmsSession() {
+        return jmsSession;
+    }
+
+    public void setJmsSession(Session jmsSession) {
+        this.jmsSession = jmsSession;
+    }
+
+    @Override
+    public void setBody(Object body) {
+        super.setBody(body);
+        if (body == null) {
+            // preserve headers even if we set body to null
+            ensureInitialHeaders();
+            // remove underlying jmsMessage since we mutated body to null
+            jmsMessage = null;
+        }
+    }
+
+    public Object getHeader(String name) {
+        Object answer = null;
+
+        // we will exclude using JMS-prefixed headers here to avoid strangeness with some JMS providers
+        // e.g. ActiveMQ returns the String not the Destination type for "JMSReplyTo"!
+        // only look in jms message directly if we have not populated headers
+        if (jmsMessage != null && !hasPopulatedHeaders() && !name.startsWith("JMS")) {
+            try {
+                // use binding to do the lookup as it has to consider using encoded keys
+                answer = getBinding().getObjectProperty(jmsMessage, name);
+            } catch (JMSException e) {
+                throw new RuntimeExchangeException("Unable to retrieve header from JMS Message: " + name, getExchange(), e);
+            }
+        }
+        // only look if we have populated headers otherwise there are no headers at all
+        // if we do lookup a header starting with JMS then force a lookup
+        if (answer == null && (hasPopulatedHeaders() || name.startsWith("JMS"))) {
+            answer = super.getHeader(name);
+        }
+        return answer;
+    }
+
+    @Override
+    public Map<String, Object> getHeaders() {
+        ensureInitialHeaders();
+        return super.getHeaders();
+    }
+
+    @Override
+    public Object removeHeader(String name) {
+        ensureInitialHeaders();
+        return super.removeHeader(name);
+    }
+
+    @Override
+    public void setHeaders(Map<String, Object> headers) {
+        ensureInitialHeaders();
+        super.setHeaders(headers);
+    }
+
+    @Override
+    public void setHeader(String name, Object value) {
+        ensureInitialHeaders();
+        super.setHeader(name, value);
+    }
+
+    @Override
+    public SjmsMessage newInstance() {
+        return new SjmsMessage(null, null, binding);
+    }
+
+    /**
+     * Returns true if a new JMS message instance should be created to send to the next component
+     */
+    public boolean shouldCreateNewMessage() {
+        return super.hasPopulatedHeaders();
+    }
+
+    /**
+     * Ensure that the headers have been populated from the underlying JMS message
+     * before we start mutating the headers
+     */
+    protected void ensureInitialHeaders() {
+        if (jmsMessage != null && !hasPopulatedHeaders()) {
+            // we have not populated headers so force this by creating
+            // new headers and set it on super
+            super.setHeaders(createHeaders());
+        }
+    }
+
+    @Override
+    protected Object createBody() {
+        if (jmsMessage != null) {
+            return getBinding().extractBodyFromJms(getExchange(), jmsMessage);
+        }
+        return null;
+    }
+
+    @Override
+    protected void populateInitialHeaders(Map<String, Object> map) {
+        if (jmsMessage != null && map != null) {
+            map.putAll(getBinding().extractHeadersFromJms(jmsMessage, getExchange()));
+        }
+    }
+
+    @Override
+    protected String createMessageId() {
+        if (jmsMessage == null) {
+            LOG.trace("No javax.jms.Message set so generating a new message id");
+            return super.createMessageId();
+        }
+        try {
+            String id = getDestinationAsString(jmsMessage.getJMSDestination()) + jmsMessage.getJMSMessageID();
+            return getSanitizedString(id);
+        } catch (JMSException e) {
+            throw new RuntimeExchangeException("Unable to retrieve JMSMessageID from JMS Message", getExchange(), e);
+        }
+    }
+
+    @Override
+    protected Boolean isTransactedRedelivered() {
+        if (jmsMessage != null) {
+            return JmsMessageHelper.getJMSRedelivered(jmsMessage);
+        } else {
+            return null;
+        }
+    }
+
+    private String getDestinationAsString(Destination destination) throws JMSException {
+        String result;
+        if (destination == null) {
+            result = "null destination!" + File.separator;
+        } else if (destination instanceof Topic) {
+            result = "topic" + File.separator + ((Topic) destination).getTopicName() + File.separator;
+        } else {
+            result = "queue" + File.separator + ((Queue) destination).getQueueName() + File.separator;
+        }
+        return result;
+    }
+
+    private String getSanitizedString(Object value) {
+        return value != null ? value.toString().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_") : "";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index ee2b250..e8ce161 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.sjms.batch;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Date;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +42,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,7 +228,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                             LOG.debug("Message received: {}", messageCount);
                             if ((message instanceof ObjectMessage)
                                     || (message instanceof TextMessage)) {
-                                Exchange exchange = JmsMessageHelper.createExchange(message, getEndpoint());
+
+                                final Exchange exchange = getEndpoint().createExchange(message, session);
                                 aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
                                 aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
                             } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index b4c052f..49c74ba 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -16,38 +16,65 @@
  */
 package org.apache.camel.component.sjms.batch;
 
+import javax.jms.Message;
+import javax.jms.Session;
+
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.component.sjms.SjmsHeaderFilterStrategy;
+import org.apache.camel.component.sjms.SjmsMessage;
+import org.apache.camel.component.sjms.jms.DefaultJmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.DestinationNameParser;
+import org.apache.camel.component.sjms.jms.JmsBinding;
+import org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.jms.MessageCreatedStrategy;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
 @UriEndpoint(scheme = "sjms-batch", title = "Simple JMS Batch Component", syntax = "sjms-batch:destinationName",
-        consumerClass = SjmsBatchComponent.class, label = "messaging")
-public class SjmsBatchEndpoint extends DefaultEndpoint {
+        consumerClass = SjmsBatchComponent.class, label = "messaging", consumerOnly = true)
+public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
 
     public static final int DEFAULT_COMPLETION_SIZE = 200; // the default dispatch queue size in ActiveMQ
     public static final int DEFAULT_COMPLETION_TIMEOUT = 500;
     public static final String PROPERTY_BATCH_SIZE = "CamelSjmsBatchSize";
 
-    @UriPath(label = "consumer") @Metadata(required = "true")
+    private JmsBinding binding;
+
+    @UriPath @Metadata(required = "true")
     private String destinationName;
-    @UriParam(label = "consumer", defaultValue = "1")
+    @UriParam(defaultValue = "1")
     private int consumerCount = 1;
-    @UriParam(label = "consumer", defaultValue = "200")
+    @UriParam(defaultValue = "200")
     private int completionSize = DEFAULT_COMPLETION_SIZE;
-    @UriParam(label = "consumer", defaultValue = "500")
+    @UriParam(defaultValue = "500")
     private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
-    @UriParam(label = "consumer", defaultValue = "1000")
+    @UriParam(defaultValue = "1000")
     private int pollDuration = 1000;
-    @UriParam(label = "consumer") @Metadata(required = "true")
+    @UriParam @Metadata(required = "true")
     private AggregationStrategy aggregationStrategy;
+    @UriParam
+    private HeaderFilterStrategy headerFilterStrategy;
+    @UriParam
+    private boolean includeAllJMSXProperties;
+    @UriParam(defaultValue = "true")
+    private boolean allowNullBody = true;
+    @UriParam(defaultValue = "true")
+    private boolean mapJmsMessage = true;
+    @UriParam
+    private MessageCreatedStrategy messageCreatedStrategy;
+    @UriParam
+    private JmsKeyFormatStrategy jmsKeyFormatStrategy;
+
 
     public SjmsBatchEndpoint() {
     }
@@ -78,6 +105,34 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
         return new SjmsBatchConsumer(this, processor);
     }
 
+    public Exchange createExchange(Message message, Session session) {
+        Exchange exchange = createExchange(getExchangePattern());
+        exchange.setIn(new SjmsMessage(message, session, getBinding()));
+        return exchange;
+    }
+
+    public JmsBinding getBinding() {
+        if (binding == null) {
+            binding = createBinding();
+        }
+        return binding;
+    }
+
+    /**
+     * Creates the {@link org.apache.camel.component.sjms.jms.JmsBinding} to use.
+     */
+    protected JmsBinding createBinding() {
+        return new JmsBinding(isMapJmsMessage(), isAllowNullBody(), getHeaderFilterStrategy(), getJmsKeyFormatStrategy(), getMessageCreatedStrategy());
+    }
+
+    /**
+     * Sets the binding used to convert a Camel message to and from a JMS
+     * message
+     */
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }
@@ -141,4 +196,85 @@ public class SjmsBatchEndpoint extends DefaultEndpoint {
         this.pollDuration = pollDuration;
     }
 
+    public boolean isAllowNullBody() {
+        return allowNullBody;
+    }
+
+    /**
+     * Whether to allow sending messages with no body. If this option is false and the message body is null, then a JMSException is thrown.
+     */
+    public void setAllowNullBody(boolean allowNullBody) {
+        this.allowNullBody = allowNullBody;
+    }
+
+    public boolean isMapJmsMessage() {
+        return mapJmsMessage;
+    }
+
+    /**
+     * Specifies whether Camel should auto map the received JMS message to a suited payload type, such as javax.jms.TextMessage to a String etc.
+     * See section about how mapping works below for more details.
+     */
+    public void setMapJmsMessage(boolean mapJmsMessage) {
+        this.mapJmsMessage = mapJmsMessage;
+    }
+
+    public MessageCreatedStrategy getMessageCreatedStrategy() {
+        return messageCreatedStrategy;
+    }
+
+    /**
+     * To use the given MessageCreatedStrategy which is invoked when Camel creates new instances of <tt>javax.jms.Message</tt>
+     * objects when Camel is sending a JMS message.
+     */
+    public void setMessageCreatedStrategy(MessageCreatedStrategy messageCreatedStrategy) {
+        this.messageCreatedStrategy = messageCreatedStrategy;
+    }
+
+    public JmsKeyFormatStrategy getJmsKeyFormatStrategy() {
+        if (jmsKeyFormatStrategy == null) {
+            jmsKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
+        }
+        return jmsKeyFormatStrategy;
+    }
+
+    /**
+     * Pluggable strategy for encoding and decoding JMS keys so they can be compliant with the JMS specification.
+     * Camel provides two implementations out of the box: default and passthrough.
+     * The default strategy will safely marshal dots and hyphens (. and -). The passthrough strategy leaves the key as is.
+     * Can be used for JMS brokers which do not care whether JMS header keys contain illegal characters.
+     * You can provide your own implementation of the org.apache.camel.component.sjms.jms.JmsKeyFormatStrategy
+     * and refer to it using the # notation.
+     */
+    public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
+        this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
+    }
+
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        if (headerFilterStrategy == null) {
+            headerFilterStrategy = new SjmsHeaderFilterStrategy(isIncludeAllJMSXProperties());
+        }
+        return headerFilterStrategy;
+    }
+
+    /**
+     * To use a custom HeaderFilterStrategy to filter headers to and from the Camel message.
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
+        this.headerFilterStrategy = strategy;
+    }
+
+    public boolean isIncludeAllJMSXProperties() {
+        return includeAllJMSXProperties;
+    }
+
+    /**
+     * Whether to include all JMSXxxx properties when mapping from JMS to Camel Message.
+     * Setting this to true will include properties such as JMSXAppID, and JMSXUserID etc.
+     * Note: If you are using a custom headerFilterStrategy then this option does not apply.
+     */
+    public void setIncludeAllJMSXProperties(boolean includeAllJMSXProperties) {
+        this.includeAllJMSXProperties = includeAllJMSXProperties;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
index 1598a43..f394008 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/AbstractMessageHandler.java
@@ -25,8 +25,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.Synchronization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,8 +68,7 @@ public abstract class AbstractMessageHandler implements MessageListener {
     public void onMessage(Message message) {
         RuntimeCamelException rce = null;
         try {
-            SjmsEndpoint endpoint = (SjmsEndpoint) getEndpoint();
-            final DefaultExchange exchange = (DefaultExchange) JmsMessageHelper.createExchange(message, endpoint, endpoint.getJmsKeyFormatStrategy());
+            final Exchange exchange = getEndpoint().createExchange(message, getSession());
 
             log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
 
@@ -80,10 +77,10 @@ public abstract class AbstractMessageHandler implements MessageListener {
             }
             try {
                 if (isTransacted() || isSynchronous()) {
-                    log.debug("  Handling synchronous message: {}", exchange.getIn().getBody());
+                    log.debug("Handling synchronous message: {}", exchange.getIn().getBody());
                     handleMessage(exchange);
                 } else {
-                    log.debug("  Handling asynchronous message: {}", exchange.getIn().getBody());
+                    log.debug("Handling asynchronous message: {}", exchange.getIn().getBody());
                     executor.execute(new Runnable() {
                         @Override
                         public void run() {
@@ -96,12 +93,10 @@ public abstract class AbstractMessageHandler implements MessageListener {
                     });
                 }
             } catch (Exception e) {
-                if (exchange != null) {
-                    if (exchange.getException() == null) {
-                        exchange.setException(e);
-                    } else {
-                        throw e;
-                    }
+                if (exchange.getException() == null) {
+                    exchange.setException(e);
+                } else {
+                    throw e;
                 }
             }
         } catch (Exception e) {
@@ -113,9 +108,6 @@ public abstract class AbstractMessageHandler implements MessageListener {
         }
     }
 
-    /**
-     * @param exchange
-     */
     public abstract void handleMessage(final Exchange exchange);
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
index 2068da5..97def12 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
@@ -32,7 +32,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.jms.JmsConstants;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.spi.Synchronization;
 
 /**
@@ -157,7 +156,9 @@ public class InOutMessageHandler extends AbstractMessageHandler {
         @Override
         public void done(boolean sync) {
             try {
-                Message response = JmsMessageHelper.createMessage(exchange, getSession(), getEndpoint());
+                // the response can either be in OUT or IN
+                org.apache.camel.Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+                Message response = getEndpoint().getBinding().makeJmsMessage(exchange, msg.getBody(), msg.getHeaders(), getSession(), null);
                 response.setJMSCorrelationID(exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class));
                 localProducer.send(response);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
index d95d2b5..4fa3308 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/DefaultJmsKeyFormatStrategy.java
@@ -22,7 +22,7 @@ package org.apache.camel.component.sjms.jms;
  * This can be used for sending keys contain package names that is common by
  * Java frameworks.
  */
-public class DefaultJmsKeyFormatStrategy implements KeyFormatStrategy {
+public class DefaultJmsKeyFormatStrategy implements JmsKeyFormatStrategy {
 
     public String encodeKey(String key) {
         String answer = key.replace(".", "_DOT_");

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
new file mode 100644
index 0000000..8dc2841
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsBinding.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Node;
+
+import static org.apache.camel.component.sjms.jms.JmsMessageHelper.normalizeDestinationName;
+
+/**
+ * A Strategy used to convert between a Camel {@link org.apache.camel.Exchange} and {@link org.apache.camel.Message}
+ * to and from a JMS {@link javax.jms.Message}
+ */
+public class JmsBinding {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsBinding.class);
+    private final boolean mapJmsMessage;
+    private final boolean allowNullBody;
+    private final HeaderFilterStrategy headerFilterStrategy;
+    private final JmsKeyFormatStrategy jmsJmsKeyFormatStrategy;
+    private final MessageCreatedStrategy messageCreatedStrategy;
+
+    public JmsBinding(boolean mapJmsMessage, boolean allowNullBody,
+                      HeaderFilterStrategy headerFilterStrategy, JmsKeyFormatStrategy jmsJmsKeyFormatStrategy,
+                      MessageCreatedStrategy messageCreatedStrategy) {
+        this.mapJmsMessage = mapJmsMessage;
+        this.allowNullBody = allowNullBody;
+        this.headerFilterStrategy = headerFilterStrategy;
+        this.jmsJmsKeyFormatStrategy = jmsJmsKeyFormatStrategy;
+        this.messageCreatedStrategy = messageCreatedStrategy;
+    }
+
+    /**
+     * Extracts the body from the JMS message
+     *
+     * @param exchange the exchange
+     * @param message  the message to extract its body
+     * @return the body, can be <tt>null</tt>
+     * @throws RuntimeCamelException if reading the JMS message fails with a {@link JMSException}
+     */
+    public Object extractBodyFromJms(Exchange exchange, Message message) {
+        try {
+            // TODO: new options to support
+            // if a custom message converter is configured on the endpoint then use it instead of doing the extraction
+            // based on message type
+/*            if (endpoint != null && endpoint.getMessageConverter() != null) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Extracting body using a custom MessageConverter: {} from JMS message: {}", endpoint.getMessageConverter(), message);
+                }
+                return endpoint.getMessageConverter().fromMessage(message);
+            }
+*/
+            // if we are configured to not map the jms message then return it as body
+            if (!mapJmsMessage) {
+                LOG.trace("Option map JMS message is false so using JMS message as body: {}", message);
+                return message;
+            }
+
+            if (message instanceof ObjectMessage) {
+                LOG.trace("Extracting body as a ObjectMessage from JMS message: {}", message);
+                ObjectMessage objectMessage = (ObjectMessage)message;
+                Object payload = objectMessage.getObject();
+                if (payload instanceof DefaultExchangeHolder) {
+                    DefaultExchangeHolder holder = (DefaultExchangeHolder) payload;
+                    DefaultExchangeHolder.unmarshal(exchange, holder);
+                    return exchange.getIn().getBody();
+                } else {
+                    // reuse the payload already read above instead of calling getObject() a second time
+                    return payload;
+                }
+            } else if (message instanceof TextMessage) {
+                LOG.trace("Extracting body as a TextMessage from JMS message: {}", message);
+                TextMessage textMessage = (TextMessage)message;
+                return textMessage.getText();
+            } else if (message instanceof MapMessage) {
+                LOG.trace("Extracting body as a MapMessage from JMS message: {}", message);
+                return createMapFromMapMessage((MapMessage)message);
+            } else if (message instanceof BytesMessage) {
+                LOG.trace("Extracting body as a BytesMessage from JMS message: {}", message);
+                return createByteArrayFromBytesMessage((BytesMessage)message);
+            } else if (message instanceof StreamMessage) {
+                LOG.trace("Extracting body as a StreamMessage from JMS message: {}", message);
+                return message;
+            } else {
+                return null;
+            }
+        } catch (JMSException e) {
+            throw new RuntimeCamelException("Failed to extract body due to: " + e + ". Message: " + message, e);
+        }
+    }
+
+    public Map<String, Object> extractHeadersFromJms(Message jmsMessage, Exchange exchange) {
+        Map<String, Object> map = new HashMap<String, Object>();
+        if (jmsMessage != null) {
+            // lets populate the standard JMS message headers
+            try {
+                map.put("JMSCorrelationID", jmsMessage.getJMSCorrelationID());
+                map.put("JMSCorrelationIDAsBytes", JmsMessageHelper.getJMSCorrelationIDAsBytes(jmsMessage));
+                map.put("JMSDeliveryMode", jmsMessage.getJMSDeliveryMode());
+                map.put("JMSDestination", jmsMessage.getJMSDestination());
+                map.put("JMSExpiration", jmsMessage.getJMSExpiration());
+                map.put("JMSMessageID", jmsMessage.getJMSMessageID());
+                map.put("JMSPriority", jmsMessage.getJMSPriority());
+                map.put("JMSRedelivered", jmsMessage.getJMSRedelivered());
+                map.put("JMSTimestamp", jmsMessage.getJMSTimestamp());
+
+                map.put("JMSReplyTo", JmsMessageHelper.getJMSReplyTo(jmsMessage));
+                map.put("JMSType", JmsMessageHelper.getJMSType(jmsMessage));
+
+                // this works around a bug in the ActiveMQ property handling
+                map.put(JmsConstants.JMSX_GROUP_ID, JmsMessageHelper.getStringProperty(jmsMessage, JmsConstants.JMSX_GROUP_ID));
+                map.put("JMSXUserID", JmsMessageHelper.getStringProperty(jmsMessage, "JMSXUserID"));
+            } catch (JMSException e) {
+                throw new RuntimeCamelException(e);
+            }
+
+            Enumeration<?> names;
+            try {
+                names = jmsMessage.getPropertyNames();
+            } catch (JMSException e) {
+                throw new RuntimeCamelException(e);
+            }
+            while (names.hasMoreElements()) {
+                String name = names.nextElement().toString();
+                try {
+                    Object value = JmsMessageHelper.getProperty(jmsMessage, name);
+                    if (headerFilterStrategy != null
+                            && headerFilterStrategy.applyFilterToExternalHeaders(name, value, exchange)) {
+                        continue;
+                    }
+
+                    // must decode back from safe JMS header name to original header name
+                    // when storing on this Camel JmsMessage object.
+                    String key = jmsJmsKeyFormatStrategy.decodeKey(name);
+                    map.put(key, value);
+                } catch (JMSException e) {
+                    throw new RuntimeCamelException(name, e);
+                }
+            }
+        }
+
+        return map;
+    }
+
+    public Object getObjectProperty(Message jmsMessage, String name) throws JMSException {
+        // try a direct lookup first
+        Object answer = jmsMessage.getObjectProperty(name);
+        if (answer == null) {
+            // then encode the key and do another lookup
+            String key = jmsJmsKeyFormatStrategy.encodeKey(name);
+            answer = jmsMessage.getObjectProperty(key);
+        }
+        return answer;
+    }
+
+    protected byte[] createByteArrayFromBytesMessage(BytesMessage message) throws JMSException {
+        if (message.getBodyLength() > Integer.MAX_VALUE) {
+            LOG.warn("Length of BytesMessage is too long: {}", message.getBodyLength());
+            return null;
+        }
+        byte[] result = new byte[(int)message.getBodyLength()];
+        message.readBytes(result);
+        return result;
+    }
+
+    /**
+     * Creates a JMS message from the body and headers of the exchange's IN message.
+     *
+     * @param exchange the current exchange
+     * @param session the JMS session used to create the message
+     * @return a newly created JMS Message instance mapped from the IN message body and headers
+     * @throws JMSException if the message could not be created
+     */
+    public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
+        // Delegates to the five-argument overload with no cause. That overload already notifies
+        // the MessageCreatedStrategy (when one is configured) about the created message, so the
+        // callback is intentionally not repeated here; repeating it would invoke
+        // onMessageCreated twice for the same JMS message.
+        return makeJmsMessage(exchange, exchange.getIn().getBody(), exchange.getIn().getHeaders(), session, null);
+    }
+
+    /**
+     * Creates a JMS message from the Camel exchange and message
+     *
+     * @param exchange the current exchange
+     * @param body the message body
+     * @param headers the message headers
+     * @param session the JMS session used to create the message
+     * @param cause optional exception occurred that should be sent as reply instead of a regular body
+     * @return a newly created JMS Message instance containing the mapped body and headers, or the cause when one is given
+     * @throws JMSException if the message could not be created
+     */
+    public Message makeJmsMessage(Exchange exchange, Object body, Map headers, Session session, Exception cause) throws JMSException {
+        Message answer = null;
+
+        // TODO: look at supporting some of these options
+
+/*        boolean alwaysCopy = endpoint != null && endpoint.getConfiguration().isAlwaysCopyMessage();
+        boolean force = endpoint != null && endpoint.getConfiguration().isForceSendOriginalMessage();
+        if (!alwaysCopy && camelMessage instanceof JmsMessage) {
+            JmsMessage jmsMessage = (JmsMessage)camelMessage;
+            if (!jmsMessage.shouldCreateNewMessage() || force) {
+                answer = jmsMessage.getJmsMessage();
+
+                if (!force) {
+                    // answer must match endpoint type
+                    JmsMessageType type = endpoint != null ? endpoint.getConfiguration().getJmsMessageType() : null;
+                    if (type != null && answer != null) {
+                        if (type == JmsMessageType.Text) {
+                            answer = answer instanceof TextMessage ? answer : null;
+                        } else if (type == JmsMessageType.Bytes) {
+                            answer = answer instanceof BytesMessage ? answer : null;
+                        } else if (type == JmsMessageType.Map) {
+                            answer = answer instanceof MapMessage ? answer : null;
+                        } else if (type == JmsMessageType.Object) {
+                            answer = answer instanceof ObjectMessage ? answer : null;
+                        } else if (type == JmsMessageType.Stream) {
+                            answer = answer instanceof StreamMessage ? answer : null;
+                        }
+                    }
+                }
+            }
+        }
+*/
+
+        if (answer == null) {
+            if (cause != null) {
+                // an exception occurred so send it as response
+                LOG.debug("Will create JmsMessage with caused exception: {}", cause);
+                // create jms message containing the caused exception
+                answer = createJmsMessage(cause, session);
+            } else {
+                // create regular jms message using the camel message body
+                answer = createJmsMessage(exchange, body, headers, session, exchange.getContext());
+                appendJmsProperties(answer, exchange, headers);
+            }
+        }
+
+        if (answer != null && messageCreatedStrategy != null) {
+            messageCreatedStrategy.onMessageCreated(answer, session, exchange, null);
+        }
+        return answer;
+    }
+
+    /**
+     * Appends the given Camel message headers to the JMS {@link Message}
+     */
+    public void appendJmsProperties(Message jmsMessage, Exchange exchange, Map<String, Object> headers) throws JMSException {
+        if (headers != null) {
+            Set<Map.Entry<String, Object>> entries = headers.entrySet();
+            for (Map.Entry<String, Object> entry : entries) {
+                String headerName = entry.getKey();
+                Object headerValue = entry.getValue();
+                appendJmsProperty(jmsMessage, exchange, headerName, headerValue);
+            }
+        }
+    }
+
+    /**
+     * Applies a single Camel header to the JMS message: known standard JMS headers are set via their
+     * dedicated setters, any other header accepted by the filter strategy is set as a JMS property.
+     */
+    public void appendJmsProperty(Message jmsMessage, Exchange exchange, String headerName, Object headerValue) throws JMSException {
+        if (isStandardJMSHeader(headerName)) {
+            if (headerName.equals("JMSCorrelationID")) {
+                jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+            } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
+                if (headerValue instanceof String) {
+                    // if the value is a String we must normalize it first, and must include the prefix
+                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
+                    headerValue = normalizeDestinationName((String) headerValue, true);
+                }
+                Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
+                JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
+            } else if (headerName.equals("JMSType")) {
+                jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+            } else if (headerName.equals("JMSPriority")) {
+                jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
+            } else if (headerName.equals("JMSDeliveryMode")) {
+                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+            } else if (headerName.equals("JMSExpiration")) {
+                jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
+            } else {
+                // any other standard JMS header is ignored: JMSDestination is set by the MessageProducer, and JMSMessageID,
+                // JMSTimestamp and JMSRedelivered by the underlying JMS provider; log at trace level to not spam log
+                LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+            }
+        } else if (shouldOutputHeader(headerName, headerValue, exchange)) {
+            // only primitive headers and strings are allowed as properties
+            // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
+            Object value = getValidJMSHeaderValue(headerName, headerValue);
+            if (value != null) {
+                // must encode to safe JMS header name before setting property on jmsMessage
+                String key = jmsJmsKeyFormatStrategy.encodeKey(headerName);
+                JmsMessageHelper.setProperty(jmsMessage, key, value);
+            } else if (LOG.isDebugEnabled()) {
+                // the value is null or not a primitive/string so we cannot send it over the wire; class lookup is null-safe
+                LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}",
+                        new Object[]{headerName, headerValue != null ? headerValue.getClass().getName() : null, headerValue});
+            }
+        }
+    }
+
+    /**
+     * Is the given header a standard JMS header
+     * @param headerName the header name
+     * @return <tt>true</tt> if it is a standard JMS header
+     */
+    protected boolean isStandardJMSHeader(String headerName) {
+        if (!headerName.startsWith("JMS")) {
+            return false;
+        }
+        if (headerName.startsWith("JMSX")) {
+            return false;
+        }
+        // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM)
+        if (headerName.startsWith("JMS_")) {
+            return false;
+        }
+
+        // the 4th char must be a letter to be a standard JMS header
+        if (headerName.length() > 3) {
+            Character fourth = headerName.charAt(3);
+            if (Character.isLetter(fourth)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Strategy to test if the given header is valid according to the JMS spec to be set as a property
+     * on the JMS message.
+     * <p/>
+     * This default implementation will allow:
+     * <ul>
+     *   <li>any primitives and their counter Objects (Integer, Double etc.)</li>
+     *   <li>String and any other literals, Character, CharSequence</li>
+     *   <li>Boolean</li>
+     *   <li>Number</li>
+     *   <li>java.util.Date</li>
+     * </ul>
+     *
+     * @param headerName   the header name
+     * @param headerValue  the header value
+     * @return  the value to use, <tt>null</tt> to ignore this header
+     */
+    protected Object getValidJMSHeaderValue(String headerName, Object headerValue) {
+        if (headerValue instanceof String) {
+            return headerValue;
+        } else if (headerValue instanceof BigInteger) {
+            return headerValue.toString();
+        } else if (headerValue instanceof BigDecimal) {
+            return headerValue.toString();
+        } else if (headerValue instanceof Number) {
+            return headerValue;
+        } else if (headerValue instanceof Character) {
+            return headerValue;
+        } else if (headerValue instanceof CharSequence) {
+            return headerValue.toString();
+        } else if (headerValue instanceof Boolean) {
+            return headerValue;
+        } else if (headerValue instanceof Date) {
+            return headerValue.toString();
+        }
+        return null;
+    }
+
+    protected Message createJmsMessage(Exception cause, Session session) throws JMSException {
+        LOG.trace("Using JmsMessageType: {}", JmsMessageType.Object);
+        Message answer = session.createObjectMessage(cause);
+        // ensure default delivery mode is used by default
+        answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+        return answer;
+    }
+
+    protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException {
+        JmsMessageType type = null;
+
+        // TODO: support some of these options?
+
+/*        // special for transferExchange
+        if (endpoint != null && endpoint.isTransferExchange()) {
+            LOG.trace("Option transferExchange=true so we use JmsMessageType: Object");
+            Serializable holder = DefaultExchangeHolder.marshal(exchange);
+            Message answer = session.createObjectMessage(holder);
+            // ensure default delivery mode is used by default
+            answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+            return answer;
+        }
+
+        // use a custom message converter
+        if (endpoint != null && endpoint.getMessageConverter() != null) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Creating JmsMessage using a custom MessageConverter: {} with body: {}", endpoint.getMessageConverter(), body);
+            }
+            return endpoint.getMessageConverter().toMessage(body, session);
+        }
+*/
+        // check if header have a type set, if so we force to use it
+/*
+        if (headers.containsKey(JmsConstants.JMS_MESSAGE_TYPE)) {
+            type = context.getTypeConverter().convertTo(JmsMessageType.class, headers.get(JmsConstants.JMS_MESSAGE_TYPE));
+        } else if (endpoint != null && endpoint.getConfiguration().getJmsMessageType() != null) {
+            // force a specific type from the endpoint configuration
+            type = endpoint.getConfiguration().getJmsMessageType();
+        } else {
+*/            type = getJMSMessageTypeForBody(exchange, body, headers, session, context);
+        //}
+
+        // create the JmsMessage based on the type
+        if (type != null) {
+            if (body == null && !allowNullBody) {
+                throw new JMSException("Cannot send message as message body is null, and option allowNullBody is false.");
+            }
+            LOG.trace("Using JmsMessageType: {}", type);
+            Message answer = createJmsMessageForType(exchange, body, headers, session, context, type);
+            // ensure default delivery mode is used by default
+            answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+            return answer;
+        }
+
+        // check for null body
+        if (body == null && !allowNullBody) {
+            throw new JMSException("Cannot send message as message body is null, and option allowNullBody is false.");
+        }
+
+        // warn if the body could not be mapped
+        if (body != null && LOG.isWarnEnabled()) {
+            LOG.warn("Cannot determine specific JmsMessage type to use from body class."
+                    + " Will use generic JmsMessage."
+                    + " Body class: " + ObjectHelper.classCanonicalName(body)
+                    + ". If you want to send a POJO then your class might need to implement java.io.Serializable"
+                    + ", or you can force a specific type by setting the jmsMessageType option on the JMS endpoint.");
+        }
+
+        // return a default message
+        Message answer = session.createMessage();
+        // ensure default delivery mode is used by default
+        answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
+        return answer;
+    }
+
+    /**
+     * Return the {@link JmsMessageType}
+     *
+     * @return type or null if no mapping was possible
+     */
+    protected JmsMessageType getJMSMessageTypeForBody(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) {
+        JmsMessageType type = null;
+        // let body determine the type
+        if (body instanceof Node || body instanceof String) {
+            type = JmsMessageType.Text;
+        } else if (body instanceof byte[] || body instanceof WrappedFile || body instanceof File || body instanceof Reader
+                || body instanceof InputStream || body instanceof ByteBuffer || body instanceof StreamCache) {
+            type = JmsMessageType.Bytes;
+        } else if (body instanceof Map) {
+            type = JmsMessageType.Map;
+        } else if (body instanceof Serializable) {
+            type = JmsMessageType.Object;
+        } else if (exchange.getContext().getTypeConverter().tryConvertTo(File.class, body) != null
+                || exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) != null) {
+            type = JmsMessageType.Bytes;
+        }
+        return type;
+    }
+
+    /**
+     *
+     * Create the {@link Message}
+     *
+     * @return jmsMessage or null if the mapping was not successful
+     */
+    protected Message createJmsMessageForType(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context, JmsMessageType type) throws JMSException {
+        switch (type) {
+            case Text: {
+                TextMessage message = session.createTextMessage();
+                if (body != null) {
+                    String payload = context.getTypeConverter().convertTo(String.class, exchange, body);
+                    message.setText(payload);
+                }
+                return message;
+            }
+            case Bytes: {
+                BytesMessage message = session.createBytesMessage();
+                if (body != null) {
+                    byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body);
+                    message.writeBytes(payload);
+                }
+                return message;
+            }
+            case Map: {
+                MapMessage message = session.createMapMessage();
+                if (body != null) {
+                    Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
+                    populateMapMessage(message, payload, context);
+                }
+                return message;
+            }
+            case Object:
+                ObjectMessage message = session.createObjectMessage();
+                if (body != null) {
+                    try {
+                        Serializable payload = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body);
+                        message.setObject(payload);
+                    } catch (NoTypeConversionAvailableException e) {
+                        // cannot convert to serializable then thrown an exception to avoid sending a null message
+                        JMSException cause = new MessageFormatException(e.getMessage());
+                        cause.initCause(e);
+                        throw cause;
+                    }
+                }
+                return message;
+            default:
+                break;
+        }
+        return null;
+    }
+    /**
+     * Populates a {@link MapMessage} from a {@link Map} instance.
+     */
+    protected void populateMapMessage(MapMessage message, Map<?, ?> map, CamelContext context)
+            throws JMSException {
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            String keyString = CamelContextHelper.convertTo(context, String.class, entry.getKey());
+            if (keyString != null) {
+                message.setObject(keyString, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Extracts a {@link Map} from a {@link MapMessage}
+     */
+    public Map<String, Object> createMapFromMapMessage(MapMessage message) throws JMSException {
+        Map<String, Object> answer = new HashMap<String, Object>();
+        Enumeration<?> names = message.getMapNames();
+        while (names.hasMoreElements()) {
+            String name = names.nextElement().toString();
+            Object value = message.getObject(name);
+            answer.put(name, value);
+        }
+        return answer;
+    }
+
+    /**
+     * Strategy to allow filtering of headers which are put on the JMS message
+     * <p/>
+     * <b>Note</b>: Currently only supports sending java identifiers as keys
+     */
+    protected boolean shouldOutputHeader(String headerName, Object headerValue, Exchange exchange) {
+        return headerFilterStrategy == null
+                || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java
new file mode 100644
index 0000000..5a0327e
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsKeyFormatStrategy.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * Strategy for applying encoding and decoding of JMS header keys so they comply with
+ * the JMS spec.
+ */
+public interface JmsKeyFormatStrategy {
+
+    /**
+     * Encodes the key before it is sent as a {@link javax.jms.Message} message.
+     *
+     * @param key the original key
+     * @return the encoded key
+     */
+    String encodeKey(String key);
+
+    /**
+     * Decodes the key after it is received from a {@link javax.jms.Message}
+     * message.
+     *
+     * @param key the encoded key
+     * @return the decoded key as the original key
+     */
+    String decodeKey(String key);
+}


[4/5] camel git commit: CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.

Posted by da...@apache.org.
CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/93bf668b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/93bf668b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/93bf668b

Branch: refs/heads/master
Commit: 93bf668b2ff8787b72fc5ca68058838fa6a9af44
Parents: d19e5d7
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Sep 7 11:18:06 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Sep 7 11:18:06 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/component/sjms/SjmsComponent.java    | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/93bf668b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index a7347c7..2748aae 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -71,6 +71,12 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS
         if (destinationCreationStrategy != null) {
             endpoint.setDestinationCreationStrategy(destinationCreationStrategy);
         }
+        if (headerFilterStrategy != null) {
+            endpoint.setHeaderFilterStrategy(headerFilterStrategy);
+        }
+        if (messageCreatedStrategy != null) {
+            endpoint.setMessageCreatedStrategy(messageCreatedStrategy);
+        }
         return endpoint;
     }
 


[2/5] camel git commit: CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.

Posted by da...@apache.org.
http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index 0a93f16..f3224b6 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -16,408 +16,272 @@
  */
 package org.apache.camel.component.sjms.jms;
 
-import java.io.File;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
 import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
-import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MapMessage;
 import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.TypeConverter;
-import org.apache.camel.component.sjms.SjmsConstants;
-import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.impl.DefaultMessage;
-import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
 
 /**
  * Utility class for {@link javax.jms.Message}.
  */
-public final class JmsMessageHelper implements JmsConstants {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageHelper.class);
+public final class JmsMessageHelper {
 
     private JmsMessageHelper() {
     }
 
-    public static Exchange createExchange(Message message, Endpoint endpoint) {
-        return createExchange(message, endpoint, null);
-    }
-
     /**
-     * Creates an Exchange from a JMS Message.
-     * @param message The JMS message.
-     * @param endpoint The Endpoint to use to create the Exchange object.
-     * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
-     *                          format keys in a JMS 1.1 compliant manner. If null the
-     *                          {@link DefaultJmsKeyFormatStrategy} will be used.
-     * @return Populated Exchange.
+     * Removes the property from the JMS message.
+     *
+     * @param jmsMessage the JMS message
+     * @param name       name of the property to remove
+     * @return the old value of the property, or <tt>null</tt> if it does not exist
+     * @throws javax.jms.JMSException can be thrown
      */
-    public static Exchange createExchange(Message message, Endpoint endpoint, KeyFormatStrategy keyFormatStrategy) {
-        Exchange exchange = endpoint.createExchange();
-        KeyFormatStrategy initialisedKeyFormatStrategy = (keyFormatStrategy == null)
-                ? new DefaultJmsKeyFormatStrategy() : keyFormatStrategy;
-        return populateExchange(message, exchange, false, initialisedKeyFormatStrategy);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Exchange populateExchange(Message message, Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) {
-        try {
-            setJmsMessageHeaders(message, exchange, out, keyFormatStrategy);
-            if (message != null) {
-                // convert to JMS Message of the given type
+    public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException {
+        // check if the property exists
+        if (!jmsMessage.propertyExists(name)) {
+            return null;
+        }
 
-                DefaultMessage bodyMessage;
-                if (out) {
-                    bodyMessage = (DefaultMessage) exchange.getOut();
-                } else {
-                    bodyMessage = (DefaultMessage) exchange.getIn();
-                }
-                switch (JmsMessageHelper.discoverJmsMessageType(message)) {
-                case Bytes:
-                    BytesMessage bytesMessage = (BytesMessage) message;
-                    if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
-                        LOG.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
-                        return null;
-                    }
-                    byte[] result = new byte[(int) bytesMessage.getBodyLength()];
-                    bytesMessage.readBytes(result);
-                    bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
-                    bodyMessage.setBody(result);
-                    break;
-                case Map:
-                    Map<String, Object> body = new HashMap<String, Object>();
-                    MapMessage mapMessage = (MapMessage) message;
-                    Enumeration<String> names = mapMessage.getMapNames();
-                    while (names.hasMoreElements()) {
-                        String key = names.nextElement();
-                        Object value = mapMessage.getObject(key);
-                        body.put(key, value);
-                    }
-                    bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Map);
-                    bodyMessage.setBody(body);
-                    break;
-                case Object:
-                    ObjectMessage objMsg = (ObjectMessage) message;
-                    bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Object);
-                    bodyMessage.setBody(objMsg.getObject());
-                    break;
-                case Text:
-                    TextMessage textMsg = (TextMessage) message;
-                    bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Text);
-                    bodyMessage.setBody(textMsg.getText());
-                    break;
-                case Stream:
-                    StreamMessage streamMessage = (StreamMessage) message;
-                    List<Object> list = new ArrayList<Object>();
-                    Object obj;
-                    while ((obj = streamMessage.readObject()) != null) {
-                        list.add(obj);
-                    }
-                    bodyMessage.setHeader(SjmsConstants.JMS_MESSAGE_TYPE, JmsMessageType.Stream);
-                    bodyMessage.setBody(list);
-                    break;
-                case Message:
-                default:
-                    // Do nothing. Only set the headers for an empty message
-                    bodyMessage.setBody(message);
-                    break;
-                }
+        Object answer = null;
+
+        // the JMS API cannot clear a single property; all properties must be
+        // cleared and the ones to keep set again. So copy every other property
+        // into a temporary (insertion-ordered) map, and capture the value of
+        // the property being removed so it can be returned to the caller
+        Map<String, Object> map = new LinkedHashMap<String, Object>();
+        Enumeration<?> en = jmsMessage.getPropertyNames();
+        while (en.hasMoreElements()) {
+            String key = (String) en.nextElement();
+            if (name.equals(key)) {
+                answer = getProperty(jmsMessage, key);
+            } else {
+                map.put(key, getProperty(jmsMessage, key));
             }
-        } catch (Exception e) {
-            exchange.setException(e);
         }
-        return exchange;
-    }
-
-    public static Message createMessage(Exchange exchange, Session session, SjmsEndpoint endpoint) throws Exception {
-        Message answer;
-        Object body;
-        Map<String, Object> bodyHeaders;
 
-        if (exchange.getOut().getBody() != null) {
-            body = exchange.getOut().getBody();
-            bodyHeaders = new HashMap<String, Object>(exchange.getOut().getHeaders());
-        } else {
-            body = exchange.getIn().getBody();
-            bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders());
+        // redo the properties to keep
+        jmsMessage.clearProperties();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
         }
 
-        answer = createMessage(exchange, session, body, bodyHeaders, endpoint);
         return answer;
     }
 
-    public static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception {
-        return createMessage(exchange, session, payload, messageHeaders, endpoint.isAllowNullBody(),
-                endpoint.getSjmsHeaderFilterStrategy(), endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter());
-    }
-
-    private static Message createMessage(Exchange exchange, Session session, Object payload, Map<String, Object> messageHeaders, boolean allowNullBody,
-                                         HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy, TypeConverter typeConverter) throws Exception {
-        Message answer = null;
-        JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload);
-
-        switch (messageType) {
-        case Bytes:
-            BytesMessage bytesMessage = session.createBytesMessage();
-            byte[] bytesToWrite = typeConverter.convertTo(byte[].class, payload);
-            bytesMessage.writeBytes(bytesToWrite);
-            answer = bytesMessage;
-            break;
-        case Map:
-            MapMessage mapMessage = session.createMapMessage();
-            Map objMap = (Map) payload;
-            for (final Map.Entry entry : (Set<Map.Entry>)objMap.entrySet()) {
-                mapMessage.setObject(entry.getKey().toString(), entry.getValue());
-            }
-            answer = mapMessage;
-            break;
-        case Object:
-            ObjectMessage objectMessage = session.createObjectMessage();
-            objectMessage.setObject((Serializable) payload);
-            answer = objectMessage;
-            break;
-        case Text:
-            TextMessage textMessage = session.createTextMessage();
-            String convertedText = typeConverter.convertTo(String.class, payload);
-            textMessage.setText(convertedText);
-            answer = textMessage;
-            break;
-        case Stream:
-            StreamMessage streamMessage = session.createStreamMessage();
-            Collection collection = (Collection)payload;
-            for (final Object obj : collection) {
-                streamMessage.writeObject(obj);
-            }
-            answer = streamMessage;
-            break;
-        case Message:
-            if (allowNullBody && payload == null) {
-                answer = session.createMessage();
-            } else if (payload != null) {
-                throw new JMSException("Unsupported message body type " + ObjectHelper.classCanonicalName(payload));
-            } else {
-                throw new JMSException("Null body is not allowed");
+    /**
+     * Tests whether a given property with the name exists
+     *
+     * @param jmsMessage the JMS message
+     * @param name       name of the property to test if exists
+     * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
+     * @throws JMSException can be thrown
+     */
+    public static boolean hasProperty(Message jmsMessage, String name) throws JMSException {
+        Enumeration<?> en = jmsMessage.getPropertyNames();
+        while (en.hasMoreElements()) {
+            String key = (String) en.nextElement();
+            if (name.equals(key)) {
+                return true;
             }
-            break;
-        default:
-            break;
         }
-
-        appendJmsProperties(answer, exchange, exchange.getIn(), headerFilterStrategy, keyFormatStrategy);
-        return answer;
+        return false;
     }
 
     /**
-     * Appends the JMS headers from the Camel {@link Message}
+     * Gets a JMS property
+     *
+     * @param jmsMessage the JMS message
+     * @param name       name of the property to get
+     * @return the property value, or <tt>null</tt> if it does not exist
+     * @throws JMSException can be thrown
      */
-    private static void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in,
-                                            HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException {
-        Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
-        for (Map.Entry<String, Object> entry : entries) {
-            String headerName = entry.getKey();
-            Object headerValue = entry.getValue();
-            appendJmsProperty(jmsMessage, exchange, in, headerName, headerValue, headerFilterStrategy, keyFormatStrategy);
+    public static Object getProperty(Message jmsMessage, String name) throws JMSException {
+        Object value = jmsMessage.getObjectProperty(name);
+        if (value == null) {
+            value = jmsMessage.getStringProperty(name);
         }
+        return value;
     }
 
-    private static void appendJmsProperty(Message jmsMessage, Exchange exchange, org.apache.camel.Message in,
-                                  String headerName, Object headerValue,
-                                  HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy keyFormatStrategy) throws JMSException {
-        if (isStandardJMSHeader(headerName)) {
-            if (headerName.equals("JMSCorrelationID")) {
-                jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
-            } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
-                if (headerValue instanceof String) {
-                    // if the value is a String we must normalize it first, and must include the prefix
-                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination type
-                    headerValue = normalizeDestinationName((String) headerValue, true);
-                }
-                Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class, headerValue);
-                JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
-            } else if (headerName.equals("JMSType")) {
-                jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
-            } else if (headerName.equals("JMSPriority")) {
-                jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
-            } else if (headerName.equals("JMSDeliveryMode")) {
-                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
-            } else if (headerName.equals("JMSExpiration")) {
-                jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
-            } else {
-                // The following properties are set by the MessageProducer:
-                // JMSDestination
-                // The following are set on the underlying JMS provider:
-                // JMSMessageID, JMSTimestamp, JMSRedelivered
-                // log at trace level to not spam log
-                LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
-            }
-        } else if (shouldOutputHeader(in, headerName, headerValue, exchange, headerFilterStrategy)) {
-            // only primitive headers and strings is allowed as properties
-            // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
-            Object value = getValidJMSHeaderValue(headerName, headerValue);
-            if (value != null) {
-                // must encode to safe JMS header name before setting property on jmsMessage
-                String key = keyFormatStrategy.encodeKey(headerName);
-                // set the property
-                JmsMessageHelper.setProperty(jmsMessage, key, value);
-            } else if (LOG.isDebugEnabled()) {
-                // okay the value is not a primitive or string so we cannot sent it over the wire
-                LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}",
-                        new Object[]{headerName, headerValue.getClass().getName(), headerValue});
-            }
+    /**
+     * Sets the property on the given JMS message; a <tt>null</tt> value is ignored.
+     *
+     * @param jmsMessage  the JMS message
+     * @param name        name of the property to set
+     * @param value       the value, dispatched to the typed setter matching its runtime class
+     * @throws JMSException can be thrown
+     */
+    public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
+        if (value == null) {
+            return;
+        }
+        if (value instanceof Byte) {
+            jmsMessage.setByteProperty(name, (Byte) value);
+        } else if (value instanceof Boolean) {
+            jmsMessage.setBooleanProperty(name, (Boolean) value);
+        } else if (value instanceof Double) {
+            jmsMessage.setDoubleProperty(name, (Double) value);
+        } else if (value instanceof Float) {
+            jmsMessage.setFloatProperty(name, (Float) value);
+        } else if (value instanceof Integer) {
+            jmsMessage.setIntProperty(name, (Integer) value);
+        } else if (value instanceof Long) {
+            jmsMessage.setLongProperty(name, (Long) value);
+        } else if (value instanceof Short) {
+            jmsMessage.setShortProperty(name, (Short) value);
+        } else if (value instanceof String) {
+            jmsMessage.setStringProperty(name, (String) value);
+        } else {
+            // any other type is passed unchanged to setObjectProperty
+            jmsMessage.setObjectProperty(name, value);
         }
     }
 
     /**
-     * Is the given header a standard JMS header
-     * @param headerName the header name
-     * @return <tt>true</tt> if its a standard JMS header
+     * Sets the correlation id on the JMS message.
+     * <p/>
+     * Will ignore exception thrown
+     *
+     * @param message  the JMS message
+     * @param correlationId the correlation id
      */
-    protected static boolean isStandardJMSHeader(String headerName) {
-        if (!headerName.startsWith("JMS")) {
-            return false;
-        }
-        if (headerName.startsWith("JMSX")) {
-            return false;
+    public static void setCorrelationId(Message message, String correlationId) {
+        try {
+            message.setJMSCorrelationID(correlationId);
+        } catch (JMSException e) {
+            // ignore
         }
-        // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such as JMS_IBM)
-        if (headerName.startsWith("JMS_")) {
+    }
+
+    /**
+     * Whether the destination name has either queue or temp queue prefix.
+     *
+     * @param destination the destination
+     * @return <tt>true</tt> if queue or temp-queue prefix, <tt>false</tt> otherwise
+     */
+    public static boolean isQueuePrefix(String destination) {
+        if (ObjectHelper.isEmpty(destination)) {
             return false;
         }
 
-        // the 4th char must be a letter to be a standard JMS header
-        if (headerName.length() > 3) {
-            Character fourth = headerName.charAt(3);
-            if (Character.isLetter(fourth)) {
-                return true;
-            }
+        return destination.startsWith(JmsConstants.QUEUE_PREFIX) || destination.startsWith(JmsConstants.TEMP_QUEUE_PREFIX);
+    }
+
+    /**
+     * Whether the destination name has either topic or temp topic prefix.
+     *
+     * @param destination the destination
+     * @return <tt>true</tt> if topic or temp-topic prefix, <tt>false</tt> otherwise
+     */
+    public static boolean isTopicPrefix(String destination) {
+        if (ObjectHelper.isEmpty(destination)) {
+            return false;
         }
 
-        return false;
+        return destination.startsWith(JmsConstants.TOPIC_PREFIX) || destination.startsWith(JmsConstants.TEMP_TOPIC_PREFIX);
     }
 
     /**
-     * Strategy to test if the given header is valid according to the JMS spec to be set as a property
-     * on the JMS message.
+     * Normalizes the destination name.
      * <p/>
-     * This default implementation will allow:
-     * <ul>
-     *   <li>any primitives and their counter Objects (Integer, Double etc.)</li>
-     *   <li>String and any other literals, Character, CharSequence</li>
-     *   <li>Boolean</li>
-     *   <li>Number</li>
-     *   <li>java.util.Date</li>
-     * </ul>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
      *
-     * @param headerName   the header name
-     * @param headerValue  the header value
-     * @return  the value to use, <tt>null</tt> to ignore this header
+     * @param destination the destination
+     * @return the normalized destination
      */
-    protected static Object getValidJMSHeaderValue(String headerName, Object headerValue) {
-        if (headerValue instanceof String) {
-            return headerValue;
-        } else if (headerValue instanceof BigInteger) {
-            return headerValue.toString();
-        } else if (headerValue instanceof BigDecimal) {
-            return headerValue.toString();
-        } else if (headerValue instanceof Number) {
-            return headerValue;
-        } else if (headerValue instanceof Character) {
-            return headerValue;
-        } else if (headerValue instanceof CharSequence) {
-            return headerValue.toString();
-        } else if (headerValue instanceof Boolean) {
-            return headerValue;
-        } else if (headerValue instanceof Date) {
-            return headerValue.toString();
-        }
-        return null;
+    public static String normalizeDestinationName(String destination) {
+        // do not include prefix which is the current behavior when using this method.
+        return normalizeDestinationName(destination, false);
     }
 
-    @SuppressWarnings("unchecked")
-    public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out, KeyFormatStrategy keyFormatStrategy) throws JMSException {
-        Map<String, Object> headers = new HashMap<String, Object>();
-        if (jmsMessage != null) {
-            // lets populate the standard JMS message headers
-            try {
-                headers.put(JMS_CORRELATION_ID, jmsMessage.getJMSCorrelationID());
-                headers.put(JMS_DELIVERY_MODE, jmsMessage.getJMSDeliveryMode());
-                headers.put(JMS_DESTINATION, jmsMessage.getJMSDestination());
-                headers.put(JMS_EXPIRATION, jmsMessage.getJMSExpiration());
-                headers.put(JMS_MESSAGE_ID, jmsMessage.getJMSMessageID());
-                headers.put(JMS_PRIORITY, jmsMessage.getJMSPriority());
-                headers.put(JMS_REDELIVERED, jmsMessage.getJMSRedelivered());
-                headers.put(JMS_TIMESTAMP, jmsMessage.getJMSTimestamp());
-                headers.put(JMS_REPLY_TO, getJMSReplyTo(jmsMessage));
-                headers.put(JMS_TYPE, getJMSType(jmsMessage));
-
-                // this works around a bug in the ActiveMQ property handling
-                headers.put(JMSX_GROUP_ID, jmsMessage.getStringProperty(JMSX_GROUP_ID));
-            } catch (JMSException e) {
-                throw new RuntimeCamelException(e);
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination, boolean includePrefix) {
+        if (ObjectHelper.isEmpty(destination)) {
+            return destination;
+        }
+        if (destination.startsWith(JmsConstants.QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(JmsConstants.QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = JmsConstants.QUEUE_PREFIX + "//" + s;
             }
-
-            for (final Enumeration<String> enumeration = jmsMessage.getPropertyNames(); enumeration.hasMoreElements();) {
-                String key = enumeration.nextElement();
-                if (hasIllegalHeaderKey(key)) {
-                    throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value");
-                }
-                Object value = jmsMessage.getObjectProperty(key);
-                String decodedName = keyFormatStrategy.decodeKey(key);
-                headers.put(decodedName, value);
+            return s;
+        } else if (destination.startsWith(JmsConstants.TEMP_QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(JmsConstants.TEMP_QUEUE_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = JmsConstants.TEMP_QUEUE_PREFIX + "//" + s;
             }
-        }
-        if (out) {
-            exchange.getOut().setHeaders(headers);
+            return s;
+        } else if (destination.startsWith(JmsConstants.TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(JmsConstants.TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = JmsConstants.TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(JmsConstants.TEMP_TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(JmsConstants.TEMP_TOPIC_PREFIX.length()), '/');
+            if (includePrefix) {
+                s = JmsConstants.TEMP_TOPIC_PREFIX + "//" + s;
+            }
+            return s;
         } else {
-            exchange.getIn().setHeaders(headers);
+            return destination;
         }
-        return exchange;
     }
 
     /**
-     * Strategy to allow filtering of headers which are put on the JMS message
-     * <p/>
-     * <b>Note</b>: Currently only supports sending java identifiers as keys
+     * Sets the JMSReplyTo on the message.
+     *
+     * @param message  the message
+     * @param replyTo  the reply to destination
      */
-    protected static boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String headerName,
-                                               Object headerValue, Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
-        return headerFilterStrategy == null
-                || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue, exchange);
+    public static void setJMSReplyTo(Message message, Destination replyTo) {
+        try {
+            message.setJMSReplyTo(replyTo);
+        } catch (Exception e) {
+            // deliberately ignored, as OracleAQ does not support accessing JMSReplyTo
+        }
+    }
+
+    /**
+     * Gets the JMSReplyTo from the message, ignoring any exception thrown by the provider.
+     *
+     * @param message  the message
+     * @return the reply to, can be <tt>null</tt> (also when the provider throws)
+     */
+    public static Destination getJMSReplyTo(Message message) {
+        try {
+            return message.getJMSReplyTo();
+        } catch (Exception e) {
+            // deliberately ignored, as OracleAQ does not support accessing JMSReplyTo
+        }
+
+        return null;
     }
 
     /**
      * Gets the JMSType from the message.
      *
-     * @param message the message
+     * @param message  the message
      * @return the type, can be <tt>null</tt>
      */
     public static String getJMSType(Message message) {
@@ -431,6 +295,54 @@ public final class JmsMessageHelper implements JmsConstants {
     }
 
     /**
+     * Gets the String property named <tt>propertyName</tt> from the message.
+     *
+     * @param message  the message
+     * @return the property value, or <tt>null</tt> if not set or not able to get
+     */
+    public static String getStringProperty(Message message, String propertyName) {
+        try {
+            return message.getStringProperty(propertyName);
+        } catch (Exception e) {
+            // deliberately ignored, as some broker clients do not support accessing string properties
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets the JMSRedelivered from the message.
+     *
+     * @param message  the message
+     * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if not able to determine
+     */
+    public static Boolean getJMSRedelivered(Message message) {
+        try {
+            return message.getJMSRedelivered();
+        } catch (Exception e) {
+            // ignore if the JMS broker does not support this
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets the JMSMessageID from the message.
+     *
+     * @param message  the message
+     * @return the JMSMessageID, or <tt>null</tt> if not able to get
+     */
+    public static String getJMSMessageID(Message message) {
+        try {
+            return message.getJMSMessageID();
+        } catch (Exception e) {
+            // ignore if the JMS broker does not support this
+        }
+
+        return null;
+    }
+
+    /**
      * Sets the JMSDeliveryMode on the message.
      *
      * @param exchange the exchange
@@ -471,219 +383,19 @@ public final class JmsMessageHelper implements JmsConstants {
     }
 
     /**
-     * Sets the correlation id on the JMS message.
-     * <p/>
-     * Will ignore exception thrown
-     *
-     * @param message the JMS message
-     * @param type    the correlation id
-     */
-    public static void setMessageType(Message message, String type) {
-        try {
-            message.setJMSType(type);
-        } catch (JMSException e) {
-            LOG.debug("Error setting the message type: {}", type, e);
-        }
-    }
-
-    /**
-     * Sets the correlation id on the JMS message.
-     * <p/>
-     * Will ignore exception thrown
-     *
-     * @param message       the JMS message
-     * @param correlationId the correlation id
-     */
-    public static void setCorrelationId(Message message, String correlationId) {
-        try {
-            message.setJMSCorrelationID(correlationId);
-        } catch (JMSException e) {
-            LOG.debug("Error setting the correlationId: {}", correlationId, e);
-        }
-    }
-
-    /**
-     * Sets the JMSReplyTo on the message.
+     * Gets the JMSCorrelationIDAsBytes from the message.
      *
      * @param message the message
-     * @param replyTo the reply to destination
+     * @return the JMSCorrelationIDAsBytes converted to a String, or <tt>null</tt> if not able to get
      */
-    public static void setJMSReplyTo(Message message, Destination replyTo) {
+    public static String getJMSCorrelationIDAsBytes(Message message) {
         try {
-            message.setJMSReplyTo(replyTo);
+            return new String(message.getJMSCorrelationIDAsBytes());
         } catch (Exception e) {
-            LOG.debug("Error setting the correlationId: {}", replyTo.toString());
+            // ignore if the JMS broker does not support this
         }
-    }
 
-    /**
-     * Gets the JMSReplyTo from the message.
-     *
-     * @param message the message
-     * @return the reply to, can be <tt>null</tt>
-     */
-    public static Destination getJMSReplyTo(Message message) {
-        try {
-            return message.getJMSReplyTo();
-        } catch (Exception e) {
-            // ignore due OracleAQ does not support accessing JMSReplyTo
-        }
         return null;
     }
 
-    /**
-     * Sets the property on the given JMS message.
-     *
-     * @param jmsMessage the JMS message
-     * @param name       name of the property to set
-     * @param value      the value
-     * @throws JMSException can be thrown
-     */
-    public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
-        if (value == null) {
-            jmsMessage.setObjectProperty(name, null);
-        } else if (value instanceof Byte) {
-            jmsMessage.setByteProperty(name, (Byte) value);
-        } else if (value instanceof Boolean) {
-            jmsMessage.setBooleanProperty(name, (Boolean) value);
-        } else if (value instanceof Double) {
-            jmsMessage.setDoubleProperty(name, (Double) value);
-        } else if (value instanceof Float) {
-            jmsMessage.setFloatProperty(name, (Float) value);
-        } else if (value instanceof Integer) {
-            jmsMessage.setIntProperty(name, (Integer) value);
-        } else if (value instanceof Long) {
-            jmsMessage.setLongProperty(name, (Long) value);
-        } else if (value instanceof Short) {
-            jmsMessage.setShortProperty(name, (Short) value);
-        } else if (value instanceof String) {
-            jmsMessage.setStringProperty(name, (String) value);
-        } else {
-            // fallback to Object
-            jmsMessage.setObjectProperty(name, value);
-        }
-    }
-
-    public static JmsMessageType discoverMessageTypeFromPayload(final Object payload) {
-        JmsMessageType answer;
-        // Default is a JMS Message since a body is not required
-        if (payload == null) {
-            answer = JmsMessageType.Message;
-        } else {
-            // Something was found in the body so determine
-            // what type of message we need to create
-            if (byte[].class.isInstance(payload)) {
-                answer = JmsMessageType.Bytes;
-            } else if (Map.class.isInstance(payload)) {
-                answer = JmsMessageType.Map;
-            } else if (Collection.class.isInstance(payload)) {
-                answer = JmsMessageType.Stream;
-            } else if (InputStream.class.isInstance(payload)) {
-                answer = JmsMessageType.Bytes;
-            } else if (ByteBuffer.class.isInstance(payload)) {
-                answer = JmsMessageType.Bytes;
-            } else if (File.class.isInstance(payload)) {
-                answer = JmsMessageType.Bytes;
-            } else if (Reader.class.isInstance(payload)) {
-                answer = JmsMessageType.Text;
-            } else if (String.class.isInstance(payload)) {
-                answer = JmsMessageType.Text;
-            } else if (CharBuffer.class.isInstance(payload)) {
-                answer = JmsMessageType.Text;
-            } else if (char[].class.isInstance(payload)) {
-                answer = JmsMessageType.Text;
-            } else if (Character.class.isInstance(payload)) {
-                answer = JmsMessageType.Text;
-            } else if (Serializable.class.isInstance(payload)) {
-                answer = JmsMessageType.Object;
-            } else {
-                answer = JmsMessageType.Message;
-            }
-        }
-        return answer;
-    }
-
-    public static JmsMessageType discoverJmsMessageType(Message message) {
-        JmsMessageType answer;
-        if (message != null) {
-            if (BytesMessage.class.isInstance(message)) {
-                answer = JmsMessageType.Bytes;
-            } else if (MapMessage.class.isInstance(message)) {
-                answer = JmsMessageType.Map;
-            } else if (TextMessage.class.isInstance(message)) {
-                answer = JmsMessageType.Text;
-            } else if (StreamMessage.class.isInstance(message)) {
-                answer = JmsMessageType.Stream;
-            } else if (ObjectMessage.class.isInstance(message)) {
-                answer = JmsMessageType.Object;
-            } else {
-                answer = JmsMessageType.Message;
-            }
-        } else {
-            answer = JmsMessageType.Message;
-        }
-        return answer;
-    }
-
-    private static boolean hasIllegalHeaderKey(String key) {
-        return key == null || key.isEmpty() || key.contains(".") || key.contains("-");
-    }
-
-    /**
-     * Normalizes the destination name.
-     * <p/>
-     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
-     * was intended as <tt>queue://foo</tt>.
-     *
-     * @param destination the destination
-     * @return the normalized destination
-     */
-    public static String normalizeDestinationName(String destination) {
-        // do not include prefix which is the current behavior when using this method.
-        return normalizeDestinationName(destination, false);
-    }
-
-    /**
-     * Normalizes the destination name.
-     * <p/>
-     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>, which
-     * was intended as <tt>queue://foo</tt>.
-     *
-     * @param destination the destination
-     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt> prefix in the normalized destination name
-     * @return the normalized destination
-     */
-    public static String normalizeDestinationName(String destination, boolean includePrefix) {
-        if (ObjectHelper.isEmpty(destination)) {
-            return destination;
-        }
-        if (destination.startsWith(QUEUE_PREFIX)) {
-            String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
-            if (includePrefix) {
-                s = QUEUE_PREFIX + "//" + s;
-            }
-            return s;
-        } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
-            String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()), '/');
-            if (includePrefix) {
-                s = TEMP_QUEUE_PREFIX + "//" + s;
-            }
-            return s;
-        } else if (destination.startsWith(TOPIC_PREFIX)) {
-            String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
-            if (includePrefix) {
-                s = TOPIC_PREFIX + "//" + s;
-            }
-            return s;
-        } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
-            String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()), '/');
-            if (includePrefix) {
-                s = TEMP_TOPIC_PREFIX + "//" + s;
-            }
-            return s;
-        } else {
-            return destination;
-        }
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java
deleted file mode 100644
index 3b7566f..0000000
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/KeyFormatStrategy.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms.jms;
-
-/**
- * Strategy for applying encoding and decoding of JMS headers so they apply to
- * the JMS spec.
- */
-public interface KeyFormatStrategy {
-
-    /**
-     * Encodes the key before its sent as a {@link javax.jms.Message} message.
-     *
-     * @param key the original key
-     * @return the encoded key
-     */
-    String encodeKey(String key);
-
-    /**
-     * Decodes the key after its received from a {@link javax.jms.Message}
-     * message.
-     *
-     * @param key the encoded key
-     * @return the decoded key as the original key
-     */
-    String decodeKey(String key);
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java
new file mode 100644
index 0000000..894ef61
--- /dev/null
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/MessageCreatedStrategy.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A strategy that allows custom components to plug in and perform custom logic when Camel creates a {@link javax.jms.Message} instance.
+ * <p/>
+ * For example to populate the message with custom information that is component specific and not part of the JMS specification.
+ */
+public interface MessageCreatedStrategy {
+
+    /**
+     * Callback when the JMS message has <i>just</i> been created, which allows custom modifications afterwards.
+     * @param message the JMS message that has just been created
+     * @param exchange the current exchange
+     * @param session the JMS session used to create the message
+     * @param cause optional exception that occurred and should be sent as the reply instead of a regular body
+     */
+    void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 3fa23b0..93f8648 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -32,7 +32,6 @@ import org.apache.camel.component.sjms.BatchMessage;
 import org.apache.camel.component.sjms.MessageProducerResources;
 import org.apache.camel.component.sjms.SjmsProducer;
 import org.apache.camel.component.sjms.TransactionCommitStrategy;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
@@ -46,11 +45,6 @@ public class InOnlyProducer extends SjmsProducer {
         super(endpoint);
     }
 
-    /*
-     * @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel()
-     * @return
-     * @throws Exception
-     */
     @Override
     public MessageProducerResources doCreateProducerModel() throws Exception {
         MessageProducerResources answer;
@@ -75,13 +69,6 @@ public class InOnlyProducer extends SjmsProducer {
         return answer;
     }
 
-    /*
-     * @see
-     * org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
-     * @param exchange
-     * @param callback
-     * @throws Exception
-     */
     @Override
     public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer) throws Exception {
         try {
@@ -93,19 +80,18 @@ public class InOnlyProducer extends SjmsProducer {
                         Message message;
                         if (BatchMessage.class.isInstance(object)) {
                             BatchMessage<?> batchMessage = (BatchMessage<?>) object;
-                            message = JmsMessageHelper.createMessage(exchange, producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
+                            message = getEndpoint().getBinding().makeJmsMessage(exchange, batchMessage.getPayload(), batchMessage.getHeaders(), producer.getSession(), null);
                         } else {
-                            message = JmsMessageHelper.createMessage(exchange, producer.getSession(), object, exchange.getIn().getHeaders(), getEndpoint());
+                            message = getEndpoint().getBinding().makeJmsMessage(exchange, object, exchange.getIn().getHeaders(), producer.getSession(), null);
                         }
                         messages.add(message);
                     }
                 } else {
-                    Object payload = exchange.getIn().getBody();
-                    Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), payload, exchange.getIn().getHeaders(), getEndpoint());
+                    Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
                     messages.add(message);
                 }
             } else {
-                Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(), null, exchange.getIn().getHeaders(), getEndpoint());
+                Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
                 messages.add(message);
             }
 
@@ -124,4 +110,5 @@ public class InOnlyProducer extends SjmsProducer {
             callback.done(isSynchronous());
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index 039daf0..1c535b6 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -36,11 +36,14 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.MessageConsumerResources;
 import org.apache.camel.component.sjms.MessageProducerResources;
 import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsProducer;
 import org.apache.camel.component.sjms.jms.JmsConstants;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
+import org.apache.camel.spi.UuidGenerator;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.pool.BasePoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericObjectPool;
@@ -52,6 +55,17 @@ public class InOutProducer extends SjmsProducer {
 
     private static final Map<String, Exchanger<Object>> EXCHANGERS = new ConcurrentHashMap<String, Exchanger<Object>>();
 
+    private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-";
+    private UuidGenerator uuidGenerator;
+
+    public UuidGenerator getUuidGenerator() {
+        return uuidGenerator;
+    }
+
+    public void setUuidGenerator(UuidGenerator uuidGenerator) {
+        this.uuidGenerator = uuidGenerator;
+    }
+
     /**
      * A pool of {@link MessageConsumerResources} objects that are the reply
      * consumers.
@@ -134,6 +148,10 @@ public class InOutProducer extends SjmsProducer {
         } else {
             log.debug("Using {} as the reply to destination.", getNamedReplyTo());
         }
+        if (uuidGenerator == null) {
+            // use the generator configured on the camel context
+            uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator();
+        }
         if (getConsumers() == null) {
             setConsumers(new GenericObjectPool<MessageConsumerResources>(new MessageConsumerResourcesFactory()));
             getConsumers().setMaxActive(getConsumerCount());
@@ -185,16 +203,14 @@ public class InOutProducer extends SjmsProducer {
             exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
         }
 
-        Message request = JmsMessageHelper.createMessage(exchange, producer.getSession(), getEndpoint());
+        Message request = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
 
-        // TODO just set the correlation id don't get it from the
-        // message
-        String correlationId;
-        if (exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class) == null) {
-            correlationId = UUID.randomUUID().toString().replace("-", "");
-        } else {
-            correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
+        String correlationId = exchange.getIn().getHeader(JmsConstants.JMS_CORRELATION_ID, String.class);
+        if (correlationId == null) {
+            // we prepend the 'Camel-' prefix so we know it was generated by us
+            correlationId = GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid();
         }
+
         Object responseObject = null;
         Exchanger<Object> messageExchanger = new Exchanger<Object>();
         JmsMessageHelper.setCorrelationId(request, correlationId);
@@ -229,8 +245,13 @@ public class InOutProducer extends SjmsProducer {
             if (responseObject instanceof Throwable) {
                 exchange.setException((Throwable) responseObject);
             } else if (responseObject instanceof Message) {
-                Message response = (Message) responseObject;
-                JmsMessageHelper.populateExchange(response, exchange, true, getEndpoint().getJmsKeyFormatStrategy());
+                Message message = (Message) responseObject;
+
+                SjmsMessage response = new SjmsMessage(message, consumer.getSession(), getEndpoint().getBinding());
+                // the JmsBinding is designed to be "pull-based": it will populate the Camel message on demand
+                // therefore, we link Exchange and OUT message before continuing, so that the JmsBinding has full access
+                // to everything it may need, and can populate headers, properties, etc. accordingly (solves CAMEL-6218).
+                exchange.setOut(response);
             } else {
                 exchange.setException(new CamelException("Unknown response type: " + responseObject));
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/d19e5d74/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
deleted file mode 100644
index d389886..0000000
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/typeconversion/JMSMessageHelperTypeConversionTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.sjms.typeconversion;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.sjms.support.JmsTestSupport;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class JMSMessageHelperTypeConversionTest extends JmsTestSupport {
-
-    private static final String SJMS_QUEUE_URI = "sjms:queue:start";
-    private static final String MOCK_RESULT_URI = "mock:result";
-    private Exchange message;
-
-    @Test
-    public void testJMSMessageHelperString() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
-
-        template.sendBody(SJMS_QUEUE_URI, "Hello Camel");
-        assertMockEndpointsSatisfied();
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-
-    @Test
-    public void testJMSMessageHelperMap() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedMessageCount(1);
-
-        Map<Object, Object> map = new HashMap<Object, Object>();
-        map.put("Hello", "Camel");
-        map.put("Int", Integer.MAX_VALUE);
-        map.put("Boolean", Boolean.TRUE);
-        map.put(Boolean.TRUE, Long.MAX_VALUE);
-
-        template.sendBody(SJMS_QUEUE_URI, map);
-        assertMockEndpointsSatisfied();
-        assertTrue(Map.class.isInstance(message.getIn().getBody()));
-        assertEquals("Camel", message.getIn().getBody(Map.class).get("Hello"));
-        assertEquals(Integer.MAX_VALUE, message.getIn().getBody(Map.class).get("Int"));
-        assertEquals(Boolean.TRUE, message.getIn().getBody(Map.class).get("Boolean"));
-        assertEquals(Long.MAX_VALUE, message.getIn().getBody(Map.class).get("true"));
-    }
-
-    @Ignore
-    @Test
-    public void testJMSMessageHelperCollection() throws Exception {
-        // TODO: Once SJMS can accept a Collection as Body
-    }
-
-    @Test
-    public void testJMSMessageHelperByteArray() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
-
-        byte[] bytes = "Hello Camel".getBytes();
-        template.sendBody(SJMS_QUEUE_URI, bytes);
-        assertMockEndpointsSatisfied();
-        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
-    }
-
-    @Test
-    public void testJMSMessageHelperInputStream() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
-        String p = "Hello Camel";
-        InputStream is = new ByteArrayInputStream(p.getBytes());
-        template.sendBody(SJMS_QUEUE_URI, is);
-        assertMockEndpointsSatisfied();
-        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperCharBuffer() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
-        CharBuffer cb = CharBuffer.wrap("Hello Camel");
-        template.sendBody(SJMS_QUEUE_URI, cb);
-        assertMockEndpointsSatisfied();
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-
-    @Test
-    public void testJMSMessageHelperByteBuffer() throws Exception {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
-        String p = "Hello Camel";
-        ByteBuffer bb = ByteBuffer.wrap(p.getBytes());
-        template.sendBody(SJMS_QUEUE_URI, bb);
-        assertMockEndpointsSatisfied();
-        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperFile() throws InterruptedException, IOException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel".getBytes());
-        String p = "Hello Camel";
-        File f = File.createTempFile("tmp-test", ".txt");
-        BufferedWriter bw = new BufferedWriter(new FileWriter(f));
-        bw.write(p);
-        bw.close();
-        template.sendBody(SJMS_QUEUE_URI, f);
-        assertMockEndpointsSatisfied();
-        boolean resultDelete = f.delete();
-        assertTrue(resultDelete);
-        assertTrue(byte[].class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperReader() throws InterruptedException, IOException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
-        String p = "Hello Camel";
-        File f = File.createTempFile("tmp-test", ".txt");
-        BufferedWriter bw = new BufferedWriter(new FileWriter(f));
-        bw.write(p);
-        bw.close();
-        Reader test = new BufferedReader(new FileReader(f.getAbsolutePath()));
-        template.sendBody(SJMS_QUEUE_URI, test);
-        assertMockEndpointsSatisfied();
-        assertTrue(f.delete());
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperStringReader() throws InterruptedException, FileNotFoundException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
-        String p = "Hello Camel";
-        StringReader test = new StringReader(p);
-        template.sendBody(SJMS_QUEUE_URI, test);
-        assertMockEndpointsSatisfied();
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperChar() throws InterruptedException, FileNotFoundException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("H");
-        char p = 'H';
-        template.sendBody(SJMS_QUEUE_URI, p);
-        assertMockEndpointsSatisfied();
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperCharacter() throws InterruptedException, FileNotFoundException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("H");
-        Character p = 'H';
-        template.sendBody(SJMS_QUEUE_URI, p);
-        assertMockEndpointsSatisfied();
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-    
-    @Test
-    public void testJMSMessageHelperCharArray() throws InterruptedException, FileNotFoundException {
-        getMockEndpoint(MOCK_RESULT_URI).expectedBodiesReceived("Hello Camel");
-        char[] p = {'H', 'e', 'l', 'l', 'o', ' ', 'C', 'a', 'm', 'e', 'l'};
-        template.sendBody(SJMS_QUEUE_URI, p);
-        assertMockEndpointsSatisfied();
-        assertTrue(String.class.isInstance(message.getIn().getBody()));
-    }
-    
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                interceptSendToEndpoint(MOCK_RESULT_URI).process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        message = exchange;
-                    }
-                });
-                
-                from(SJMS_QUEUE_URI).to(MOCK_RESULT_URI);
-            }
-        };
-    }
-}