You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/02 12:56:44 UTC

svn commit: r1368413 [2/5] - in /camel/trunk: components/ components/camel-sjms/ components/camel-sjms/src/ components/camel-sjms/src/main/ components/camel-sjms/src/main/java/ components/camel-sjms/src/main/java/org/ components/camel-sjms/src/main/jav...

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/InOutMessageHandler.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.consumer;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Topic;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * TODO Add Class documentation for DefaultMessageHandler
+ * TODO Create a producer cache manager to store and purge unused cashed producers or we will have a memory leak
+ * 
+ */
+public class InOutMessageHandler extends DefaultMessageHandler {
+
+    private Map<String, MessageProducer> producerCache = new TreeMap<String, MessageProducer>();
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * TODO Add Constructor Javadoc
+     * 
+     * @param endpoint
+     * @param processor
+     */
+    public InOutMessageHandler(Endpoint endpoint, ExecutorService executor) {
+        this(endpoint, executor, null);
+    }
+
+    /**
+     * TODO Add Constructor Javadoc
+     * 
+     * @param stopped
+     * @param synchronization
+     */
+    public InOutMessageHandler(Endpoint endpoint, ExecutorService executor, Synchronization synchronization) {
+        super(endpoint, executor, synchronization);
+    }
+    
+    /**
+     * @param message
+     */
+    @Override
+    public void doHandleMessage(final Exchange exchange) {
+        try {
+            MessageProducer messageProducer = null;
+            Object obj = exchange.getIn().getHeader("JMSReplyTo");
+            if (obj != null) {
+                Destination replyTo = null;
+                if (isDestination(obj)) {
+                    replyTo = (Destination)obj;
+                } else if (obj instanceof String) {
+                    replyTo = JmsObjectFactory.createDestination(getSession(), (String)obj, isTopic());
+                } else {
+                    throw new Exception("The value of JMSReplyTo must be a valid Destination or String.  Value provided: "
+                                            + obj);
+                }
+
+                String destinationName = getDestinationName(replyTo);
+                try {
+                    lock.readLock().lock();
+                    if (producerCache.containsKey(destinationName)) {
+                        messageProducer = producerCache.get(destinationName);
+                    }
+                } finally {
+                    lock.readLock().unlock();
+                }
+                if (messageProducer == null) {
+                    try {
+                        lock.writeLock().lock();
+                        messageProducer = getSession().createProducer(replyTo);
+                        producerCache.put(destinationName, messageProducer);
+                    } finally {
+                        lock.writeLock().unlock();
+                    }
+                }
+            }
+            
+            MessageHanderAsyncCallback callback = new MessageHanderAsyncCallback(exchange, messageProducer);
+            if (exchange.isFailed()) {
+                return;
+            } else {
+                if (isTransacted() || isSynchronous()) {
+                    // must process synchronous if transacted or configured to
+                    // do so
+                    if (log.isDebugEnabled()) {
+                        log.debug("Synchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
+                    }
+                    try {
+                        AsyncProcessorHelper.process(getProcessor(), exchange);
+                    } catch (Exception e) {
+                        exchange.setException(e);
+                    } finally {
+                        callback.done(true);
+                    }
+                } else {
+                    // process asynchronous using the async routing engine
+                    if (log.isDebugEnabled()) {
+                        log.debug("Aynchronous processing: Message[{}], Destination[{}] ", exchange.getIn().getBody(), this.getEndpoint().getEndpointUri());
+                    }
+                    boolean sync = AsyncProcessorHelper.process(getProcessor(), exchange, callback);
+                    if (!sync) {
+                        // will be done async so return now
+                        return;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("SjmsMessageConsumer invoked for Exchange id:{} ",
+                    exchange.getExchangeId());
+        }
+    }
+    
+    @Override
+    public void close() {
+        for (String key : producerCache.keySet()) {
+            MessageProducer mp = producerCache.get(key);
+            try {
+                mp.close();
+            } catch (JMSException e) {
+                ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+        producerCache.clear();
+    }
+    
+    private boolean isDestination(Object object) {
+        return object instanceof Destination;
+    }
+    
+    private String getDestinationName(Destination destination) throws Exception {
+        String answer = null;
+        if (destination instanceof Queue) {
+            answer = ((Queue)destination).getQueueName();
+        } else if (destination instanceof Topic) {
+            answer = ((Topic)destination).getTopicName();
+        }
+
+        return answer;
+    }
+
+    protected class MessageHanderAsyncCallback implements AsyncCallback {
+
+        private Exchange exchange;
+        private MessageProducer localProducer;
+
+        public MessageHanderAsyncCallback(Exchange exchange, MessageProducer localProducer) {
+            super();
+            this.exchange = exchange;
+            this.localProducer = localProducer;
+        }
+
+        @Override
+        public void done(boolean sync) {
+
+            try {
+                Message response = JmsMessageHelper.createMessage(exchange,
+                        getSession(), true);
+                response.setJMSCorrelationID(exchange.getIn().getHeader(
+                        "JMSCorrelationID", String.class)); 
+                localProducer.send(response);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        }
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * JMS constants
+ */
+public final class JmsConstants {
+
+    public static final String JMS_DESTINATION = "CamelJmsDestination";
+    public static final String JMS_DESTINATION_NAME = "CamelJmsDestinationName";
+    public static final String JMS_MESSAGE_TYPE = "CamelJmsMessageType";
+    public static final String JMS_DELIVERY_MODE = "CamelJmsDeliveryMode";
+
+    private JmsConstants() {
+        // utility class
+    }
+
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHeaderType.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+public enum JmsMessageHeaderType {
+    JMSDestination,
+    JMSDeliveryMode,
+    JMSExpiration,
+    JMSPriority,
+    JMSMessageID,
+    JMSTimestamp,
+    JMSCorrelationID,
+    JMSReplyTo,
+    JMSType,
+    JMSRedelivered,
+    
+    /*
+     * Add known custom headers
+     */
+    JMSXGroupID
+    
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.sjms.DefaultJmsKeyFormatStrategy;
+import org.apache.camel.component.sjms.IllegalHeaderException;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.sjms.SjmsConstants.JMS_MESSAGE_TYPE;
+import static org.apache.camel.component.sjms.SjmsConstants.QUEUE_PREFIX;
+import static org.apache.camel.component.sjms.SjmsConstants.TOPIC_PREFIX;
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
+/**
+ * Utility class for {@link javax.jms.Message}.
+ *
+ * @version 
+ */
+public final class JmsMessageHelper {
+    
+    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+
+    private JmsMessageHelper() {
+    }
+    
+    public static Exchange createExchange(Message message, Endpoint endpoint) {
+        Exchange exchange = endpoint.createExchange();
+        return populateExchange(message, exchange, false);
+    }
+    
+    @SuppressWarnings("unchecked")
+    public static Exchange populateExchange(Message message, Exchange exchange, boolean out) {
+        try {
+            JmsMessageHelper.setJmsMessageHeaders(message, exchange, out);
+            if (message != null) {
+                // convert to JMS Message of the given type
+
+                DefaultMessage bodyMessage = null;
+                if (out) {
+                    bodyMessage = (DefaultMessage) exchange.getOut();
+                } else {
+                    bodyMessage = (DefaultMessage) exchange.getIn();
+                }
+                switch (JmsMessageHelper.discoverType(message)) {
+                case Bytes:
+                    BytesMessage bytesMessage = (BytesMessage) message;
+                    if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
+                        LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
+                        return null;
+                    }
+                    byte[] result = new byte[(int) bytesMessage.getBodyLength()];
+                    bytesMessage.readBytes(result);
+                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Bytes);
+                    bodyMessage.setBody(result);
+                    break;
+                case Map:
+                    HashMap<String, Object> body = new HashMap<String, Object>();
+                    MapMessage mapMessage = (MapMessage) message;
+                    Enumeration<String> names = mapMessage.getMapNames();
+                    while (names.hasMoreElements()) {
+                        String key = names.nextElement();
+                        Object value = mapMessage.getObject(key);
+                        body.put(key, value);
+                    }
+                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Map);
+                    bodyMessage.setBody(body);
+                    break;
+                case Object:
+                    ObjectMessage objMsg = (ObjectMessage) message;
+                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Object);
+                    bodyMessage.setBody(objMsg.getObject());
+                    break;
+                case Text:
+                    TextMessage textMsg = (TextMessage) message;
+                    bodyMessage.setHeader(JMS_MESSAGE_TYPE, JmsMessageType.Text);
+                    bodyMessage.setBody(textMsg.getText());
+                    break;
+                case Message:
+                default:
+                    // Do nothing. Only set the headers for an empty message
+                    bodyMessage.setBody(message);
+                    break;
+                }
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        return exchange;
+    }
+
+    /**
+     * Removes the property from the JMS message.
+     *
+     * @param jmsMessage the JMS message
+     * @param name       name of the property to remove
+     * @return the old value of the property or <tt>null</tt> if not exists
+     * @throws JMSException can be thrown
+     */
+    public static Object removeJmsProperty(Message jmsMessage, String name) throws JMSException {
+        // check if the property exists
+        if (!jmsMessage.propertyExists(name)) {
+            return null;
+        }
+
+        Object answer = null;
+
+        // store the properties we want to keep in a temporary map
+        // as the JMS API is a bit strict as we are not allowed to
+        // clear a single property, but must clear them all and redo
+        // the properties
+        Map<String, Object> map = new LinkedHashMap<String, Object>();
+        Enumeration<?> en = jmsMessage.getPropertyNames();
+        while (en.hasMoreElements()) {
+            String key = (String) en.nextElement();
+            if (name.equals(key)) {
+                answer = key;
+            } else {
+                map.put(key, jmsMessage.getObjectProperty(key));
+            }
+        }
+
+        // redo the properties to keep
+        jmsMessage.clearProperties();
+        for (Entry<String, Object> entry : map.entrySet()) {
+            jmsMessage.setObjectProperty(entry.getKey(), entry.getValue());
+        }
+
+        return answer;
+    }
+
+    /**
+     * Tests whether a given property with the name exists
+     *
+     * @param jmsMessage the JMS message
+     * @param name       name of the property to test if exists
+     * @return <tt>true</tt> if the property exists, <tt>false</tt> if not.
+     * @throws JMSException can be thrown
+     */
+    public static boolean hasProperty(Message jmsMessage, String name) throws JMSException {
+        Enumeration<?> en = jmsMessage.getPropertyNames();
+        while (en.hasMoreElements()) {
+            String key = (String) en.nextElement();
+            if (name.equals(key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sets the property on the given JMS message.
+     *
+     * @param jmsMessage  the JMS message
+     * @param name        name of the property to set
+     * @param value       the value
+     * @throws JMSException can be thrown
+     */
+    public static void setProperty(Message jmsMessage, String name, Object value) throws JMSException {
+        if (value == null) {
+            return;
+        }
+        if (value instanceof Byte) {
+            jmsMessage.setByteProperty(name, (Byte) value);
+        } else if (value instanceof Boolean) {
+            jmsMessage.setBooleanProperty(name, (Boolean) value);
+        } else if (value instanceof Double) {
+            jmsMessage.setDoubleProperty(name, (Double) value);
+        } else if (value instanceof Float) {
+            jmsMessage.setFloatProperty(name, (Float) value);
+        } else if (value instanceof Integer) {
+            jmsMessage.setIntProperty(name, (Integer) value);
+        } else if (value instanceof Long) {
+            jmsMessage.setLongProperty(name, (Long) value);
+        } else if (value instanceof Short) {
+            jmsMessage.setShortProperty(name, (Short) value);
+        } else if (value instanceof String) {
+            jmsMessage.setStringProperty(name, (String) value);
+        } else {
+            // fallback to Object
+            jmsMessage.setObjectProperty(name, value);
+        }
+    }
+
+    /**
+     * Sets the correlation id on the JMS message.
+     * <p/>
+     * Will ignore exception thrown
+     *
+     * @param message  the JMS message
+     * @param correlationId the correlation id
+     */
+    public static void setCorrelationId(Message message, String correlationId) {
+        try {
+            message.setJMSCorrelationID(correlationId);
+        } catch (JMSException e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Error setting the correlationId: {}", correlationId);
+            }
+        }
+    }
+
+    /**
+     * Normalizes the destination name, by removing any leading queue or topic prefixes.
+     *
+     * @param destination the destination
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination) {
+        if (ObjectHelper.isEmpty(destination)) {
+            return destination;
+        }
+        if (destination.startsWith(QUEUE_PREFIX)) {
+            return removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()), '/');
+        } else if (destination.startsWith(TOPIC_PREFIX)) {
+            return removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()), '/');
+        } else {
+            return destination;
+        }
+    }
+
+    /**
+     * Sets the JMSReplyTo on the message.
+     *
+     * @param message  the message
+     * @param replyTo  the reply to destination
+     */
+    public static void setJMSReplyTo(Message message, Destination replyTo) {
+        try {
+            message.setJMSReplyTo(replyTo);
+        } catch (Exception e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
+            }
+        }
+    }
+
+    /**
+     * Gets the JMSReplyTo from the message.
+     *
+     * @param message  the message
+     * @return the reply to, can be <tt>null</tt>
+     */
+    public static Destination getJMSReplyTo(Message message) {
+        try {
+            return message.getJMSReplyTo();
+        } catch (Exception e) {
+            // ignore due OracleAQ does not support accessing JMSReplyTo
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets the JMSType from the message.
+     *
+     * @param message  the message
+     * @return the type, can be <tt>null</tt>
+     */
+    public static String getJMSType(Message message) {
+        try {
+            return message.getJMSType();
+        } catch (Exception e) {
+            // ignore due OracleAQ does not support accessing JMSType
+        }
+
+        return null;
+    }
+
+    /**
+     * Gets the JMSRedelivered from the message.
+     *
+     * @param message  the message
+     * @return <tt>true</tt> if redelivered, <tt>false</tt> if not, <tt>null</tt> if not able to determine
+     */
+    public static Boolean getJMSRedelivered(Message message) {
+        try {
+            return message.getJMSRedelivered();
+        } catch (Exception e) {
+            // ignore if JMS broker do not support this
+        }
+
+        return null;
+    }
+
+    /**
+     * Sets the JMSDeliveryMode on the message.
+     *
+     * @param exchange the exchange
+     * @param message  the message
+     * @param deliveryMode  the delivery mode, either as a String or integer
+     * @throws javax.jms.JMSException is thrown if error setting the delivery mode
+     */
+    public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode) throws JMSException {
+        Integer mode = null;
+
+        if (deliveryMode instanceof String) {
+            String s = (String) deliveryMode;
+            if ("PERSISTENT".equalsIgnoreCase(s)) {
+                mode = DeliveryMode.PERSISTENT;
+            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
+                mode = DeliveryMode.NON_PERSISTENT;
+            } else {
+                // it may be a number in the String so try that
+                Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
+                if (value != null) {
+                    mode = value;
+                } else {
+                    throw new IllegalArgumentException("Unknown delivery mode with value: " + deliveryMode);
+                }
+            }
+        } else {
+            // fallback and try to convert to a number
+            Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
+            if (value != null) {
+                mode = value;
+            }
+        }
+
+        if (mode != null) {
+            message.setJMSDeliveryMode(mode);
+            message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
+        }
+    }
+    
+
+    public static Message setJmsMessageHeaders(final Exchange exchange, final Message jmsMessage) throws Exception {
+        Map<String, Object> headers = new HashMap<String, Object>(exchange.getIn().getHeaders());
+        Set<String> keys = headers.keySet();
+        for (String headerName : keys) {
+            Object headerValue = headers.get(headerName);
+            if (headerName.equalsIgnoreCase("JMSCorrelationID")) {
+                jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+            } else if (headerName.equalsIgnoreCase("JMSReplyTo") && headerValue != null) {
+                if (headerValue instanceof String) {
+                    // if the value is a String we must normalize it first
+                    headerValue = (String) headerValue;
+                } else {
+                    // TODO write destination converter
+                    // Destination replyTo =
+                    // ExchangeHelper.convertToType(exchange, Destination.class,
+                    // headerValue);
+                    // jmsMessage.setJMSReplyTo(replyTo);
+                }
+            } else if (headerName.equalsIgnoreCase("JMSType")) {
+                jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class, headerValue));
+            } else if (headerName.equalsIgnoreCase("JMSPriority")) {
+                jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class, headerValue));
+            } else if (headerName.equalsIgnoreCase("JMSDeliveryMode")) {
+                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+            } else if (headerName.equalsIgnoreCase("JMSExpiration")) {
+                jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class, headerValue));
+            } else {
+                // The following properties are set by the MessageProducer:
+                // JMSDestination
+                // The following are set on the underlying JMS provider:
+                // JMSMessageID, JMSTimestamp, JMSRedelivered
+                // log at trace level to not spam log
+                LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+                if (headerName.equalsIgnoreCase("JMSDestination")
+                    || headerName.equalsIgnoreCase("JMSMessageID")
+                    || headerName.equalsIgnoreCase("JMSTimestamp")
+                    || headerName.equalsIgnoreCase("JMSRedelivered")) {
+                    // The following properties are set by the MessageProducer:
+                    // JMSDestination
+                    // The following are set on the underlying JMS provider:
+                    // JMSMessageID, JMSTimestamp, JMSRedelivered
+                    // log at trace level to not spam log
+                    LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+                } else {
+                    if (!(headerValue instanceof JmsMessageType)) {
+                        String encodedName = new DefaultJmsKeyFormatStrategy().encodeKey(headerName);
+                        JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
+                    }
+                }
+            }            
+        }
+        return jmsMessage;
+    }
+    
+    public static JmsMessageType discoverType(Message value) throws Exception {
+        JmsMessageType answer = null;
+        if (value != null) { 
+            if (Message.class.isInstance(value)) {
+                if (BytesMessage.class.isInstance(value)) {
+                    answer = JmsMessageType.Bytes;
+                } else if (MapMessage.class.isInstance(value)) {
+                    answer = JmsMessageType.Map;
+                } else if (TextMessage.class.isInstance(value)) {
+                    answer = JmsMessageType.Text;
+                } else if (ObjectMessage.class.isInstance(value)) {
+                    answer = JmsMessageType.Object;
+                } else {
+                    answer = JmsMessageType.Message;
+                }
+            }
+        }
+        return answer;
+    }
+    
+    public static JmsMessageType discoverType(final Exchange exchange) {
+        JmsMessageType answer = (JmsMessageType) exchange.getIn().getHeader(JMS_MESSAGE_TYPE);
+        if (answer == null) {
+            final Object value = exchange.getIn().getBody();
+            if (value != null) {
+                if (Byte[].class.isInstance(value)) {
+                    answer = JmsMessageType.Bytes;
+                } else if (Collection.class.isInstance(value)) {
+                    answer = JmsMessageType.Map;
+                } else if (String.class.isInstance(value)) {
+                    answer = JmsMessageType.Text;
+                } else if (Serializable.class.isInstance(value)) {
+                    answer = JmsMessageType.Object;
+                } else {
+                    answer = JmsMessageType.Message;
+                }
+            }
+        }
+        return answer;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Exchange setJmsMessageHeaders(final Message jmsMessage, final Exchange exchange, boolean out)
+        throws JMSException {
+        HashMap<String, Object> headers = new HashMap<String, Object>();
+        if (jmsMessage != null) {
+            // lets populate the standard JMS message headers
+            try {
+                headers.put(
+                        JmsMessageHeaderType.JMSCorrelationID.toString(), 
+                        jmsMessage.getJMSCorrelationID());
+                headers.put(
+                        JmsMessageHeaderType.JMSDeliveryMode.toString(), 
+                        jmsMessage.getJMSDeliveryMode());
+                headers.put(
+                        JmsMessageHeaderType.JMSDestination.toString(), 
+                        jmsMessage.getJMSDestination());
+                headers.put(
+                        JmsMessageHeaderType.JMSExpiration.toString(), 
+                        jmsMessage.getJMSExpiration());
+                headers.put(
+                        JmsMessageHeaderType.JMSMessageID.toString(), 
+                        jmsMessage.getJMSMessageID());
+                headers.put(
+                        JmsMessageHeaderType.JMSPriority.toString(), 
+                        jmsMessage.getJMSPriority());
+                headers.put(
+                        JmsMessageHeaderType.JMSRedelivered.toString(), 
+                        jmsMessage.getJMSRedelivered());
+                headers.put(
+                        JmsMessageHeaderType.JMSTimestamp.toString(), 
+                        jmsMessage.getJMSTimestamp());
+                headers.put(
+                        JmsMessageHeaderType.JMSReplyTo.toString(), 
+                        JmsMessageHelper.getJMSReplyTo(jmsMessage));
+                headers.put(
+                        JmsMessageHeaderType.JMSType.toString(), 
+                        JmsMessageHelper.getJMSType(jmsMessage));
+
+                // this works around a bug in the ActiveMQ property handling
+                headers.put(
+                        JmsMessageHeaderType.JMSXGroupID.toString(), 
+                        jmsMessage.getStringProperty(JmsMessageHeaderType.JMSXGroupID.toString()));
+            } catch (JMSException e) {
+                throw new RuntimeCamelException(e);
+            }
+            
+            for (Enumeration<String> enumeration = jmsMessage
+                    .getPropertyNames(); enumeration.hasMoreElements();) {
+                String key = enumeration.nextElement();
+                if (hasIllegalHeaderKey(key)) {
+                    throw new IllegalHeaderException("Header " + key + " is not a legal JMS header name value");
+                }
+                Object value = jmsMessage.getObjectProperty(key);
+                headers.put(key, value);
+            }
+        }
+        if (out) {
+            exchange.getOut().setHeaders(headers);
+        } else {
+            exchange.getIn().setHeaders(headers);
+        }
+        return exchange;
+    }
+    
+    public static Message createMessage(Exchange exchange, Session session) throws Exception {
+        return createMessage(exchange, session, false);
+    }
+    
+    @SuppressWarnings("unchecked")
+    public static Message createMessage(Exchange exchange, Session session, boolean out) throws Exception {
+        Message answer = null;
+        Object body = null;
+        try {
+            if (out && exchange.getOut().getBody() != null) {
+                body = exchange.getOut().getBody();
+            } else {
+                body = exchange.getIn().getBody();
+            }
+            JmsMessageType messageType = JmsMessageHelper.discoverType(exchange);
+
+            switch (messageType) {
+            case Bytes:
+                BytesMessage bytesMessage = session.createBytesMessage();
+                bytesMessage.writeBytes((byte[])body);
+                answer = bytesMessage;
+                break;
+            case Map:
+                MapMessage mapMessage = session.createMapMessage();
+                Map<String, Object> objMap = (Map<String, Object>)body;
+                Set<String> keys = objMap.keySet();
+                for (String key : keys) {
+                    Object value = objMap.get(key);
+                    mapMessage.setObject(key, value);
+                }
+                answer = mapMessage;
+                break;
+            case Object:
+                ObjectMessage objectMessage = session.createObjectMessage();
+                objectMessage.setObject((Serializable)body);
+                answer = objectMessage;
+                break;
+            case Text:
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText((String)body);
+                answer = textMessage;
+                break;
+            default:
+                break;
+            }
+        } catch (Exception e) {
+            LOGGER.error("TODO Auto-generated catch block", e);
+            throw e;
+        }
+
+        answer = JmsMessageHelper.setJmsMessageHeaders(exchange, answer);
+        return answer;
+    }
+    
+    private static boolean hasIllegalHeaderKey(String key) {
+        if (key == null) {
+            return true;
+        }
+        if (key.equals("")) {
+            return true;
+        }
+        if (key.indexOf(".") > -1) {
+            return true;
+        }
+        if (key.indexOf("-") > -1) {
+            return true;
+        }
+        return false;
+    }
+
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageType.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+/**
+ * Enum for different {@link javax.jms.Message} types.
+ *
+ * @version 
+ */
+public enum JmsMessageType {
+
+    /**
+     * First the JMS Message types
+     */
+    Bytes,
+    Map,
+    Object,
+    /**
+     * TODO Write support for Stream Messages
+     */
+    //Stream,
+    Text,
+    
+    /**
+     * BlobMessage which is not supported by all JMS implementations
+     */
+    Blob,
+    
+    /**
+     * The default type that can be used for empty messages.
+     */
+    Message
+
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsObjectFactory.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * TODO Add Class documentation for JmsObjectFactory
+ *
+ */
+public final class JmsObjectFactory {
+    
+    private JmsObjectFactory() {
+        //Helper class
+    }
+
+    public static Destination createDestination(Session session, String destinationName, boolean topic)
+        throws Exception {
+        if (topic) {
+            return createTopic(session, destinationName);
+        } else {
+            return createQueue(session, destinationName);
+        }
+    }
+
+    public static Destination createQueue(Session session, String destinationName) throws Exception {
+        return session.createQueue(destinationName);
+    }
+
+    public static Destination createTemporaryDestination(Session session, boolean topic) throws Exception {
+        if (topic) {
+            return session.createTemporaryTopic();
+        } else {
+            return session.createTemporaryQueue();
+        }
+    }
+
+    public static Destination createTopic(Session session, String destinationName) throws Exception {
+        return session.createTopic(destinationName);
+    }
+
+    public static MessageConsumer createQueueConsumer(Session session, String destinationName) throws Exception {
+        return createMessageConsumer(session, destinationName, null, false, null, true);
+    }
+
+    public static MessageConsumer createQueueConsumer(Session session, String destinationName, String messageSelector) throws Exception {
+        return createMessageConsumer(session, destinationName, messageSelector, false, null, true);
+    }
+
+    public static MessageConsumer createTopicConsumer(Session session, String destinationName, String messageSelector) throws Exception {
+        return createMessageConsumer(session, destinationName, messageSelector, true, null, true);
+    }
+    
+    public static MessageConsumer createTemporaryMessageConsumer(
+            Session session, 
+            String messageSelector, 
+            boolean topic, 
+            String durableSubscriptionId,
+            boolean noLocal) throws Exception {
+        Destination destination = createTemporaryDestination(session, topic);
+        return createMessageConsumer(session, destination, messageSelector, topic, durableSubscriptionId, noLocal);
+    }
+    
+    public static MessageConsumer createMessageConsumer(
+            Session session, 
+            String destinationName, 
+            String messageSelector, 
+            boolean topic, 
+            String durableSubscriptionId) throws Exception {
+        return createMessageConsumer(session, destinationName, messageSelector, topic, durableSubscriptionId, true);
+    }
+    
+    public static MessageConsumer createMessageConsumer(
+            Session session, 
+            String destinationName, 
+            String messageSelector, 
+            boolean topic, 
+            String durableSubscriptionId,
+            boolean noLocal) throws Exception {
+        Destination destination = null;
+        if (topic) {
+            destination = session.createTopic(destinationName);
+            
+        } else {
+            destination = session.createQueue(destinationName);
+        }
+        return createMessageConsumer(session, destination, messageSelector, topic, durableSubscriptionId, noLocal);
+    }
+    
+    public static MessageConsumer createMessageConsumer(
+            Session session, 
+            Destination destination, 
+            String messageSelector, 
+            boolean topic, 
+            String durableSubscriptionId,
+            boolean noLocal) throws Exception {
+        MessageConsumer messageConsumer = null;
+        
+        if (topic) {
+            TopicSession ts = (TopicSession)session;
+            if (ObjectHelper.isNotEmpty(durableSubscriptionId)) {
+                if (ObjectHelper.isNotEmpty(messageSelector)) {
+                    messageConsumer = ts.createDurableSubscriber((Topic)destination, durableSubscriptionId,
+                                                                 messageSelector, noLocal);
+                } else {
+                    messageConsumer = ts.createDurableSubscriber((Topic)destination, durableSubscriptionId);
+                }
+
+            } else {
+                if (ObjectHelper.isNotEmpty(messageSelector)) {
+                    messageConsumer = ts.createSubscriber((Topic)destination, messageSelector, noLocal);
+                } else {
+                    messageConsumer = ts.createSubscriber((Topic)destination);
+                }
+            }
+        } else {
+            if (ObjectHelper.isNotEmpty(messageSelector)) {
+                messageConsumer = session.createConsumer(destination, messageSelector); 
+            } else {
+                messageConsumer = session.createConsumer(destination);
+            }
+        }
+        return messageConsumer;
+    }
+    
+    public static MessageProducer createQueueProducer(
+            Session session, 
+            String destinationName) throws Exception {
+        return createMessageProducer(session, destinationName, false, true, -1);
+    }
+    
+    public static MessageProducer createTopicProducer(
+            Session session, 
+            String destinationName) throws Exception {
+        return createMessageProducer(session, destinationName, true, false, -1);
+    }
+    
+    public static MessageProducer createMessageProducer(
+            Session session, 
+            String destinationName, 
+            boolean topic,
+            boolean persitent,
+            long ttl) throws Exception {
+        MessageProducer messageProducer = null;
+        Destination destination = null;
+        if (topic) {
+            if (destinationName.startsWith("topic://")) {
+                destinationName = destinationName.substring("topic://".length());
+            }
+            destination = session.createTopic(destinationName);
+        } else {
+            if (destinationName.startsWith("queue://")) {
+                destinationName = destinationName.substring("queue://".length());
+            }
+            destination = session.createQueue(destinationName);
+        }
+        messageProducer = session.createProducer(destination);
+
+        if (persitent) {
+            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        } else {
+            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        }
+        if (ttl > 0) {
+            messageProducer.setTimeToLive(ttl);
+        }
+        return messageProducer;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/SessionAcknowledgementType.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.jms;
+
+import javax.jms.Session;
+
+/**
+ * Session acknowledge enum keys
+ * 
+ */
+public enum SessionAcknowledgementType {
+    AUTO_ACKNOWLEDGE(Session.AUTO_ACKNOWLEDGE), 
+    CLIENT_ACKNOWLEDGE(Session.CLIENT_ACKNOWLEDGE), 
+    DUPS_OK_ACKNOWLEDGE(Session.DUPS_OK_ACKNOWLEDGE), 
+    SESSION_TRANSACTED(Session.SESSION_TRANSACTED);
+    
+    private int intValue = -1;
+    
+    private SessionAcknowledgementType(int intValue) {
+        this.intValue = intValue;
+    }
+    
+    public int intValue() {
+        return this.intValue;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/DefaultConnectionResource.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.component.sjms.ConnectionResource;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * TODO Add Class documentation for DefaultConnectionResource
+ * 
+ */
+public class DefaultConnectionResource extends ObjectPool<Connection> implements ConnectionResource {
+    private ConnectionFactory connectionFactory;
+    private String username;
+    private String password;
+    private String connectionId;
+
+    /**
+     * TODO Add Constructor Javadoc
+     *
+     */
+    public DefaultConnectionResource() {
+        super();
+    }
+
+    /**
+     * TODO Add Constructor Javadoc
+     * 
+     * @param poolSize
+     * @param connectionFactory
+     */
+    public DefaultConnectionResource(int poolSize, ConnectionFactory connectionFactory) {
+        this(poolSize, connectionFactory, null, null);
+    }
+
+    /**
+     * TODO Add Constructor Javadoc
+     * 
+     * @param poolSize
+     * @param connectionFactory
+     * @param username
+     * @param password
+     */
+    public DefaultConnectionResource(int poolSize, ConnectionFactory connectionFactory, String username, String password) {
+        super(poolSize);
+        this.connectionFactory = connectionFactory;
+        this.username = username;
+        this.password = password;
+    }
+
+    /**
+     * TODO Add Constructor Javadoc
+     * 
+     * @param poolSize
+     * @param connectionFactory
+     * @param username
+     * @param password
+     */
+    public DefaultConnectionResource(int poolSize, ConnectionFactory connectionFactory, String username, String password, String connectionId) {
+        super(poolSize);
+        this.connectionFactory = connectionFactory;
+        this.username = username;
+        this.password = password;
+        this.connectionId = connectionId;
+    }
+    
+    @Override
+    public Connection borrowConnection() throws Exception {
+        return this.borrowObject();
+    }
+    
+    @Override
+    public Connection borrowConnection(long timeout) throws Exception {
+        return this.borrowObject(timeout);
+    }
+    
+    @Override
+    public void returnConnection(Connection connection) throws Exception {
+        returnObject(connection);
+    }
+
+    @Override
+    protected Connection createObject() throws Exception {
+        Connection connection = null;
+        if (connectionFactory != null) {
+            if (getUsername() != null && getPassword() != null) {
+                connection = connectionFactory.createConnection(getUsername(), getPassword());
+            } else {
+                connection = connectionFactory.createConnection();
+            }
+        }
+        if (connection != null) {
+            if (ObjectHelper.isNotEmpty(getConnectionId())) {
+                connection.setClientID(getConnectionId());
+            }
+            connection.start();
+        }
+        return connection;
+    }
+    
+    @Override
+    protected void destroyObject(Connection connection) throws Exception {
+        if (connection != null) {
+            connection.stop();
+            connection.close();
+        }
+
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return connectionFactory;
+    }
+
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/ObjectPool.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO Add Class documentation for ObjectPool
+ * 
+ */
+public abstract class ObjectPool<T> {
+
+    private static final int DEFAULT_POOL_SIZE = 1;
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+    private BlockingQueue<T> objects;
+    private int maxSize = DEFAULT_POOL_SIZE;
+    private AtomicInteger poolCount = new AtomicInteger();
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public ObjectPool() {
+        this(DEFAULT_POOL_SIZE);
+    }
+
+    public ObjectPool(int poolSize) {
+        this.maxSize = poolSize;
+    }
+
+    public void fillPool() {
+        objects = new ArrayBlockingQueue<T>(getMaxSize(), false);
+        for (int i = 0; i < maxSize; i++) {
+            try {
+                T t = createObject();
+                objects.add(t);
+                poolCount.incrementAndGet();
+            } catch (Exception e) {
+                logger.error("Unable to create Object and add it to the pool. Reason: "
+                                 + e.getLocalizedMessage(), e);
+            }
+        }
+    }
+
+    public void drainPool() throws Exception {
+        getLock().writeLock().lock();
+        try {
+            while (!objects.isEmpty()) {
+                T t = objects.remove();
+                destroyObject(t);
+            }
+        } finally {
+            getLock().writeLock().unlock();
+        }
+    }
+
+    /**
+     * Implement to create new objects of type T when the pool is initialized
+     * empty.
+     * 
+     * @return
+     * @throws Exception
+     */
+    protected abstract T createObject() throws Exception;
+
+    /**
+     * Clean up pool objects
+     * 
+     * @return
+     * @throws Exception
+     */
+    protected abstract void destroyObject(T t) throws Exception;
+
+    /**
+     * @return
+     * @throws Exception
+     */
+    public T borrowObject() throws Exception {
+        return borrowObject(1000);
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     */
+    public T borrowObject(long timeout) throws Exception {
+        T t = null;
+        getLock().writeLock().lock();
+        try {
+            t = objects.poll(timeout, TimeUnit.MILLISECONDS);
+        } finally {
+            getLock().writeLock().unlock();
+        }
+        return t;
+    }
+
+    /**
+     * @param object
+     * @throws Exception
+     */
+    public void returnObject(T object) throws Exception {
+        this.objects.add(object);
+    }
+
+    /**
+     * @return
+     */
+    int size() {
+        return objects.size();
+    }
+
+    /**
+     * Gets the ReadWriteLock value of lock for this instance of ObjectPool.
+     * 
+     * @return the lock
+     */
+    protected ReadWriteLock getLock() {
+        return lock;
+    }
+
+    /**
+     * Gets the int value of maxSize for this instance of ObjectPool.
+     * 
+     * @return the maxSize
+     */
+    public int getMaxSize() {
+        return maxSize;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/pool/SessionPool.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.pool;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+
+import org.apache.camel.component.sjms.ConnectionResource;
+import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
+
+/**
+ * TODO Add Class documentation for SessionPool
+ * 
+ */
+public class SessionPool extends ObjectPool<Session> {
+
+    private ConnectionResource connectionResource;
+    private boolean transacted;
+    private SessionAcknowledgementType acknowledgeMode = SessionAcknowledgementType.AUTO_ACKNOWLEDGE;
+
+    /**
+     * TODO Add Constructor Javadoc
+     *
+     */
+    public SessionPool(int poolSize, ConnectionResource connectionResource) {
+        super(poolSize);
+        this.connectionResource = connectionResource;
+    }
+
+    /**
+     * TODO Add Constructor Javadoc
+     *
+     * @param poolSize
+     */
+    public SessionPool(int poolSize) {
+        super(poolSize);
+    }
+    
+    @Override
+    protected Session createObject() throws Exception {
+        Session session = null;
+        final Connection connection = getConnectionResource().borrowConnection(5000);
+        if (connection != null) {
+            if (transacted) {
+                session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+            } else {
+                switch (acknowledgeMode) {
+                case CLIENT_ACKNOWLEDGE:
+                    session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE);
+                    break;
+                case DUPS_OK_ACKNOWLEDGE:
+                    session = connection.createSession(transacted, Session.DUPS_OK_ACKNOWLEDGE);
+                    break;
+                case AUTO_ACKNOWLEDGE:
+                    session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+                    break;
+                default:
+                    // do nothing here.
+                }
+            }
+        }
+        getConnectionResource().returnConnection(connection);
+        return session;
+    }
+    
+    @Override
+    protected void destroyObject(Session session) throws Exception {
+     // lets reset the session
+        session.setMessageListener(null);
+
+        if (transacted) {
+            try {
+                session.rollback();
+            } catch (JMSException e) {
+                logger.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
+            }
+        }
+        if (session != null) {
+            session.close();
+            session = null;
+        }
+    }
+
+    /**
+     * Gets the SessionAcknowledgementType value of acknowledgeMode for this instance of SessionPool.
+     *
+     * @return the DEFAULT_ACKNOWLEDGE_MODE
+     */
+    public final SessionAcknowledgementType getAcknowledgeMode() {
+        return acknowledgeMode;
+    }
+
+    /**
+     * Sets the SessionAcknowledgementType value of acknowledgeMode for this instance of SessionPool.
+     *
+     * @param acknowledgeMode Sets SessionAcknowledgementType, default is AUTO_ACKNOWLEDGE
+     */
+    public final void setAcknowledgeMode(SessionAcknowledgementType acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+
+    /**
+     * Gets the boolean value of transacted for this instance of SessionPool.
+     *
+     * @return the transacted
+     */
+    public final boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Sets the boolean value of transacted for this instance of SessionPool.
+     *
+     * @param transacted Sets boolean, default is TODO add default
+     */
+    public final void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    /**
+     * Gets the DefaultConnectionResource value of connectionResource for this instance of SessionPool.
+     *
+     * @return the connectionResource
+     */
+    public ConnectionResource getConnectionResource() {
+        return connectionResource;
+    }
+
+    protected XAResource createXaResource(XASession session) throws JMSException {
+        return session.getXAResource();
+    }
+    
+    
+//    protected class Synchronization implements javax.transaction.Synchronization {
+//        private final XASession session;
+//
+//        private Synchronization(XASession session) {
+//            this.session = session;
+//        }
+//
+//        public void beforeCompletion() {
+//        }
+//        
+//        public void afterCompletion(int status) {
+//            try {
+//                // This will return session to the pool.
+//                session.setIgnoreClose(false);
+//                session.close();
+//                session.setIsXa(false);
+//            } catch (JMSException e) {
+//                throw new RuntimeException(e);
+//            }
+//        }
+//    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
+
+/**
+ * The InOnlyProducer is responsible for publishing messages to the JMS
+ * {@link Destination} for the value specified in the destinationName field.
+ */
+public class InOnlyProducer extends SjmsProducer {
+    
+    public InOnlyProducer(SjmsEndpoint endpoint) {
+        super(endpoint);
+    }
+    
+    /*
+     * @see org.apache.camel.component.sjms.SjmsProducer#doCreateProducerModel()
+     *
+     * @return
+     * @throws Exception
+     */
+    public MessageProducerResources doCreateProducerModel() throws Exception {
+        Connection conn = getConnectionResource().borrowConnection();
+        Session session = null;
+        if (isEndpointTransacted()) {
+            session = conn.createSession(true, getAcknowledgeMode());
+        } else {
+            session = conn.createSession(false, getAcknowledgeMode());
+        }
+        MessageProducer messageProducer = null;
+        if (isTopic()) {
+            messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+        } else {
+            messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+        }
+        getConnectionResource().returnConnection(conn);
+        return new MessageProducerResources(session, messageProducer);
+    }
+    
+    @Override
+    public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
+        if (getProducers() != null) {
+            MessageProducerResources producer = getProducers().borrowObject();
+            
+            if (isEndpointTransacted()) {
+                exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+            }
+            
+            Message message = JmsMessageHelper.createMessage(exchange, producer.getSession());
+            producer.getMessageProducer().send(message);
+            getProducers().returnObject(producer);
+            callback.done(isSynchronous());
+        }
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.producer;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Exchanger;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsProducer;
+import org.apache.camel.component.sjms.jms.JmsMessageHelper;
+import org.apache.camel.component.sjms.jms.JmsObjectFactory;
+import org.apache.camel.component.sjms.pool.ObjectPool;
+import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
+import org.apache.camel.util.ObjectHelper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO Add Class documentation for InOutProducer
+ *
+ */
+public class InOutProducer extends SjmsProducer {
+
+    /**
+     * We use the {@link ReadWriteLock} to manage the {@link TreeMap} in place
+     * of a {@link ConcurrentMap} because due to significant performance gains.
+     * 
+     * TODO Externalize the Exchanger Map to a store object
+     */
+    private static Map<String, Exchanger<Object>> exchangerMap = new TreeMap<String, Exchanger<Object>>();
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+    
+    
+    /**
+     * A pool of {@link MessageConsumerResource} objects that are the
+     * reply consumers.
+     * 
+     * TODO Add Class documentation for MessageProducerPool
+     * TODO Externalize
+     *
+     */
+    protected class MessageConsumerPool extends ObjectPool<MessageConsumerResource> {
+
+        /**
+         * TODO Add Constructor Javadoc
+         *
+         * @param poolSize
+         */
+        public MessageConsumerPool(int poolSize) {
+            super(poolSize);
+        }
+
+        @Override
+        protected MessageConsumerResource createObject() throws Exception {
+            Connection conn = getConnectionResource().borrowConnection();
+            Session session = null;
+            if (isEndpointTransacted()) {
+                session = conn.createSession(true, Session.SESSION_TRANSACTED);
+            } else {
+                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+            getConnectionResource().returnConnection(conn);
+            Destination replyToDestination = null;
+            if (ObjectHelper.isEmpty(getNamedReplyTo())) {
+                replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic());
+            } else {
+                replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic());
+            }
+            MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true);
+            messageConsumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    logger.info("Message Received in the Consumer Pool");
+                    logger.info("  Message : {}", message);
+                    try {
+                        Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
+                        exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+                    } catch (Exception e) {
+                        ObjectHelper.wrapRuntimeCamelException(e);
+                    }
+                    
+                }
+            });
+            MessageConsumerResource mcm = new MessageConsumerResource(session, messageConsumer, replyToDestination);
+            return mcm;
+        }
+        
+        @Override
+        protected void destroyObject(MessageConsumerResource model) throws Exception {
+            if (model.getMessageConsumer() != null) {
+                model.getMessageConsumer().close();
+            }
+            
+            if (model.getSession() != null) {
+                if (model.getSession().getTransacted()) {
+                    try {
+                        model.getSession().rollback();
+                    } catch (Exception e) {
+                        // Do nothing.  Just make sure we are cleaned up
+                    }
+                }
+                model.getSession().close();
+            }
+        }
+    }
+    
+    /**
+     * TODO Add Class documentation for MessageConsumerResource
+     */
+    protected class MessageConsumerResource {
+        private final Session session;
+        private final MessageConsumer messageConsumer;
+        private final Destination replyToDestination;
+
+        /**
+         * TODO Add Constructor Javadoc
+         * 
+         * @param session
+         * @param messageConsumer
+         */
+        public MessageConsumerResource(Session session, MessageConsumer messageConsumer, Destination replyToDestination) {
+            super();
+            this.session = session;
+            this.messageConsumer = messageConsumer;
+            this.replyToDestination = replyToDestination;
+        }
+
+        public Session getSession() {
+            return session;
+        }
+        
+        public MessageConsumer getMessageConsumer() {
+            return messageConsumer;
+        }
+
+        public Destination getReplyToDestination() {
+            return replyToDestination;
+        }
+    }
+
+    protected class InOutResponseContainer {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+
+        /**
+         * 
+         * @param exchange
+         * @param callback
+         */
+        public InOutResponseContainer(Exchange exchange, AsyncCallback callback) {
+            super();
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        public AsyncCallback getCallback() {
+            return callback;
+        }
+    }
+    
+    
+    protected class InternalTempDestinationListener implements MessageListener {
+        private final Logger tempLogger = LoggerFactory.getLogger(InternalTempDestinationListener.class);
+        private Exchanger<Object> exchanger;
+
+        /**
+         * TODO Add Constructor Javadoc
+         * 
+         * @param exchanger
+         */
+        public InternalTempDestinationListener(Exchanger<Object> exchanger) {
+            super();
+            this.exchanger = exchanger;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            if (tempLogger.isDebugEnabled()) {
+                tempLogger.debug("Message Received in the Consumer Pool");
+                tempLogger.debug("  Message : {}", message);
+            }
+            try {
+                exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                ObjectHelper.wrapRuntimeCamelException(e);
+            }
+
+        }
+    }
+    
+    private MessageConsumerPool consumers;
+    
+    public InOutProducer(SjmsEndpoint endpoint) {
+        super(endpoint);
+        endpoint.getConsumerCount();
+    }
+    
+    @Override
+    protected void doStart() throws Exception {
+        if (ObjectHelper.isEmpty(getNamedReplyTo())) {
+            if (log.isDebugEnabled()) {
+                log.debug("No reply to destination is defined.  Using temporary destinations.");
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Using {} as the reply to destination.", getNamedReplyTo());
+            }
+        }
+        if (getConsumers() == null) {
+            setConsumers(new MessageConsumerPool(getConsumerCount()));
+            getConsumers().fillPool();
+        }
+        super.doStart();
+    }
+    
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (getConsumers() != null) {
+            getConsumers().drainPool();
+            setConsumers(null);
+        }
+    }
+    
+    @Override
+    public MessageProducerResources doCreateProducerModel() throws Exception {
+        Connection conn = getConnectionResource().borrowConnection();
+        Session session = null;
+        if (isEndpointTransacted()) {
+            session = conn.createSession(true, getAcknowledgeMode());
+        } else {
+            session = conn.createSession(false, getAcknowledgeMode());
+        }
+        MessageProducer messageProducer = null;
+        if (isTopic()) {
+            messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+        } else {
+            messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+        }
+        getConnectionResource().returnConnection(conn);
+        return new MessageProducerResources(session, messageProducer);
+    }
+    
+    /** 
+     * TODO Add override javadoc
+     * TODO time out is actually double as it waits for the producer and then waits for the response.  Use an atomiclong to manage the countdown
+     *
+     * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
+     *
+     * @param exchange
+     * @param callback
+     * @throws Exception
+     */
+    @Override
+    public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
+        if (getProducers() != null) {
+            MessageProducerResources producer = null;
+            try {
+                producer = getProducers().borrowObject(getResponseTimeOut());
+                // producer = getProducers().borrowObject();
+            } catch (Exception e1) {
+                log.warn("The producer pool is exhausted.  Consider setting producerCount to a higher value or disable the fixed size of the pool by setting fixedResourcePool=false.");
+                exchange.setException(new Exception("Producer Resource Pool is exhausted"));
+            }
+            if (producer != null) {
+
+                if (isEndpointTransacted()) {
+                    exchange.getUnitOfWork()
+                        .addSynchronization(new SessionTransactionSynchronization(producer.getSession()));
+                }
+
+                Message request = JmsMessageHelper.createMessage(exchange, producer.getSession());
+                // TODO just set the correlation id don't get it from the
+                // message
+                String correlationId = null;
+                if (exchange.getIn().getHeader("JMSCorrelationID", String.class) == null) {
+                    correlationId = UUID.randomUUID().toString().replace("-", "");
+                } else {
+                    correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
+                }
+                Object responseObject = null;
+                Exchanger<Object> messageExchanger = new Exchanger<Object>();
+                JmsMessageHelper.setCorrelationId(request, correlationId);
+                try {
+                    lock.writeLock().lock();
+                    exchangerMap.put(correlationId, messageExchanger);
+                } finally {
+                    lock.writeLock().unlock();
+                }
+
+                MessageConsumerResource consumer = consumers.borrowObject(getResponseTimeOut());
+                JmsMessageHelper.setJMSReplyTo(request, consumer.getReplyToDestination());
+                consumers.returnObject(consumer);
+                producer.getMessageProducer().send(request);
+
+                try {
+                    getProducers().returnObject(producer);
+                } catch (Exception exception) {
+                    // thrown if the pool is full. safe to ignore.
+                }
+
+                try {
+                    responseObject = messageExchanger.exchange(null, getResponseTimeOut(),
+                                                               TimeUnit.MILLISECONDS);
+
+                    try {
+                        lock.writeLock().lock();
+                        exchangerMap.remove(correlationId);
+                    } finally {
+                        lock.writeLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.debug("Exchanger was interrupted while waiting on response", e);
+                    exchange.setException(e);
+                } catch (TimeoutException e) {
+                    log.debug("Exchanger timed out while waiting on response", e);
+                    exchange.setException(e);
+                }
+
+                if (exchange.getException() == null) {
+                    if (responseObject instanceof Throwable) {
+                        exchange.setException((Throwable)responseObject);
+                    } else if (responseObject instanceof Message) {
+                        Message response = (Message)responseObject;
+                        JmsMessageHelper.populateExchange(response, exchange, true);
+                    } else {
+                        exchange.setException(new CamelException("Unknown response type: " + responseObject));
+                    }
+                }
+            }
+
+            callback.done(isSynchronous());
+        }
+    }
+
+    public void setConsumers(MessageConsumerPool consumers) {
+        this.consumers = consumers;
+    }
+
+    public MessageConsumerPool getConsumers() {
+        return consumers;
+    }
+}

Added: camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java?rev=1368413&view=auto
==============================================================================
--- camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java (added)
+++ camel/trunk/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/tx/SessionTransactionSynchronization.java Thu Aug  2 10:56:40 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms.tx;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO Add Class documentation for SessionTransactionSynchronization
+ *
+ */
+public class SessionTransactionSynchronization implements Synchronization {
+    private Logger log = LoggerFactory.getLogger(getClass());
+    private Session session;
+    
+    public SessionTransactionSynchronization(Session session) {
+        this.session = session;
+    }
+    
+    /*
+     * @see org.apache.camel.spi.Synchronization#onFailure(org.apache.camel.Exchange)
+     *
+     * @param exchange
+     */
+    @Override
+    public void onFailure(Exchange exchange) {
+        if (log.isDebugEnabled()) {
+            log.debug("Processing failure of Exchange id:{}", exchange.getExchangeId());
+        }
+        try {
+            if (session != null && session.getTransacted()) {
+                this.session.rollback();
+            }
+        } catch (JMSException e) {
+            log.warn("Failed to rollback the session: {}", e.getMessage());
+        }
+    }
+    
+    /*
+     * @see org.apache.camel.spi.Synchronization#onComplete(org.apache.camel.Exchange)
+     *
+     * @param exchange
+     */
+    @Override
+    public void onComplete(Exchange exchange) {
+        if (log.isDebugEnabled()) {
+            log.debug("Processing completion of Exchange id:{}", exchange.getExchangeId());
+        }
+        try {
+            if (session != null && session.getTransacted()) {
+                this.session.commit();
+            }
+        } catch (JMSException e) {
+            log.warn("Failed to commit the session: {}", e.getMessage());
+            exchange.setException(e);
+        }
+    }
+
+}