You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/30 05:42:14 UTC

svn commit: r1378796 [2/3] - in /camel/trunk/components/camel-sjms: ./ src/main/java/org/apache/camel/component/sjms/ src/main/java/org/apache/camel/component/sjms/consumer/ src/main/java/org/apache/camel/component/sjms/jms/ src/main/java/org/apache/ca...

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java Thu Aug 30 03:42:12 2012
@@ -18,11 +18,8 @@ package org.apache.camel.component.sjms.
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import javax.jms.BytesMessage;
@@ -35,373 +32,202 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.sjms.DefaultJmsKeyFormatStrategy;
 import org.apache.camel.component.sjms.IllegalHeaderException;
-import org.apache.camel.impl.DefaultMessage;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.ObjectHelper;
-
+import org.apache.camel.component.sjms.KeyFormatStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE;
-import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX;
-import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX;
-import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
-
 /**
  * Utility class for {@link javax.jms.Message}.
- *
- * @version 
+ * 
+ * @author sully6768
  */
 public final class JmsMessageHelper {
-    
-    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
-
-    private JmsMessageHelper() {
-    }
-    
-    public static Exchange createExchange(Message message, Endpoint endpoint) {
-        Exchange exchange = endpoint.createExchange();
-        return populateExchange(message, exchange, false);
-    }
-    
-    @SuppressWarnings("unchecked")
-    public static Exchange populateExchange(Message message, Exchange exchange, boolean out) {
-        try {
-            JmsMessageHelper.setJmsMessageHeaders(message, exchange, out);
-            if (message != null) {
-                // convert to JMS Message of the given type
-
-                DefaultMessage bodyMessage = null;
-                if (out) {
-                    bodyMessage = (DefaultMessage) exchange.getOut();
-                } else {
-                    bodyMessage = (DefaultMessage) exchange.getIn();
-                }
-                switch (JmsMessageHelper.discoverType(message)) {
-                case Bytes:
-                    BytesMessage bytesMessage = (BytesMessage) message;
-                    if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
-                        LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
-                        return null;
-                    }
-                    byte[] result = new byte[(int) bytesMessage.getBodyLength()];
-                    bytesMessage.readBytes(result);
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
-                    bodyMessage.setBody(result);
-                    break;
-                case Map:
-                    HashMap<String, Object> body = new HashMap<String, Object>();
-                    MapMessage mapMessage = (MapMessage) message;
-                    Enumeration<String> names = mapMessage.getMapNames();
-                    while (names.hasMoreElements()) {
-                        String key = names.nextElement();
-                        Object value = mapMessage.getObject(key);
-                        body.put(key, value);
-                    }
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Map);
-                    bodyMessage.setBody(body);
-                    break;
-                case Object:
-                    ObjectMessage objMsg = (ObjectMessage) message;
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Object);
-                    bodyMessage.setBody(objMsg.getObject());
-                    break;
-                case Text:
-                    TextMessage textMsg = (TextMessage) message;
-                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Text);
-                    bodyMessage.setBody(textMsg.getText());
-                    break;
-                case Message:
-                default:
-                    // Do nothing. Only set the headers for an empty message
-                    bodyMessage.setBody(message);
-                    break;
-                }
-            }
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-        return exchange;
-    }
 
     /**
-     * Removes the property from the JMS message.
-     *
-     * @param jmsMessage the JMS message
-     * @param name       name of the property to remove
-     * @return the old value of the property or <tt>null</tt> if not exists
-     * @throws JMSException can be thrown
+     * Set by the publishing Client
      */
-    public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException {
-        // check if the property exists
-        if (!jmsMessage.propertyExists(name)) {
-            return null;
-        }
-
-        Object answer = null;
-
-        // store the properties we want to keep in a temporary map
-        // as the JMS API is a bit strict as we are not allowed to
-        // clear a single property, but must clear them all and redo
-        // the properties
-        Map<String, Object> map = new LinkedHashMap<String, Object>();
-        Enumeration<?> en = jmsMessage.getPropertyNames();
-        while (en.hasMoreElements()) {
-            String key = (String) en.nextElement();
-            if (name.equals(key)) {
-                answer = key;
-            } else {
-                map.put(key, jmsMessage.getObjectProperty(key));
-            }
-        }
-
-        // redo the properties to keep
-        jmsMessage.clearProperties();
-        for (Entry<String, Object> entry : map.entrySet()) {
-            jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
-        }
-
-        return answer;
-    }
-
+    public static final String JMS_CORRELATION_ID = "JMSCorrelationID";
     /**
-     * Tests whether a given property with the name exists
-     *
-     * @param jmsMessage the JMS message
-     * @param name       name of the property to test if exists
-     * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
-     * @throws JMSException can be thrown
+     * Set on the send or publish event
      */
-    public static boolean hasProperty(Message jmsMessage, String name) throws JMSException {
-        Enumeration<?> en = jmsMessage.getPropertyNames();
-        while (en.hasMoreElements()) {
-            String key = (String) en.nextElement();
-            if (name.equals(key)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
+    public static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
     /**
-     * Sets the property on the given JMS message.
-     *
-     * @param jmsMessage  the JMS message
-     * @param name        name of the property to set
-     * @param value       the value
-     * @throws JMSException can be thrown
+     * Set on the send or publish event
      */
-    public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
-        if (value == null) {
-            return;
-        }
-        if (value instanceof Byte) {
-            jmsMessage.setByteProperty(name, (Byte) value);
-        } else if (value instanceof Boolean) {
-            jmsMessage.setBooleanProperty(name, (Boolean) value);
-        } else if (value instanceof Double) {
-            jmsMessage.setDoubleProperty(name, (Double) value);
-        } else if (value instanceof Float) {
-            jmsMessage.setFloatProperty(name, (Float) value);
-        } else if (value instanceof Integer) {
-            jmsMessage.setIntProperty(name, (Integer) value);
-        } else if (value instanceof Long) {
-            jmsMessage.setLongProperty(name, (Long) value);
-        } else if (value instanceof Short) {
-            jmsMessage.setShortProperty(name, (Short) value);
-        } else if (value instanceof String) {
-            jmsMessage.setStringProperty(name, (String) value);
-        } else {
-            // fallback to Object
-            jmsMessage.setObjectProperty(name, value);
-        }
-    }
-
+    public static final String JMS_DESTINATION = "JMSDestination";
     /**
-     * Sets the correlation id on the JMS message.
-     * <p/>
-     * Will ignore exception thrown
-     *
-     * @param message  the JMS message
-     * @param correlationId the correlation id
+     * Set on the send or publish event
      */
-    public static void setCorrelationId(Message message, String correlationId) {
-        try {
-            message.setJMSCorrelationID(correlationId);
-        } catch (JMSException e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Error setting the correlationId: {}", correlationId);
-            }
-        }
-    }
-
+    public static final String JMS_EXPIRATION = "JMSExpiration";
     /**
-     * Normalizes the destination name, by removing any leading queue or topic prefixes.
-     *
-     * @param destination the destination
-     * @return the normalized destination
-     */
-    public static String normalizeDestinationName(String destination) {
-        if (ObjectHelper.isEmpty(destination)) {
-            return destination;
-        }
-        if (destination.startsWith(QUEUE_PREFIX)) {
-            return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
-        } else if (destination.startsWith(TOPIC_PREFIX)) {
-            return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
-        } else {
-            return destination;
-        }
-    }
-
+     * Set on the send or publish event
+     */
+    public static final String JMS_MESSAGE_ID = "JMSMessageID";
     /**
-     * Sets the JMSReplyTo on the message.
-     *
-     * @param message  the message
-     * @param replyTo  the reply to destination
+     * Set on the send or publish event
      */
-    public static void setJMSReplyTo(Message message, Destination replyTo) {
-        try {
-            message.setJMSReplyTo(replyTo);
-        } catch (Exception e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
-            }
-        }
-    }
-
+    public static final String JMS_PRIORITY = "JMSPriority";
     /**
-     * Gets the JMSReplyTo from the message.
-     *
-     * @param message  the message
-     * @return the reply to, can be <tt>null</tt>
+     * A redelivery flag set by the JMS provider
      */
-    public static Destination getJMSReplyTo(Message message) {
-        try {
-            return message.getJMSReplyTo();
-        } catch (Exception e) {
-            // ignore due OracleAQ does not support accessing JMSReplyTo
-        }
-
-        return null;
-    }
-
+    public static final String JMS_REDELIVERED = "JMSTimestamp";
     /**
-     * Gets the JMSType from the message.
-     *
-     * @param message  the message
-     * @return the type, can be <tt>null</tt>
+     * The JMS Reply To {@link Destination} set by the publishing Client
      */
-    public static String getJMSType(Message message) {
-        try {
-            return message.getJMSType();
-        } catch (Exception e) {
-            // ignore due OracleAQ does not support accessing JMSType
-        }
+    public static final String JMS_REPLY_TO = "JMSReplyTo";
+    /**
+     * Set on the send or publish event
+     */
+    public static final String JMS_TIMESTAMP = "JMSTimestamp";
+    /**
+     * Set by the publishing Client
+     */
+    public static final String JMS_TYPE = "JMSType";
 
-        return null;
+    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+
+    private JmsMessageHelper() {
     }
 
-    /**
-     * Gets the JMSRedelivered from the message.
-     *
-     * @param message  the message
-     * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if not able to determine
-     */
-    public static Boolean getJMSRedelivered(Message message) {
+    @SuppressWarnings("unchecked")
+    public static Message createMessage(Session session, Object payload, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws Exception {
+        Message answer = null;
+        JmsMessageType messageType = JmsMessageHelper.discoverPayloadType(payload);
         try {
-            return message.getJMSRedelivered();
+
+            switch (messageType) {
+            case Bytes:
+                BytesMessage bytesMessage = session.createBytesMessage();
+                bytesMessage.writeBytes((byte[])payload);
+                answer = bytesMessage;
+                break;
+            case Map:
+                MapMessage mapMessage = session.createMapMessage();
+                Map<String, Object> objMap = (Map<String, Object>)payload;
+                Set<String> keys = objMap.keySet();
+                for (String key : keys) {
+                    Object value = objMap.get(key);
+                    mapMessage.setObject(key, value);
+                }
+                answer = mapMessage;
+                break;
+            case Object:
+                ObjectMessage objectMessage = session.createObjectMessage();
+                objectMessage.setObject((Serializable)payload);
+                answer = objectMessage;
+                break;
+            case Text:
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText((String)payload);
+                answer = textMessage;
+                break;
+            default:
+                break;
+            }
         } catch (Exception e) {
-            // ignore if JMS broker do not support this
+            LOGGER.error("Error creating a message of type: " + messageType.toString());
+            throw e;
         }
-
-        return null;
+        if (messageHeaders != null && !messageHeaders.isEmpty()) {
+            answer = JmsMessageHelper.setJmsMessageHeaders(answer, messageHeaders, keyFormatStrategy);
+        }
+        return answer;
     }
 
     /**
-     * Sets the JMSDeliveryMode on the message.
-     *
-     * @param exchange the exchange
-     * @param message  the message
-     * @param deliveryMode  the delivery mode, either as a String or integer
-     * @throws javax.jms.JMSException is thrown if error setting the delivery mode
+     * Adds or updates the {@link Message} headers. Header names and values are
+     * checked for JMS 1.1 compliance.
+     * 
+     * @param jmsMessage the {@link Message} to add or update the headers on
+     * @param messageHeaders a {@link Map} of String/Object pairs
+     * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
+     *            format keys in a JMS 1.1 compliant manner. If null the
+     *            {@link DefaultJmsKeyFormatStrategy} will be used.
+     * @return {@link Message}
+     * @throws Exception a
      */
-    public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException {
-        Integer mode = null;
-
-        if (deliveryMode instanceof String) {
-            String s = (String) deliveryMode;
-            if ("PERSISTENT".equalsIgnoreCase(s)) {
-                mode = DeliveryMode.PERSISTENT;
-            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
-                mode = DeliveryMode.NON_PERSISTENT;
-            } else {
-                // it may be a number in the String so try that
-                Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
-                if (value != null) {
-                    mode = value;
-                } else {
-                    throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
-                }
-            }
+    public static Message setJmsMessageHeaders(final Message jmsMessage, Map<String, Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException {
+        // Support for the null keyFormatStrategy
+        KeyFormatStrategy localKeyFormatStrategy = null;
+        if (keyFormatStrategy == null) {
+            localKeyFormatStrategy = new DefaultJmsKeyFormatStrategy();
         } else {
-            // fallback and try to convert to a number
-            Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
-            if (value != null) {
-                mode = value;
-            }
-        }
-
-        if (mode != null) {
-            message.setJMSDeliveryMode(mode);
-            message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
+            localKeyFormatStrategy = keyFormatStrategy;
         }
-    }
-    
 
-    public static Message setJmsMessageHeaders(final Exchange exchange, final Message jmsMessage) throws Exception {
-        Map<String, Object> headers = new HashMap<String, Object>(exchange.getIn().getHeaders());
+        Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
         Set<String> keys = headers.keySet();
         for (String headerName : keys) {
             Object headerValue = headers.get(headerName);
-            if (headerName.equalsIgnoreCase("JMSCorrelationID")) {
-                jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
-            } else if (headerName.equalsIgnoreCase("JMSReplyTo") && headerValue != null) {
+
+            if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) {
+                if (headerValue == null) {
+                    // Value can be null but we can't cast a null to a String
+                    // so pass null to the setter
+                    setCorrelationId(jmsMessage, null);
+                } else if (headerValue instanceof String) {
+                    setCorrelationId(jmsMessage, (String)headerValue);
+                } else {
+                    throw new IllegalHeaderException("The " + JMS_CORRELATION_ID + " must either be a String or null.  Found: " + headerValue.getClass().getName());
+                }
+            } else if (headerName.equalsIgnoreCase(JMS_REPLY_TO)) {
                 if (headerValue instanceof String) {
-                    // if the value is a String we must normalize it first
-                    headerValue = (String) headerValue;
+                    // FIXME Setting the reply to appears broken. walk back
+                    // through it. If the value is a String we must normalize it
+                    // first
                 } else {
                     // TODO write destination converter
                     // Destination replyTo =
-                    // ExchangeHelper.convertToType(exchange, Destination.class,
+                    // ExchangeHelper.convertToType(exchange,
+                    // Destination.class,
                     // headerValue);
                     // jmsMessage.setJMSReplyTo(replyTo);
                 }
-            } else if (headerName.equalsIgnoreCase("JMSType")) {
-                jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
-            } else if (headerName.equalsIgnoreCase("JMSPriority")) {
-                jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
-            } else if (headerName.equalsIgnoreCase("JMSDeliveryMode")) {
-                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
-            } else if (headerName.equalsIgnoreCase("JMSExpiration")) {
-                jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
+            } else if (headerName.equalsIgnoreCase(JMS_TYPE)) {
+                if (headerValue == null) {
+                    // Value can be null but we can't cast a null to a String
+                    // so pass null to the setter
+                    setMessageType(jmsMessage, null);
+                } else if (headerValue instanceof String) {
+                    // Not null but is a String
+                    setMessageType(jmsMessage, (String)headerValue);
+                } else {
+                    throw new IllegalHeaderException("The " + JMS_TYPE + " must either be a String or null.  Found: " + headerValue.getClass().getName());
+                }
+            } else if (headerName.equalsIgnoreCase(JMS_PRIORITY)) {
+                if (headerValue instanceof Integer) {
+                    try {
+                        jmsMessage.setJMSPriority((Integer)headerValue);
+                    } catch (JMSException e) {
+                        throw new IllegalHeaderException("Failed to set the " + JMS_PRIORITY + " header. Cause: " + e.getLocalizedMessage(), e);
+                    }
+                } else {
+                    throw new IllegalHeaderException("The " + JMS_PRIORITY + " must be a Integer.  Type found: " + headerValue.getClass().getName());
+                }
+            } else if (headerName.equalsIgnoreCase(JMS_DELIVERY_MODE)) {
+                try {
+                    JmsMessageHelper.setJMSDeliveryMode(jmsMessage, headerValue);
+                } catch (JMSException e) {
+                    throw new IllegalHeaderException("Failed to set the " + JMS_DELIVERY_MODE + " header. Cause: " + e.getLocalizedMessage(), e);
+                }
+            } else if (headerName.equalsIgnoreCase(JMS_EXPIRATION)) {
+                if (headerValue instanceof Long) {
+                    try {
+                        jmsMessage.setJMSExpiration((Long)headerValue);
+                    } catch (JMSException e) {
+                        throw new IllegalHeaderException("Failed to set the " + JMS_EXPIRATION + " header. Cause: " + e.getLocalizedMessage(), e);
+                    }
+                } else {
+                    throw new IllegalHeaderException("The " + JMS_EXPIRATION + " must be a Long.  Type found: " + headerValue.getClass().getName());
+                }
             } else {
-                // The following properties are set by the MessageProducer:
-                // JMSDestination
-                // The following are set on the underlying JMS provider:
-                // JMSMessageID, JMSTimestamp, JMSRedelivered
-                // log at trace level to not spam log
                 LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
-                if (headerName.equalsIgnoreCase("JMSDestination")
-                    || headerName.equalsIgnoreCase("JMSMessageID")
-                    || headerName.equalsIgnoreCase("JMSTimestamp")
+                if (headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID) || headerName.equalsIgnoreCase("JMSTimestamp")
                     || headerName.equalsIgnoreCase("JMSRedelivered")) {
-                    // The following properties are set by the MessageProducer:
+                    // The following properties are set by the
+                    // MessageProducer:
                     // JMSDestination
                     // The following are set on the underlying JMS provider:
                     // JMSMessageID, JMSTimestamp, JMSRedelivered
@@ -409,188 +235,151 @@ public final class JmsMessageHelper {
                     LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
                 } else {
                     if (!(headerValue instanceof JmsMessageType)) {
-                        String encodedName = new DefaultJmsKeyFormatStrategy().encodeKey(headerName);
-                        JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+                        String encodedName = localKeyFormatStrategy.encodeKey(headerName);
+                        try {
+                            JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+                        } catch (JMSException e) {
+                            throw new IllegalHeaderException("Failed to set the header " + encodedName + " header. Cause: " + e.getLocalizedMessage(), e);
+                        }
                     }
                 }
-            }            
+                // }
+            }
         }
         return jmsMessage;
     }
-    
-    public static JmsMessageType discoverType(Message value) throws Exception {
-        JmsMessageType answer = null;
-        if (value != null) { 
-            if (Message.class.isInstance(value)) {
-                if (BytesMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Bytes;
-                } else if (MapMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Map;
-                } else if (TextMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Text;
-                } else if (ObjectMessage.class.isInstance(value)) {
-                    answer = JmsMessageType.Object;
+
+    /**
+     * Sets the JMSDeliveryMode on the message.
+     * 
+     * @param exchange the exchange
+     * @param message the message
+     * @param deliveryMode the delivery mode, either as a String or integer
+     * @throws javax.jms.JMSException is thrown if error setting the delivery
+     *             mode
+     */
+    public static void setJMSDeliveryMode(Message message, Object deliveryMode) throws JMSException {
+        Integer mode = null;
+
+        if (deliveryMode instanceof String) {
+            String s = (String)deliveryMode;
+            if ("PERSISTENT".equalsIgnoreCase(s)) {
+                mode = DeliveryMode.PERSISTENT;
+            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
+                mode = DeliveryMode.NON_PERSISTENT;
+            } else {
+                // it may be a number in the String so try that
+                Integer value = null;
+                try {
+                    value = Integer.valueOf(s);
+                } catch (NumberFormatException e) {
+                    // Do nothing. The error handler below is sufficient
+                }
+                if (value != null) {
+                    mode = value;
                 } else {
-                    answer = JmsMessageType.Message;
+                    throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
                 }
             }
+        } else if (deliveryMode instanceof Integer) {
+            // fallback and try to convert to a number
+            mode = (Integer)deliveryMode;
+        } else {
+            throw new IllegalArgumentException("Unable to convert the given delivery mode of type " + deliveryMode.getClass().getName() + " with value: " + deliveryMode);
+        }
+
+        if (mode != null) {
+            message.setJMSDeliveryMode(mode);
         }
-        return answer;
     }
-    
-    public static JmsMessageType discoverType(final Exchange exchange) {
-        JmsMessageType answer = (JmsMessageType) exchange.getIn().getHeader(JMS_MESSAGE_TYPE);
-        if (answer == null) {
-            final Object value = exchange.getIn().getBody();
-            if (value != null) {
-                if (Byte[].class.isInstance(value)) {
-                    answer = JmsMessageType.Bytes;
-                } else if (Collection.class.isInstance(value)) {
-                    answer = JmsMessageType.Map;
-                } else if (String.class.isInstance(value)) {
-                    answer = JmsMessageType.Text;
-                } else if (Serializable.class.isInstance(value)) {
-                    answer = JmsMessageType.Object;
-                } else {
-                    answer = JmsMessageType.Message;
-                }
+
+    /**
+     * Sets the correlation id on the JMS message.
+     * <p/>
+     * Will ignore exception thrown
+     * 
+     * @param message the JMS message
+     * @param type the correlation id
+     */
+    public static void setMessageType(Message message, String type) {
+        try {
+            message.setJMSType(type);
+        } catch (JMSException e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Error setting the message type: {}", type);
             }
         }
-        return answer;
     }
 
-    @SuppressWarnings("unchecked")
-    public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out)
-        throws JMSException {
-        HashMap<String, Object> headers = new HashMap<String, Object>();
-        if (jmsMessage != null) {
-            // lets populate the standard JMS message headers
-            try {
-                headers.put(
-                        JmsMessageHeaderType.JMSCorrelationID.toString(), 
-                        jmsMessage.getJMSCorrelationID());
-                headers.put(
-                        JmsMessageHeaderType.JMSDeliveryMode.toString(), 
-                        jmsMessage.getJMSDeliveryMode());
-                headers.put(
-                        JmsMessageHeaderType.JMSDestination.toString(), 
-                        jmsMessage.getJMSDestination());
-                headers.put(
-                        JmsMessageHeaderType.JMSExpiration.toString(), 
-                        jmsMessage.getJMSExpiration());
-                headers.put(
-                        JmsMessageHeaderType.JMSMessageID.toString(), 
-                        jmsMessage.getJMSMessageID());
-                headers.put(
-                        JmsMessageHeaderType.JMSPriority.toString(), 
-                        jmsMessage.getJMSPriority());
-                headers.put(
-                        JmsMessageHeaderType.JMSRedelivered.toString(), 
-                        jmsMessage.getJMSRedelivered());
-                headers.put(
-                        JmsMessageHeaderType.JMSTimestamp.toString(), 
-                        jmsMessage.getJMSTimestamp());
-                headers.put(
-                        JmsMessageHeaderType.JMSReplyTo.toString(), 
-                        JmsMessageHelper.getJMSReplyTo(jmsMessage));
-                headers.put(
-                        JmsMessageHeaderType.JMSType.toString(), 
-                        JmsMessageHelper.getJMSType(jmsMessage));
-
-                // this works around a bug in the ActiveMQ property handling
-                headers.put(
-                        JmsMessageHeaderType.JMSXGroupID.toString(), 
-                        jmsMessage.getStringProperty(JmsMessageHeaderType.JMSXGroupID.toString()));
-            } catch (JMSException e) {
-                throw new RuntimeCamelException(e);
-            }
-            
-            for (Enumeration<String> enumeration = jmsMessage
-                    .getPropertyNames(); enumeration.hasMoreElements();) {
-                String key = enumeration.nextElement();
-                if (hasIllegalHeaderKey(key)) {
-                    throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value");
-                }
-                Object value = jmsMessage.getObjectProperty(key);
-                headers.put(key, value);
+    /**
+     * Sets the correlation id on the JMS message.
+     * <p/>
+     * Will ignore exception thrown
+     * 
+     * @param message the JMS message
+     * @param correlationId the correlation id
+     */
+    public static void setCorrelationId(Message message, String correlationId) {
+        try {
+            message.setJMSCorrelationID(correlationId);
+        } catch (JMSException e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Error setting the correlationId: {}", correlationId);
             }
         }
-        if (out) {
-            exchange.getOut().setHeaders(headers);
+    }
+
+    /**
+     * Sets the property on the given JMS message.
+     * 
+     * @param jmsMessage the JMS message
+     * @param name name of the property to set
+     * @param value the value
+     * @throws JMSException can be thrown
+     */
+    public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
+        if (value == null) {
+            return;
+        }
+        if (value instanceof Byte) {
+            jmsMessage.setByteProperty(name, (Byte)value);
+        } else if (value instanceof Boolean) {
+            jmsMessage.setBooleanProperty(name, (Boolean)value);
+        } else if (value instanceof Double) {
+            jmsMessage.setDoubleProperty(name, (Double)value);
+        } else if (value instanceof Float) {
+            jmsMessage.setFloatProperty(name, (Float)value);
+        } else if (value instanceof Integer) {
+            jmsMessage.setIntProperty(name, (Integer)value);
+        } else if (value instanceof Long) {
+            jmsMessage.setLongProperty(name, (Long)value);
+        } else if (value instanceof Short) {
+            jmsMessage.setShortProperty(name, (Short)value);
+        } else if (value instanceof String) {
+            jmsMessage.setStringProperty(name, (String)value);
         } else {
-            exchange.getIn().setHeaders(headers);
+            // fallback to Object
+            jmsMessage.setObjectProperty(name, value);
         }
-        return exchange;
-    }
-    
-    public static Message createMessage(Exchange exchange, Session session) throws Exception {
-        return createMessage(exchange, session, false);
     }
-    
-    @SuppressWarnings("unchecked")
-    public static Message createMessage(Exchange exchange, Session session, boolean out) throws Exception {
-        Message answer = null;
-        Object body = null;
-        try {
-            if (out && exchange.getOut().getBody() != null) {
-                body = exchange.getOut().getBody();
-            } else {
-                body = exchange.getIn().getBody();
-            }
-            JmsMessageType messageType = JmsMessageHelper.discoverType(exchange);
 
-            switch (messageType) {
-            case Bytes:
-                BytesMessage bytesMessage = session.createBytesMessage();
-                bytesMessage.writeBytes((byte[])body);
-                answer = bytesMessage;
-                break;
-            case Map:
-                MapMessage mapMessage = session.createMapMessage();
-                Map<String, Object> objMap = (Map<String, Object>)body;
-                Set<String> keys = objMap.keySet();
-                for (String key : keys) {
-                    Object value = objMap.get(key);
-                    mapMessage.setObject(key, value);
-                }
-                answer = mapMessage;
-                break;
-            case Object:
-                ObjectMessage objectMessage = session.createObjectMessage();
-                objectMessage.setObject((Serializable)body);
-                answer = objectMessage;
-                break;
-            case Text:
-                TextMessage textMessage = session.createTextMessage();
-                textMessage.setText((String)body);
-                answer = textMessage;
-                break;
-            default:
-                break;
+    public static JmsMessageType discoverPayloadType(Object payload) {
+        JmsMessageType answer = null;
+        if (payload != null) {
+            if (Byte[].class.isInstance(payload)) {
+                answer = JmsMessageType.Bytes;
+            } else if (Collection.class.isInstance(payload)) {
+                answer = JmsMessageType.Map;
+            } else if (String.class.isInstance(payload)) {
+                answer = JmsMessageType.Text;
+            } else if (Serializable.class.isInstance(payload)) {
+                answer = JmsMessageType.Object;
+            } else {
+                answer = JmsMessageType.Message;
             }
-        } catch (Exception e) {
-            LOGGER.error("TODO Auto-generated catch block", e);
-            throw e;
+        } else {
+            answer = JmsMessageType.Message;
         }
-
-        answer = JmsMessageHelper.setJmsMessageHeaders(exchange, answer);
         return answer;
     }
-    
-    private static boolean hasIllegalHeaderKey(String key) {
-        if (key == null) {
-            return true;
-        }
-        if (key.equals("")) {
-            return true;
-        }
-        if (key.indexOf(".") > -1) {
-            return true;
-        }
-        if (key.indexOf("-") > -1) {
-            return true;
-        }
-        return false;
-    }
-
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java Thu Aug 30 03:42:12 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.sjms.producer;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -23,32 +26,40 @@ import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.BatchMessage;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 
 /**
- * The InOnlyProducer is responsible for publishing messages to the JMS
- * {@link Destination} for the value specified in the destinationName field.
+ * A Camel Producer that provides the InOnly Exchange pattern..
  */
 public class InOnlyProducer extends SjmsProducer {
-    
+
     public InOnlyProducer(SjmsEndpoint endpoint) {
         super(endpoint);
     }
-    
+
     /*
      * @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel()
-     *
      * @return
      * @throws Exception
      */
+    @Override
     public MessageProducerResources doCreateProducerModel() throws Exception {
         Connection conn = getConnectionResource().borrowConnection();
+        TransactionCommitStrategy commitStrategy = null;
         Session session = null;
         if (isEndpointTransacted()) {
+            if (this.getCommitStrategy() != null) {
+                commitStrategy = this.getCommitStrategy();
+            } else {
+                commitStrategy = new DefaultTransactionCommitStrategy();
+            }
             session = conn.createSession(true, getAcknowledgeMode());
         } else {
             session = conn.createSession(false, getAcknowledgeMode());
@@ -60,20 +71,48 @@ public class InOnlyProducer extends Sjms
             messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
         }
         getConnectionResource().returnConnection(conn);
-        return new MessageProducerResources(session, messageProducer);
+        return new MessageProducerResources(session, messageProducer, commitStrategy);
     }
-    
+
+    /*
+     * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
+     *
+     * @param exchange
+     * @param callback
+     * @throws Exception
+     */
     @Override
     public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
+        List<Message> messages = new ArrayList<Message>();
+        MessageProducerResources producer = getProducers().borrowObject();
         if (getProducers() != null) {
-            MessageProducerResources producer = getProducers().borrowObject();
-            
+            if (exchange.getIn().getBody() != null) {
+                if (exchange.getIn().getBody() instanceof List) {
+                    List<?> payload = (List<?>)exchange.getIn().getBody();
+                    for (Object object : payload) {
+                        Message message = null;
+                        if (BatchMessage.class.isInstance(object)) {
+                            BatchMessage<?> batchMessage = (BatchMessage<?>)object;
+                            message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+                                .getJmsKeyFormatStrategy());
+                        } else {
+                            message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+                        }
+                        messages.add(message);
+                    }
+                } else {
+                    Object payload = exchange.getIn().getBody();
+                    Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+                    messages.add(message);
+                }
+            }
+
             if (isEndpointTransacted()) {
-                exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+                exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
+            }
+            for (Message message : messages) {
+                producer.getMessageProducer().send(message);
             }
-            
-            Message message = JmsMessageHelper.createMessage(exchange, producer.getSession());
-            producer.getMessageProducer().send(message);
             getProducers().returnObject(producer);
             callback.done(isSynchronous());
         }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Thu Aug 30 03:42:12 2012
@@ -39,44 +39,38 @@ import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.component.sjms.SjmsProducer;
-import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsMessageExchangeHelper;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.jms.ObjectPool;
 import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
 import org.apache.camel.util.ObjectHelper;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * TODO Add Class documentation for InOutProducer
- *
+ * A Camel Producer that provides the InOut Exchange pattern.
  */
 public class InOutProducer extends SjmsProducer {
 
     /**
      * We use the {@link ReadWriteLock} to manage the {@link TreeMap} in place
      * of a {@link ConcurrentMap} because due to significant performance gains.
-     * 
      * TODO Externalize the Exchanger Map to a store object
      */
     private static Map<String, Exchanger<Object>> exchangerMap = new TreeMap<String, Exchanger<Object>>();
     private ReadWriteLock lock = new ReentrantReadWriteLock();
-    
-    
+
     /**
-     * A pool of {@link MessageConsumerResource} objects that are the
-     * reply consumers.
-     * 
-     * TODO Add Class documentation for MessageProducerPool
+     * A pool of {@link MessageConsumerResource} objects that are the reply
+     * consumers. 
+     * TODO Add Class documentation for MessageProducerPool 
      * TODO Externalize
-     *
      */
     protected class MessageConsumerPool extends ObjectPool<MessageConsumerResource> {
 
         /**
          * TODO Add Constructor Javadoc
-         *
+         * 
          * @param poolSize
          */
         public MessageConsumerPool(int poolSize) {
@@ -112,32 +106,32 @@ public class InOutProducer extends SjmsP
                     } catch (Exception e) {
                         ObjectHelper.wrapRuntimeCamelException(e);
                     }
-                    
+
                 }
             });
             MessageConsumerResource mcm = new MessageConsumerResource(session, messageConsumer, replyToDestination);
             return mcm;
         }
-        
+
         @Override
         protected void destroyObject(MessageConsumerResource model) throws Exception {
             if (model.getMessageConsumer() != null) {
                 model.getMessageConsumer().close();
             }
-            
+
             if (model.getSession() != null) {
                 if (model.getSession().getTransacted()) {
                     try {
                         model.getSession().rollback();
                     } catch (Exception e) {
-                        // Do nothing.  Just make sure we are cleaned up
+                        // Do nothing. Just make sure we are cleaned up
                     }
                 }
                 model.getSession().close();
             }
         }
     }
-    
+
     /**
      * TODO Add Class documentation for MessageConsumerResource
      */
@@ -153,6 +147,7 @@ public class InOutProducer extends SjmsP
          * @param messageConsumer
          */
         public MessageConsumerResource(Session session, MessageConsumer messageConsumer, Destination replyToDestination) {
+            super();
             this.session = session;
             this.messageConsumer = messageConsumer;
             this.replyToDestination = replyToDestination;
@@ -161,7 +156,7 @@ public class InOutProducer extends SjmsP
         public Session getSession() {
             return session;
         }
-        
+
         public MessageConsumer getMessageConsumer() {
             return messageConsumer;
         }
@@ -171,30 +166,6 @@ public class InOutProducer extends SjmsP
         }
     }
 
-    protected class InOutResponseContainer {
-        private final Exchange exchange;
-        private final AsyncCallback callback;
-
-        /**
-         * 
-         * @param exchange
-         * @param callback
-         */
-        public InOutResponseContainer(Exchange exchange, AsyncCallback callback) {
-            this.exchange = exchange;
-            this.callback = callback;
-        }
-
-        public Exchange getExchange() {
-            return exchange;
-        }
-
-        public AsyncCallback getCallback() {
-            return callback;
-        }
-    }
-    
-    
     protected class InternalTempDestinationListener implements MessageListener {
         private final Logger tempLogger = LoggerFactory.getLogger(InternalTempDestinationListener.class);
         private Exchanger<Object> exchanger;
@@ -205,6 +176,7 @@ public class InOutProducer extends SjmsP
          * @param exchanger
          */
         public InternalTempDestinationListener(Exchanger<Object> exchanger) {
+            super();
             this.exchanger = exchanger;
         }
 
@@ -222,24 +194,20 @@ public class InOutProducer extends SjmsP
 
         }
     }
-    
+
     private MessageConsumerPool consumers;
-    
+
     public InOutProducer(SjmsEndpoint endpoint) {
         super(endpoint);
         endpoint.getConsumerCount();
     }
-    
+
     @Override
     protected void doStart() throws Exception {
         if (ObjectHelper.isEmpty(getNamedReplyTo())) {
-            if (log.isDebugEnabled()) {
-                log.debug("No reply to destination is defined.  Using temporary destinations.");
-            }
+            log.debug("No reply to destination is defined.  Using temporary destinations.");
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Using {} as the reply to destination.", getNamedReplyTo());
-            }
+            log.debug("Using {} as the reply to destination.", getNamedReplyTo());
         }
         if (getConsumers() == null) {
             setConsumers(new MessageConsumerPool(getConsumerCount()));
@@ -247,7 +215,7 @@ public class InOutProducer extends SjmsP
         }
         super.doStart();
     }
-    
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -256,7 +224,7 @@ public class InOutProducer extends SjmsP
             setConsumers(null);
         }
     }
-    
+
     @Override
     public MessageProducerResources doCreateProducerModel() throws Exception {
         Connection conn = getConnectionResource().borrowConnection();
@@ -275,13 +243,13 @@ public class InOutProducer extends SjmsP
         getConnectionResource().returnConnection(conn);
         return new MessageProducerResources(session, messageProducer);
     }
-    
-    /** 
-     * TODO Add override javadoc
-     * TODO time out is actually double as it waits for the producer and then waits for the response.  Use an atomiclong to manage the countdown
-     *
-     * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
-     *
+
+    /**
+     * TODO time out is actually double as it waits for the producer and then
+     * waits for the response. Use an atomic long to manage the countdown
+     * 
+     * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange,
+     *      org.apache.camel.AsyncCallback)
      * @param exchange
      * @param callback
      * @throws Exception
@@ -292,7 +260,6 @@ public class InOutProducer extends SjmsP
             MessageProducerResources producer = null;
             try {
                 producer = getProducers().borrowObject(getResponseTimeOut());
-                // producer = getProducers().borrowObject();
             } catch (Exception e1) {
                 log.warn("The producer pool is exhausted.  Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false.");
                 exchange.setException(new Exception("Producer Resource Pool is exhausted"));
@@ -300,11 +267,11 @@ public class InOutProducer extends SjmsP
             if (producer != null) {
 
                 if (isEndpointTransacted()) {
-                    exchange.getUnitOfWork()
-                        .addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+                    exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
                 }
 
-                Message request = JmsMessageHelper.createMessage(exchange, producer.getSession());
+                Message request = JmsMessageExchangeHelper.createMessage(exchange, producer.getSession());
+                
                 // TODO just set the correlation id don't get it from the
                 // message
                 String correlationId = null;
@@ -315,7 +282,7 @@ public class InOutProducer extends SjmsP
                 }
                 Object responseObject = null;
                 Exchanger<Object> messageExchanger = new Exchanger<Object>();
-                JmsMessageHelper.setCorrelationId(request, correlationId);
+                JmsMessageExchangeHelper.setCorrelationId(request, correlationId);
                 try {
                     lock.writeLock().lock();
                     exchangerMap.put(correlationId, messageExchanger);
@@ -324,7 +291,7 @@ public class InOutProducer extends SjmsP
                 }
 
                 MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
-                JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+                JmsMessageExchangeHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
                 consumers.returnObject(consumer);
                 producer.getMessageProducer().send(request);
 
@@ -335,8 +302,7 @@ public class InOutProducer extends SjmsP
                 }
 
                 try {
-                    responseObject = messageExchanger.exchange(null, getResponseTimeOut(),
-                                                               TimeUnit.MILLISECONDS);
+                    responseObject = messageExchanger.exchange(null, getResponseTimeOut(), TimeUnit.MILLISECONDS);
 
                     try {
                         lock.writeLock().lock();
@@ -357,7 +323,7 @@ public class InOutProducer extends SjmsP
                         exchange.setException((Throwable)responseObject);
                     } else if (responseObject instanceof Message) {
                         Message response = (Message)responseObject;
-                        JmsMessageHelper.populateExchange(response, exchange, true);
+                        JmsMessageExchangeHelper.populateExchange(response, exchange, true);
                     } else {
                         exchange.setException(new CamelException("Unknown response type: " + responseObject));
                     }

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java?rev=1378796&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/BatchTransactionCommitStrategy.java Thu Aug 30 03:42:12 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
+
+/**
+ * Provides a thread safe counter to track the number of {@link Exchange}
+ * objects that have been been processed.
+ * 
+ */
+public class BatchTransactionCommitStrategy implements TransactionCommitStrategy {
+
+    private final AtomicInteger current = new AtomicInteger(0);
+    private final int count;
+
+    /**
+     * @param count
+     */
+    public BatchTransactionCommitStrategy(int count) {
+        super();
+        this.count = count;
+    }
+
+    @Override
+    public boolean commit(Exchange exchange) throws Exception {
+        boolean answer = false;
+        int currentVal = current.incrementAndGet();
+        if (currentVal >= count) {
+            answer = true;
+            current.set(0);
+        }
+
+        return answer;
+    }
+
+    @Override
+    public boolean rollback(Exchange exchange) throws Exception {
+        current.set(0);
+        return true;
+    }
+
+}

Copied: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java (from r1378786, camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java?p2=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java&p1=camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java&r1=1378786&r2=1378796&rev=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/KeyFormatStrategy.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/DefaultTransactionCommitStrategy.java Thu Aug 30 03:42:12 2012
@@ -14,28 +14,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.sjms;
+package org.apache.camel.component.sjms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
 
 /**
- * Strategy for applying encoding and decoding of JMS headers so they apply to the JMS spec.
- *
- * @version 
+ * The default commit strategy for all transaction.
+ * 
  */
-public interface KeyFormatStrategy {
+public class DefaultTransactionCommitStrategy implements TransactionCommitStrategy {
 
     /**
-     * Encodes the key before its sent as a {@link javax.jms.Message} message.
+     * @see org.apache.camel.component.sjms.TransactionCommitStrategy#commit(org.apache.camel.Exchange)
      *
-     * @param key  the original key
-     * @return the encoded key
+     * @param exchange
+     * @return
+     * @throws Exception
      */
-    String encodeKey(String key);
+    @Override
+    public boolean commit(Exchange exchange) throws Exception {
+        return true;
+    }
 
     /**
-     * Decodes the key after its received from a {@link javax.jms.Message} message.
+     * @see org.apache.camel.component.sjms.TransactionCommitStrategy#rollback(org.apache.camel.Exchange)
      *
-     * @param key the encoded key
-     * @return the decoded key as the original key
+     * @param exchange
+     * @return
+     * @throws Exception
      */
-    String decodeKey(String key);
+    @Override
+    public boolean rollback(Exchange exchange) throws Exception {
+        return true;
+    }
 }

Modified: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java (original)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java Thu Aug 30 03:42:12 2012
@@ -16,61 +16,66 @@
  */
 package org.apache.camel.component.sjms.tx;
 
-import javax.jms.JMSException;
 import javax.jms.Session;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.TransactionCommitStrategy;
 import org.apache.camel.spi.Synchronization;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * TODO Add Class documentation for SessionTransactionSynchronization
- *
+ * SessionTransactionSynchronization is called at the completion of each {@link org.apache.camel.Exhcnage}.
  */
 public class SessionTransactionSynchronization implements Synchronization {
     private Logger log = LoggerFactory.getLogger(getClass());
     private Session session;
-    
-    public SessionTransactionSynchronization(Session session) {
+    private final TransactionCommitStrategy commitStrategy;
+
+    public SessionTransactionSynchronization(Session session, TransactionCommitStrategy commitStrategy) {
         this.session = session;
+        if (commitStrategy == null) {
+            this.commitStrategy = new DefaultTransactionCommitStrategy();
+        } else {
+            this.commitStrategy = commitStrategy;
+        }
     }
-    
-    /*
-     * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
-     *
+
+    /**
+     * @see
+     * org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
      * @param exchange
      */
     @Override
     public void onFailure(Exchange exchange) {
-        if (log.isDebugEnabled()) {
-            log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
-        }
         try {
-            if (session != null && session.getTransacted()) {
-                this.session.rollback();
+            if (commitStrategy.rollback(exchange)) {
+                log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+                if (session != null && session.getTransacted()) {
+                    this.session.rollback();
+                }
             }
-        } catch (JMSException e) {
+        } catch (Exception e) {
             log.warn("Failed to rollback the session: {}", e.getMessage());
         }
     }
-    
-    /*
-     * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
-     *
+
+    /**
+     * @see
+     * org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange
+     * )
      * @param exchange
      */
     @Override
     public void onComplete(Exchange exchange) {
-        if (log.isDebugEnabled()) {
-            log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
-        }
         try {
-            if (session != null && session.getTransacted()) {
-                this.session.commit();
+            if (commitStrategy.commit(exchange)) {
+                log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+                if (session != null && session.getTransacted()) {
+                    this.session.commit();
+                }
             }
-        } catch (JMSException e) {
+        } catch (Exception e) {
             log.warn("Failed to commit the session: {}", e.getMessage());
             exchange.setException(e);
         }

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorOptionTest.java Thu Aug 30 03:42:12 2012
@@ -24,19 +24,19 @@ import org.apache.camel.component.sjms.s
 
 import org.junit.Test;
 
-public class JmsSelectorOptionTest extends JmsTestSupport  {
+public class JmsSelectorOptionTest extends JmsTestSupport {
 
     @Test
     public void testJmsMessageWithSelector() throws Exception {
         MockEndpoint endpointA = getMockEndpoint("mock:a");
         MockEndpoint endpointB = getMockEndpoint("mock:b");
         MockEndpoint endpointC = getMockEndpoint("mock:c");
-        
+
         endpointA.expectedBodiesReceivedInAnyOrder("A blue car!", "A blue car, again!");
         endpointA.expectedHeaderReceived("color", "blue");
         endpointB.expectedHeaderReceived("color", "red");
         endpointB.expectedBodiesReceived("A red car!");
-        
+
         endpointC.expectedBodiesReceived("Message1", "Message2");
         endpointC.expectedMessageCount(2);
 
@@ -48,7 +48,7 @@ public class JmsSelectorOptionTest exten
         template.sendBodyAndHeader("sjms:queue:hello", "Message2", "SIZE_NUMBER", 1600);
         assertMockEndpointsSatisfied();
     }
-    
+
     @Test
     public void testConsumerTemplate() throws Exception {
         template.sendBodyAndHeader("sjms:queue:consumer", "Message1", "SIZE_NUMBER", 1505);
@@ -70,7 +70,7 @@ public class JmsSelectorOptionTest exten
         }
 
     }
-    
+
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsSelectorTest.java Thu Aug 30 03:42:12 2012
@@ -23,7 +23,7 @@ import org.apache.camel.component.sjms.s
 import org.junit.Test;
 
 /**
- * @version 
+ * @version
  */
 public class JmsSelectorTest extends JmsTestSupport {
 
@@ -46,13 +46,13 @@ public class JmsSelectorTest extends Jms
 
         resultEndpoint.assertIsSatisfied();
     }
-    
+
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("sjms:test.a").to("sjms:test.b");
-                from("sjms:test.b?messageSelector=cheese='y'").to("mock:result");
+                from("sjms:test.a").to("log:test-before?showAll=true").to("sjms:test.b");
+                from("sjms:test.b?messageSelector=cheese='y'").to("log:test-after?showAll=true").to("mock:result");
             }
         };
     }
-}
\ No newline at end of file
+}

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SimpleJmsComponentTest.java Thu Aug 30 03:42:12 2012
@@ -24,10 +24,9 @@ import org.junit.Test;
 
 public class SimpleJmsComponentTest extends CamelTestSupport {
 
-
     @Test
     public void testHelloWorld() throws Exception {
-        SjmsComponent component = (SjmsComponent) this.context.getComponent("sjms");
+        SjmsComponent component = (SjmsComponent)this.context.getComponent("sjms");
         assertNotNull(component);
     }
 
@@ -35,8 +34,7 @@ public class SimpleJmsComponentTest exte
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() {
-                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                        "vm://broker?broker.persistent=false&broker.useJmx=true");
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=true");
                 SjmsComponent component = new SjmsComponent();
                 component.setConnectionFactory(connectionFactory);
                 getContext().addComponent("sjms", component);

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointNameOverrideTest.java Thu Aug 30 03:42:12 2012
@@ -28,7 +28,7 @@ import org.junit.Test;
 public class SjmsEndpointNameOverrideTest extends CamelTestSupport {
 
     private static final String BEAN_NAME = "not-sjms";
-    
+
     @Override
     protected boolean useJmx() {
         return true;
@@ -39,7 +39,7 @@ public class SjmsEndpointNameOverrideTes
         Endpoint endpoint = context.getEndpoint(BEAN_NAME + ":test");
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint sjms = (SjmsEndpoint) endpoint;
+        SjmsEndpoint sjms = (SjmsEndpoint)endpoint;
         assertEquals(sjms.getEndpointUri(), BEAN_NAME + "://queue:test");
         assertEquals(sjms.createExchange().getPattern(), ExchangePattern.InOnly);
     }
@@ -68,8 +68,7 @@ public class SjmsEndpointNameOverrideTes
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
-        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                "vm://broker?broker.persistent=false&broker.useJmx=false");
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
         SjmsComponent component = new SjmsComponent();
         component.setMaxConnections(1);
         component.setConnectionFactory(connectionFactory);

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/SjmsEndpointTest.java Thu Aug 30 03:42:12 2012
@@ -26,7 +26,7 @@ import org.apache.camel.test.junit4.Came
 import org.junit.Test;
 
 public class SjmsEndpointTest extends CamelTestSupport {
-    
+
     @Override
     protected boolean useJmx() {
         return true;
@@ -37,7 +37,7 @@ public class SjmsEndpointTest extends Ca
         Endpoint endpoint = context.getEndpoint("sjms:test");
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint sjms = (SjmsEndpoint) endpoint;
+        SjmsEndpoint sjms = (SjmsEndpoint)endpoint;
         assertEquals(sjms.getEndpointUri(), "sjms://queue:test");
         assertEquals(sjms.createExchange().getPattern(), ExchangePattern.InOnly);
     }
@@ -60,7 +60,7 @@ public class SjmsEndpointTest extends Ca
         Endpoint endpoint = context.getEndpoint("sjms:queue:test?transacted=true");
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
         assertTrue(qe.isTransacted());
     }
 
@@ -69,7 +69,7 @@ public class SjmsEndpointTest extends Ca
         Endpoint endpoint = context.getEndpoint("sjms:queue:test?synchronous=true");
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
         assertTrue(qe.isSynchronous());
     }
 
@@ -79,7 +79,7 @@ public class SjmsEndpointTest extends Ca
         Endpoint endpoint = context.getEndpoint("sjms:queue:test?namedReplyTo=" + namedReplyTo);
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
         assertEquals(qe.getNamedReplyTo(), namedReplyTo);
         assertEquals(qe.createExchange().getPattern(), ExchangePattern.InOut);
     }
@@ -87,10 +87,10 @@ public class SjmsEndpointTest extends Ca
     @Test
     public void testDefaultExchangePattern() throws Exception {
         try {
-            SjmsEndpoint sjms = (SjmsEndpoint) context.getEndpoint("sjms:queue:test");
+            SjmsEndpoint sjms = (SjmsEndpoint)context.getEndpoint("sjms:queue:test");
             assertNotNull(sjms);
             assertEquals(ExchangePattern.InOnly, sjms.getExchangePattern());
-//            assertTrue(sjms.createExchange().getPattern().equals(ExchangePattern.InOnly));
+            // assertTrue(sjms.createExchange().getPattern().equals(ExchangePattern.InOnly));
         } catch (Exception e) {
             fail("Exception thrown: " + e.getLocalizedMessage());
         }
@@ -129,7 +129,7 @@ public class SjmsEndpointTest extends Ca
         Endpoint endpoint = context.getEndpoint("sjms:queue:test?namedReplyTo=" + namedReplyTo + "&exchangePattern=" + ExchangePattern.InOut);
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
         assertEquals(qe.getNamedReplyTo(), namedReplyTo);
         assertEquals(qe.createExchange().getPattern(), ExchangePattern.InOut);
     }
@@ -144,15 +144,14 @@ public class SjmsEndpointTest extends Ca
         Endpoint endpoint = context.getEndpoint("sjms:queue:test?synchronous=true");
         assertNotNull(endpoint);
         assertTrue(endpoint instanceof SjmsEndpoint);
-        SjmsEndpoint qe = (SjmsEndpoint) endpoint;
+        SjmsEndpoint qe = (SjmsEndpoint)endpoint;
         assertTrue(qe.getDestinationName().equals("test"));
     }
 
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
-        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
-                "vm://broker?broker.persistent=false&broker.useJmx=false");
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://broker?broker.persistent=false&broker.useJmx=false");
         SjmsComponent component = new SjmsComponent();
         component.setMaxConnections(3);
         component.setConnectionFactory(connectionFactory);

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerDefaultTest.java Thu Aug 30 03:42:12 2012
@@ -16,19 +16,17 @@
  */
 package org.apache.camel.component.sjms.consumer;
 
-
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.support.JmsTestSupport;
-
 import org.junit.Test;
 
 /**
- * @version 
+ * @version
  */
 public class InOnlyConsumerDefaultTest extends JmsTestSupport {
 
-    private static final String SJMS_QUEUE_NAME = "sjms:in.only.consumer.";
+    private static final String SJMS_QUEUE_NAME = "sjms:in.only.consumer";
     private static final String MOCK_RESULT = "mock:result";
 
     @Test
@@ -39,15 +37,16 @@ public class InOnlyConsumerDefaultTest e
         mock.expectedBodiesReceived(expectedBody);
 
         template.sendBody(SJMS_QUEUE_NAME, expectedBody);
-        
+
         mock.assertIsSatisfied();
     }
 
+    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
+            @Override
             public void configure() throws Exception {
-                from(SJMS_QUEUE_NAME)
-                    .to(MOCK_RESULT);
+                from(SJMS_QUEUE_NAME).to(MOCK_RESULT);
             }
         };
     }

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyQueueConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -45,7 +45,7 @@ public class InOnlyQueueConsumerTest ext
 
     }
 
-    /*
+    /**
      * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
      *
      * @return

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicConsumerTest.java Thu Aug 30 03:42:12 2012
@@ -45,7 +45,7 @@ public class InOnlyTopicConsumerTest ext
 
     }
 
-    /*
+    /**
      * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
      *
      * @return

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/jms/ObjectPoolTest.java Thu Aug 30 03:42:12 2012
@@ -171,6 +171,7 @@ public class ObjectPoolTest {
     class TestPool extends ObjectPool<MyPooledObject> {
 
         public TestPool() {
+            super();
         }
 
         public TestPool(int poolSize) {

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -65,7 +65,7 @@ public class InOnlyQueueProducerTest ext
 
     }
 
-    /*
+    /**
      * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
      *
      * @return

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOnlyTopicProducerTest.java Thu Aug 30 03:42:12 2012
@@ -65,7 +65,7 @@ public class InOnlyTopicProducerTest ext
 
     }
 
-    /*
+    /**
      * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
      *
      * @return

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerAsyncLoadTest.java Thu Aug 30 03:42:12 2012
@@ -97,15 +97,15 @@ public class InOutQueueProducerAsyncLoad
             executor.execute(worker);
         }
         while (context.getInflightRepository().size() > 0) {
-
+            Thread.sleep(100);
         }
         executor.shutdown();
         while (!executor.isTerminated()) {
-            //
+            Thread.sleep(100);
         }
     }
 
-    /*
+    /**
      * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
      * 
      * @return
@@ -129,6 +129,10 @@ public class InOutQueueProducerAsyncLoad
     protected class MyMessageListener implements MessageListener {
         private MessageProducer mp;
 
+        public MyMessageListener() {
+            super();
+        }
+
         @Override
         public void onMessage(Message message) {
             try {

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerSyncLoadTest.java Thu Aug 30 03:42:12 2012
@@ -129,6 +129,10 @@ public class InOutQueueProducerSyncLoadT
     protected class MyMessageListener implements MessageListener {
         private MessageProducer mp;
 
+        public MyMessageListener() {
+            super();
+        }
+
         @Override
         public void onMessage(Message message) {
             try {

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -117,6 +117,7 @@ public class InOutQueueProducerTest exte
          * @param response
          */
         public MyMessageListener(String request, String response) {
+            super();
             this.requestText = request;
             this.responseText = response;
         }

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/InOutTempQueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -91,6 +91,7 @@ public class InOutTempQueueProducerTest 
          * @param response
          */
         public MyMessageListener(String request, String response) {
+            super();
             this.requestText = request;
             this.responseText = response;
         }

Modified: camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java?rev=1378796&r1=1378795&r2=1378796&view=diff
==============================================================================
--- camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java (original)
+++ camel/trunk/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/producer/QueueProducerTest.java Thu Aug 30 03:42:12 2012
@@ -65,7 +65,7 @@ public class QueueProducerTest extends J
 
     }
 
-    /*
+    /**
      * @see org.apache.camel.test.junit4.CamelTestSupport#createRouteBuilder()
      *
      * @return