You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/25 09:31:31 UTC

svn commit: r978995 [1/2] - in /camel/trunk/components: camel-jms/src/main/java/org/apache/camel/component/jms/ camel-jms/src/main/java/org/apache/camel/component/jms/reply/ camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ camel-jms/sr...

Author: davsclaus
Date: Sun Jul 25 07:31:29 2010
New Revision: 978995

URL: http://svn.apache.org/viewvc?rev=978995&view=rev
Log:
CAMEL-2970: JmsProducer supports non-blocking async routing engine for InOut Exchanges (request-reply over JMS).

Added:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java   (contents, props changed)
      - copied, changed from r966966, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UnknownReplyMessageException.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java   (with props)
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/package.html   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncJmsInOutTest.java   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/async/AsyncJmsProducerTest.java   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutUseMessageIDasCorrelationIDTest.java
      - copied, changed from r966501, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java
Removed:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java
    camel/trunk/components/camel-jms/src/test/resources/log4j.properties
    camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/CamelTestSupport.java
    camel/trunk/components/camel-test/src/main/java/org/apache/camel/test/junit4/CamelTestSupport.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Sun Jul 25 07:31:29 2010
@@ -67,7 +67,7 @@ public class EndpointMessageListener imp
         LOG.trace("onMessage START");
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug(endpoint + " consumer receiving JMS message: " + message);
+            LOG.debug(endpoint + " consumer received JMS message: " + message);
         }
 
         RuntimeCamelException rce = null;
@@ -82,6 +82,12 @@ public class EndpointMessageListener imp
             if (LOG.isTraceEnabled()) {
                 LOG.trace("onMessage.process START");
             }
+
+            String correlationId = message.getJMSCorrelationID();
+            if (correlationId != null) {
+                LOG.debug("Received Message has JMSCorrelationID [" + correlationId + "]");
+            }
+            
             processor.process(exchange);
             if (LOG.isTraceEnabled()) {
                 LOG.trace("onMessage.process END");
@@ -292,19 +298,11 @@ public class EndpointMessageListener imp
         getTemplate().send(replyDestination, new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
-
-                if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
-                    String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
-                    reply.setJMSCorrelationID(messageID);
-                } else {
-                    String correlationID = message.getJMSCorrelationID();
-                    if (correlationID != null) {
-                        reply.setJMSCorrelationID(correlationID);
-                    }
-                }
+                final String correlationID = determineCorrelationId(message);
+                reply.setJMSCorrelationID(correlationID);
 
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(endpoint + " sending reply JMS message: " + reply);
+                    LOG.debug(endpoint + " sending reply JMS message [correlationId:" + correlationID + "]: " + reply);
                 }
                 return reply;
             }
@@ -318,7 +316,9 @@ public class EndpointMessageListener imp
             try {
                 destination = message.getJMSReplyTo();
             } catch (JMSException e) {
-                LOG.trace("Cannot read JMSReplyTo header. Will ignore this exception.", e);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cannot read JMSReplyTo header. Will ignore this exception.", e);
+                }
             }
         }
         return destination;

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Sun Jul 25 07:31:29 2010
@@ -17,19 +17,16 @@
 package org.apache.camel.component.jms;
 
 import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
 import javax.jms.Session;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
-import org.apache.camel.component.jms.requestor.Requestor;
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,13 +51,10 @@ import static org.apache.camel.util.Obje
 public class JmsComponent extends DefaultComponent implements ApplicationContextAware, HeaderFilterStrategyAware {
 
     private static final transient Log LOG = LogFactory.getLog(JmsComponent.class);
-    private static final int DEFAULT_THREADPOOL_SIZE = 100;
     private static final String DEFAULT_QUEUE_BROWSE_STRATEGY = "org.apache.camel.component.jms.DefaultQueueBrowseStrategy";
     private static final String KEY_FORMAT_STRATEGY_PARAM = "jmsKeyFormatStrategy";
-    private ScheduledExecutorService scheduledExecutorService;
     private JmsConfiguration configuration;
     private ApplicationContext applicationContext;
-    private Requestor requestor;
     private QueueBrowseStrategy queueBrowseStrategy;
     private boolean attemptedToCreateQueueBrowserStrategy;
     private HeaderFilterStrategy headerFilterStrategy = new JmsHeaderFilterStrategy();
@@ -341,30 +335,6 @@ public class JmsComponent extends Defaul
         getConfiguration().setDestinationResolver(destinationResolver);
     }
 
-    public synchronized Requestor getRequestor() throws Exception {
-        if (requestor == null) {
-            requestor = new Requestor(getConfiguration(), getScheduledExecutorService());
-            requestor.start();
-        }
-        return requestor;
-    }
-
-    public void setRequestor(Requestor requestor) {
-        this.requestor = requestor;
-    }
-
-    public synchronized ScheduledExecutorService getScheduledExecutorService() {
-        if (scheduledExecutorService == null) {
-            scheduledExecutorService = getCamelContext().getExecutorServiceStrategy()
-                    .newScheduledThreadPool(this, "JmsComponent", DEFAULT_THREADPOOL_SIZE);
-        }
-        return scheduledExecutorService;
-    }
-
-    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-        this.scheduledExecutorService = scheduledExecutorService;
-    }
-
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         this.applicationContext = applicationContext;
     }
@@ -401,9 +371,6 @@ public class JmsComponent extends Defaul
 
     @Override
     protected void doStop() throws Exception {
-        if (requestor != null) {
-            requestor.stop();
-        }
         super.doStop();
     }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sun Jul 25 07:31:29 2010
@@ -124,12 +124,6 @@ public class JmsConfiguration implements
     private boolean useMessageIDAsCorrelationID;
     private JmsProviderMetadata providerMetadata = new JmsProviderMetadata();
     private JmsOperations metadataJmsOperations;
-    // defines the component created temporary replyTo destination sharing strategy:
-    // possible values are: "component", "endpoint", "producer"
-    // component - a single temp queue is shared among all producers for a given component instance
-    // endpoint - a single temp queue is shared among all producers for a given endpoint instance
-    // producer - a single temp queue is created per producer
-    private String replyToTempDestinationAffinity = REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT;
     private String replyToDestination;
     private String replyToDestinationSelectorName;
     private JmsMessageType jmsMessageType;
@@ -157,10 +151,6 @@ public class JmsConfiguration implements
     }
 
 
-    public static interface MessageSentCallback {
-        void sent(Message message);
-    }
-
     public static class CamelJmsTemplate extends JmsTemplate {
         private JmsConfiguration config;
 
@@ -220,6 +210,9 @@ public class JmsConfiguration implements
             try {
                 message = messageCreator.createMessage(session);
                 doSend(producer, message);
+                if (message != null && callback != null) {
+                    callback.sent(message, destination);
+                }
                 // Check commit - avoid commit call within a JTA transaction.
                 if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                     // Transacted session created by this template -> commit.
@@ -228,9 +221,6 @@ public class JmsConfiguration implements
             } finally {
                 JmsUtils.closeMessageProducer(producer);
             }
-            if (message != null && callback != null) {
-                callback.sent(message);
-            }
             return null;
         }
 
@@ -349,6 +339,9 @@ public class JmsConfiguration implements
                     logger.debug("Sending JMS message to: " + producer.getDestination() + " with message: " + message);
                 }
                 doSend(producer, message);
+                if (message != null && callback != null) {
+                    callback.sent(message, destination);
+                }
                 // Check commit - avoid commit call within a JTA transaction.
                 if (session.getTransacted() && isSessionLocallyTransacted(session)) {
                     // Transacted session created by this template -> commit.
@@ -357,9 +350,6 @@ public class JmsConfiguration implements
             } finally {
                 JmsUtils.closeMessageProducer(producer);
             }
-            if (message != null && callback != null) {
-                callback.sent(message);
-            }
             return null;
         }
 
@@ -516,6 +506,7 @@ public class JmsConfiguration implements
 
     // Properties
     // -------------------------------------------------------------------------
+
     public ConnectionFactory getConnectionFactory() {
         if (connectionFactory == null) {
             connectionFactory = createConnectionFactory();
@@ -863,10 +854,12 @@ public class JmsConfiguration implements
      * <p/>
      * By default this is false as you need to commit the outgoing request before you can consume the input
      */
+    @Deprecated
     public boolean isTransactedInOut() {
         return transactedInOut;
     }
 
+    @Deprecated
     public void setTransactedInOut(boolean transactedInOut) {
         this.transactedInOut = transactedInOut;
     }
@@ -1231,14 +1224,6 @@ public class JmsConfiguration implements
         this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
     }
 
-    public String getReplyToTempDestinationAffinity() {
-        return replyToTempDestinationAffinity;
-    }
-
-    public void setReplyToTempDestinationAffinity(String replyToTempDestinationAffinity) {
-        this.replyToTempDestinationAffinity = replyToTempDestinationAffinity;
-    }
-
     public long getRequestTimeout() {
         return requestTimeout;
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sun Jul 25 07:31:29 2010
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -32,12 +36,17 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.component.jms.requestor.Requestor;
+import org.apache.camel.Service;
+import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
+import org.apache.camel.component.jms.reply.ReplyHolder;
+import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.ManagementAware;
+import org.apache.camel.util.ServiceHelper;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -53,7 +62,7 @@ import org.springframework.transaction.P
  * @version $Revision:520964 $
  */
 @ManagedResource(description = "Managed JMS Endpoint")
-public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport {
+public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport, Service {
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -61,8 +70,10 @@ public class JmsEndpoint extends Default
     private Destination destination;
     private String selector;
     private JmsConfiguration configuration;
-    private Requestor requestor;
-    private ScheduledExecutorService requestorExecutorService;
+    private final Map<String, ReplyManager> replyToReplyManager = new HashMap<String, ReplyManager>();
+    private ReplyManager replyManager;
+    // scheduled executor to check for timeout (reply not received)
+    private ScheduledExecutorService replyManagerExecutorService;
 
     public JmsEndpoint() {
         this(null, null);
@@ -284,16 +295,29 @@ public class JmsEndpoint extends Default
         return true;
     }
 
-    public synchronized Requestor getRequestor() throws Exception {
-        if (requestor == null) {
-            requestor = new Requestor(getConfiguration(), getRequestorExecutorService());
-            requestor.start();
+    public synchronized ReplyManager getReplyManager() throws Exception {
+        if (replyManager == null) {
+            // use a temporary queue
+            replyManager = new TemporaryQueueReplyManager();
+            replyManager.setEndpoint(this);
+            replyManager.setScheduledExecutorService(getReplyManagerExecutorService());
+            ServiceHelper.startService(replyManager);
         }
-        return requestor;
+        return replyManager;
     }
 
-    public void setRequestor(Requestor requestor) {
-        this.requestor = requestor;
+    public synchronized ReplyManager getReplyManager(String replyTo) throws Exception {
+        ReplyManager answer = replyToReplyManager.get(replyTo);
+        if (answer == null) {
+            // use a persistent queue
+            answer = new PersistentQueueReplyManager();
+            answer.setEndpoint(this);
+            answer.setScheduledExecutorService(getReplyManagerExecutorService());
+            ServiceHelper.startService(answer);
+            // remember this manager so we can re-use it
+            replyToReplyManager.put(replyTo, answer);
+        }
+        return answer;
     }
 
     public boolean isPubSubDomain() {
@@ -343,11 +367,28 @@ public class JmsEndpoint extends Default
         return template;
     }
 
-    protected synchronized ScheduledExecutorService getRequestorExecutorService() {
-        if (requestorExecutorService == null) {
-            requestorExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsRequesterTimeoutTask", 1);
+    protected synchronized ScheduledExecutorService getReplyManagerExecutorService() {
+        if (replyManagerExecutorService == null) {
+            replyManagerExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsReplyManagerTimeoutChecker", 1);
+        }
+        return replyManagerExecutorService;
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+        if (replyManager != null) {
+            ServiceHelper.stopService(replyManager);
+            replyManager = null;
+        }
+
+        if (!replyToReplyManager.isEmpty()) {
+            for (ReplyManager replyManager : replyToReplyManager.values()) {
+                ServiceHelper.stopService(replyManager);
+            }
+            replyToReplyManager.clear();
         }
-        return requestorExecutorService;
     }
 
     // Delegated properties from the configuration
@@ -459,10 +500,6 @@ public class JmsEndpoint extends Default
         return getConfiguration().getReplyToDestinationSelectorName();
     }
 
-    public String getReplyToTempDestinationAffinity() {
-        return getConfiguration().getReplyToTempDestinationAffinity();
-    }
-
     public long getRequestMapPurgePollTimeMillis() {
         return getConfiguration().getRequestMapPurgePollTimeMillis();
     }
@@ -768,10 +805,6 @@ public class JmsEndpoint extends Default
         getConfiguration().setReplyToDestinationSelectorName(replyToDestinationSelectorName);
     }
 
-    public void setReplyToTempDestinationAffinity(String replyToTempDestinationAffinity) {
-        getConfiguration().setReplyToTempDestinationAffinity(replyToTempDestinationAffinity);
-    }
-
     public void setRequestMapPurgePollTimeMillis(long requestMapPurgePollTimeMillis) {
         getConfiguration().setRequestMapPurgePollTimeMillis(requestMapPurgePollTimeMillis);
     }
@@ -911,4 +944,6 @@ public class JmsEndpoint extends Default
         return super.createEndpointUri();
     }
 
+
+
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java Sun Jul 25 07:31:29 2010
@@ -125,4 +125,19 @@ public final class JmsMessageHelper {
         }
     }
 
+    /**
+     * Sets the correlation id on the JMS message.
+     * <p/>
+     * Will ignore exception thrown
+     *
+     * @param message  the JMS message
+     * @param correlationId the correlation id
+     */
+    public static void setCorrelationId(Message message, String correlationId) {
+        try {
+            message.setJMSCorrelationID(correlationId);
+        } catch (JMSException e) {
+            // ignore
+        }
+    }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=978995&r1=978994&r2=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Sun Jul 25 07:31:29 2010
@@ -16,11 +16,7 @@
  */
 package org.apache.camel.component.jms;
 
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -28,21 +24,19 @@ import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.FailedToCreateProducerException;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
 import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate102;
-import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
-import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
-import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor;
-import org.apache.camel.component.jms.requestor.Requestor;
+import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.UuidGenerator;
 import org.apache.camel.util.ValueHolder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.MessageCreator;
 
@@ -51,97 +45,46 @@ import org.springframework.jms.core.Mess
  */
 public class JmsProducer extends DefaultAsyncProducer {
     private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
-    private RequestorAffinity affinity;
     private final JmsEndpoint endpoint;
+    private final AtomicBoolean started = new AtomicBoolean(false);
     private JmsOperations inOnlyTemplate;
     private JmsOperations inOutTemplate;
     private UuidGenerator uuidGenerator;
-    private DeferredRequestReplyMap deferredRequestReplyMap;
-    private Requestor requestor;
-    private AtomicBoolean started = new AtomicBoolean(false);
-
-    private enum RequestorAffinity {
-        PER_COMPONENT(0),
-        PER_ENDPOINT(1),
-        PER_PRODUCER(2);
-        private int value;
-        private RequestorAffinity(int value) {
-            this.value = value;
-        }
-    }
+    private ReplyManager replyManager;
 
     public JmsProducer(JmsEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
-        JmsConfiguration c = endpoint.getConfiguration();
-        affinity = RequestorAffinity.PER_PRODUCER;
-        if (c.getReplyTo() != null) {
-            if (c.getReplyToTempDestinationAffinity().equals(JmsConfiguration.REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT)) {
-                affinity = RequestorAffinity.PER_ENDPOINT;
-            } else if (c.getReplyToTempDestinationAffinity().equals(JmsConfiguration.REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT)) {
-                affinity = RequestorAffinity.PER_COMPONENT;
-            }
-        }
-    }
-
-    public long getRequestTimeout() {
-        return endpoint.getConfiguration().getRequestTimeout();
     }
 
-    protected void doStart() throws Exception {
-        super.doStart();
-    }
-
-    protected void testAndSetRequestor() throws RuntimeCamelException {
+    protected void initReplyManager() {
         if (!started.get()) {
             synchronized (this) {
                 if (started.get()) {
                     return;
                 }
                 try {
-                    JmsConfiguration c = endpoint.getConfiguration();
-                    if (c.getReplyTo() != null) {
-                        requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
-                        requestor.start();
+                    if (endpoint.getReplyTo() != null) {
+                        replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from: " + endpoint.getReplyTo()
+                                    + " queue with " + endpoint.getConcurrentConsumers() + " concurrent consumers.");
+                        }
                     } else {
-                        if (affinity == RequestorAffinity.PER_PRODUCER) {
-                            requestor = new Requestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
-                            requestor.start();
-                        } else if (affinity == RequestorAffinity.PER_ENDPOINT) {
-                            requestor = endpoint.getRequestor();
-                        } else if (affinity == RequestorAffinity.PER_COMPONENT) {
-                            requestor = ((JmsComponent)endpoint.getComponent()).getRequestor();
+                        replyManager = endpoint.getReplyManager();
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from temporary queue with "
+                                    + endpoint.getConcurrentConsumers() + " concurrent consumers.");
                         }
                     }
                 } catch (Exception e) {
                     throw new FailedToCreateProducerException(endpoint, e);
                 }
-                deferredRequestReplyMap = requestor.getDeferredRequestReplyMap(this);
                 started.set(true);
             }
         }
     }
 
-    protected void testAndUnsetRequestor() throws Exception  {
-        if (started.get()) {
-            synchronized (this) {
-                if (!started.get()) {
-                    return;
-                }
-                requestor.removeDeferredRequestReplyMap(this);
-                if (affinity == RequestorAffinity.PER_PRODUCER) {
-                    requestor.stop();
-                }
-                started.set(false);
-            }
-        }
-    }
-
-    protected void doStop() throws Exception {
-        testAndUnsetRequestor();
-        super.doStop();
-    }
-
     public boolean process(Exchange exchange, AsyncCallback callback) {
         if (!endpoint.isDisableReplyTo() && exchange.getPattern().isOutCapable()) {
             // in out requires a bit more work than in only
@@ -173,111 +116,76 @@ public class JmsProducer extends Default
             destinationName = null;
         }
 
-        testAndSetRequestor();
+        initReplyManager();
 
         // note due to JMS transaction semantics we cannot use a single transaction
         // for sending the request and receiving the response
-        final Destination replyTo = requestor.getReplyTo();
-
+        final Destination replyTo = replyManager.getReplyTo();
         if (replyTo == null) {
             throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
         }
 
+        // when using message id as correlation id, we first need to use a provisional correlation id
+        // which we then update to the real JMSMessageID when the message has been sent
+        // this is done with the help of the MessageSentCallback
         final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
-        String correlationId = in.getHeader("JMSCorrelationID", String.class);
+        final String provisionalCorrelationId = msgIdAsCorrId ? getUuidGenerator().generateUuid() : null;
+        MessageSentCallback messageSentCallback = null;
+        if (msgIdAsCorrId) {
+            messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, endpoint.getRequestTimeout());
+        }
+        final ValueHolder<MessageSentCallback> sentCallback = new ValueHolder<MessageSentCallback>(messageSentCallback);
 
-        if (correlationId == null && !msgIdAsCorrId) {
+        final String originalCorrelationId = in.getHeader("JMSCorrelationID", String.class);
+        if (originalCorrelationId == null && !msgIdAsCorrId) {
             in.setHeader("JMSCorrelationID", getUuidGenerator().generateUuid());
         }
 
-        final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
-        final DeferredMessageSentCallback jmsCallback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
-
         MessageCreator messageCreator = new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
                 message.setJMSReplyTo(replyTo);
-                requestor.setReplyToSelectorHeader(in, message);
+                replyManager.setReplyToSelectorHeader(in, message);
 
-                FutureTask future;
-                future = (!msgIdAsCorrId)
-                        ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
-                        : requestor.getReceiveFuture(jmsCallback);
+                String correlationId = determineCorrelationId(message, provisionalCorrelationId);
+                replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, endpoint.getRequestTimeout());
 
-                futureHolder.set(future);
                 return message;
             }
         };
 
-        doSend(true, destinationName, destination, messageCreator, jmsCallback);
+        doSend(true, destinationName, destination, messageCreator, sentCallback.get());
 
         // after sending then set the OUT message id to the JMSMessageID so its identical
         setMessageId(exchange);
 
-        // now we should routing asynchronously to not block while waiting for the reply
-        // TODO:
-        // we need a thread pool to use for continue routing messages, just like a seda consumer
-        // and we need options to configure it as well so you can indicate how many threads to use
-        // TODO: Also consider requestTimeout
-
-        // lets wait and return the response
-        long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
-        try {
-            Message message = null;
-            try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Message sent, now waiting for reply at: " + replyTo.toString());
-                }
-                if (requestTimeout <= 0) {
-                    message = (Message)futureHolder.get().get();
-                } else {
-                    message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
-                }
-            } catch (InterruptedException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Future interrupted: " + e, e);
-                }
-            } catch (TimeoutException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Future timed out: " + e, e);
-                }
-            }
-            if (message != null) {
-                // the response can be an exception
-                JmsMessage response = new JmsMessage(message, endpoint.getBinding());
-                Object body = response.getBody();
-
-                if (endpoint.isTransferException() && body instanceof Exception) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Reply received. Setting reply as an Exception: " + body);
-                    }
-                    // we got an exception back and endpoint was configured to transfer exception
-                    // therefore set response as exception
-                    exchange.setException((Exception) body);
-                } else {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Reply received. Setting reply as OUT message: " + body);
-                    }
-                    // regular response
-                    exchange.setOut(response);
-                }
+        // continue routing asynchronously (reply will be processed async when it's received)
+        return false;
+    }
 
-                // restore correlation id in case the remote server messed with it
-                if (correlationId != null) {
-                    message.setJMSCorrelationID(correlationId);
-                    exchange.getOut().setHeader("JMSCorrelationID", correlationId);
-                }
-            } else {
-                // no response, so lets set a timed out exception
-                exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
-            }
-        } catch (Exception e) {
-            exchange.setException(e);
+    /**
+     * Strategy to determine which correlation id to use among <tt>JMSMessageID</tt> and <tt>JMSCorrelationID</tt>.
+     *
+     * @param message   the JMS message
+     * @param provisionalCorrelationId an optional provisional correlation id, which is preferred to be used
+     * @return the correlation id to use
+     * @throws JMSException can be thrown
+     */
+    protected String determineCorrelationId(Message message, String provisionalCorrelationId) throws JMSException {
+        if (provisionalCorrelationId != null) {
+            return provisionalCorrelationId;
         }
 
-        // TODO: should be async
-        callback.done(true);
-        return true;
+        final String messageId = message.getJMSMessageID();
+        final String correlationId = message.getJMSCorrelationID();
+        if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
+            return messageId;
+        } else if (ObjectHelper.isEmpty(correlationId)) {
+            // correlation id is empty so fallback to message id
+            return messageId;
+        } else {
+            return correlationId;
+        }
     }
 
     protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
@@ -340,14 +248,14 @@ public class JmsProducer extends Default
     /**
      * Sends the message using the JmsTemplate.
      *
-     * @param inOut  use inOut or inOnly template
+     * @param inOut           use inOut or inOnly template
      * @param destinationName the destination name
      * @param destination     the destination (if no name provided)
-     * @param messageCreator  the creator to create the javax.jms.Message to send
+     * @param messageCreator  the creator to create the {@link Message} to send
      * @param callback        optional callback for inOut messages
      */
     protected void doSend(boolean inOut, String destinationName, Destination destination,
-                          MessageCreator messageCreator, DeferredMessageSentCallback callback) {
+                          MessageCreator messageCreator, MessageSentCallback callback) {
 
         CamelJmsTemplate template = null;
         CamelJmsTemplate102 template102 = null;
@@ -405,7 +313,7 @@ public class JmsProducer extends Default
                 }
             } catch (JMSException e) {
                 LOG.warn("Unable to retrieve JMSMessageID from outgoing "
-                    + "JMS Message and set it into Camel's MessageId", e);
+                        + "JMS Message and set it into Camel's MessageId", e);
             }
         }
     }
@@ -433,9 +341,6 @@ public class JmsProducer extends Default
     }
 
     public UuidGenerator getUuidGenerator() {
-        if (uuidGenerator == null) {
-            uuidGenerator = UuidGenerator.get();
-        }
         return uuidGenerator;
     }
 
@@ -443,4 +348,16 @@ public class JmsProducer extends Default
         this.uuidGenerator = uuidGenerator;
     }
 
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (uuidGenerator == null) {
+            // use the default generator
+            uuidGenerator = UuidGenerator.get();
+        }
+    }
+
+    protected void doStop() throws Exception {
+        super.doStop();
+    }
+
 }

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+
+/**
+ * Callback when a {@link Message} has been sent.
+ *
+ * @version $Revision$
+ */
+public interface MessageSentCallback {
+
+    /**
+     * Callback when the message has been sent.
+     *
+     * @param message     the message
+     * @param destination the destination
+     */
+    void sent(Message message, Destination destination);
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/MessageSentCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.util.DefaultTimeoutMap;
+
+/**
+ * @version $Revision$
+ */
+public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
+
+    public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        super(executor, requestMapPollTimeMillis);
+    }
+
+    public boolean onEviction(String key, ReplyHandler value) {
+        // trigger timeout
+        value.onTimeout(key);
+        // return true to remove the element
+        return true;
+    }
+
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (from r966966, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?p2=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java&p1=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java&r1=966966&r2=978995&rev=978995&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Sun Jul 25 07:31:29 2010
@@ -14,17 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.jms.requestor;
+package org.apache.camel.component.jms.reply;
 
 import java.util.HashMap;
 import java.util.Map;
 
-public class MessageSelectorProvider {
+/**
+ * A creator which can build the JMS message selector query string to use
+ * with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
+ */
+public class MessageSelectorCreator {
     protected Map<String, String> correlationIds;
     protected boolean dirty = true;
     protected StringBuilder expression;
 
-    public MessageSelectorProvider() {
+    public MessageSelectorCreator() {
         correlationIds = new HashMap<String, String>();
     }
 
@@ -42,18 +46,27 @@ public class MessageSelectorProvider {
         if (!dirty) {
             return expression.toString();
         }
+
         expression = new StringBuilder("JMSCorrelationID='");
-        boolean first = true;
-        for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
-            if (!first) {
-                expression.append(" OR JMSCorrelationID='");
-            }
-            expression.append(entry.getValue()).append("'");
-            if (first) {
-                first = false;
+
+        if (correlationIds.isEmpty()) {
+            // no ids, so use a dummy to select nothing
+            expression.append("CamelDummyJmsMessageSelector'");
+        } else {
+            boolean first = true;
+            for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+                if (!first) {
+                    expression.append(" OR JMSCorrelationID='");
+                }
+                expression.append(entry.getValue()).append("'");
+                if (first) {
+                    first = false;
+                }
             }
         }
+
         dirty = false;
         return expression.toString();
     }
-}
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using persistent queues.
+ *
+ * @version $Revision$
+ */
+public class PersistentQueueReplyHandler extends TemporaryQueueReplyHandler {
+
+    private MessageSelectorCreator dynamicMessageSelector;
+
+    public PersistentQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                       String originalCorrelationId, long timeout, MessageSelectorCreator dynamicMessageSelector) {
+        super(replyManager, exchange, callback, originalCorrelationId, timeout);
+        this.dynamicMessageSelector = dynamicMessageSelector;
+    }
+
+    @Override
+    public void onReply(String correlationId, Message reply) {
+        if (dynamicMessageSelector != null) {
+            // remove correlation id from message selector
+            dynamicMessageSelector.removeCorrelationID(correlationId);
+        }
+        super.onReply(correlationId, reply);
+    }
+
+    @Override
+    public void onTimeout(String correlationId) {
+        if (dynamicMessageSelector != null) {
+            // remove correlation id from message selector
+            dynamicMessageSelector.removeCorrelationID(correlationId);
+        }
+        super.onTimeout(correlationId);
+    }
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.math.BigInteger;
+import java.util.Random;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * A {@link ReplyManager} when using persistent queues.
+ *
+ * @version $Revision$
+ */
+public class PersistentQueueReplyManager extends ReplyManagerSupport {
+
+    private String replyToSelectorValue;
+    private MessageSelectorCreator dynamicMessageSelector;
+
+    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                String originalCorrelationId, String correlationId, long requestTimeout) {
+        // add to correlation map
+        PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
+                originalCorrelationId, requestTimeout, dynamicMessageSelector);
+        correlation.put(correlationId, handler, requestTimeout);
+        if (dynamicMessageSelector != null) {
+            // also remember to keep the dynamic selector updated with the new correlation id
+            dynamicMessageSelector.addCorrelationID(correlationId);
+        }
+        return correlationId;
+    }
+
+    public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) {
+        if (log.isTraceEnabled()) {
+            log.trace("Updated provisional correlationId [" + correlationId + "] to expected correlationId [" + newCorrelationId + "]");
+        }
+
+        ReplyHandler handler = correlation.remove(correlationId);
+        if (handler == null) {
+            // should not happen that we can't find the handler
+            return;
+        }
+
+        correlation.put(newCorrelationId, handler, requestTimeout);
+
+        // the handler is now registered under the new correlation id
+        if (dynamicMessageSelector != null) {
+            // also remember to keep the dynamic selector updated with the new correlation id
+            dynamicMessageSelector.addCorrelationID(newCorrelationId);
+        }
+    }
+
+    protected void handleReplyMessage(String correlationID, Message message) {
+        ReplyHandler handler = correlation.get(correlationID);
+        if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
+            handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
+        }
+
+        if (handler != null) {
+            try {
+                handler.onReply(correlationID, message);
+            } finally {
+                if (dynamicMessageSelector != null) {
+                    // the reply has been handled, so remove its correlation id from the dynamic selector
+                    dynamicMessageSelector.removeCorrelationID(correlationID);
+                }
+                correlation.remove(correlationID);
+            }
+        } else {
+            // we could not correlate the received reply message to a matching request and therefore
+            // we cannot continue routing the unknown message
+            String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
+            log.warn(text);
+            throw new UnknownReplyMessageException(text, message, correlationID);
+        }
+    }
+
+    public void setReplyToSelectorHeader(org.apache.camel.Message camelMessage, Message jmsMessage) throws JMSException {
+        String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
+        if (replyToSelectorName != null && replyToSelectorValue != null) {
+            camelMessage.setHeader(replyToSelectorName, replyToSelectorValue);
+            jmsMessage.setStringProperty(replyToSelectorName, replyToSelectorValue);
+        }
+    }
+
+    private final class DestinationResolverDelegate implements DestinationResolver {
+        private DestinationResolver delegate;
+        private Destination destination;
+
+        public DestinationResolverDelegate(DestinationResolver delegate) {
+            this.delegate = delegate;
+        }
+
+        public Destination resolveDestinationName(Session session, String destinationName,
+                                                  boolean pubSubDomain) throws JMSException {
+            synchronized (PersistentQueueReplyManager.this) {
+                try {
+                    // resolve the reply to destination
+                    if (destination == null) {
+                        destination = delegate.resolveDestinationName(session, destinationName, pubSubDomain);
+                        setReplyTo(destination);
+                    }
+                } finally {
+                    PersistentQueueReplyManager.this.notifyAll();
+                }
+            }
+            return destination;
+        }
+    };
+
+    private final class PersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
+
+        private String fixedMessageSelector;
+        private MessageSelectorCreator creator;
+
+        private PersistentQueueMessageListenerContainer(String fixedMessageSelector) {
+            this.fixedMessageSelector = fixedMessageSelector;
+        }
+
+        private PersistentQueueMessageListenerContainer(MessageSelectorCreator creator) {
+            this.creator = creator;
+        }
+
+        @Override
+        public String getMessageSelector() {
+            String id = null;
+            if (fixedMessageSelector != null) {
+                id = fixedMessageSelector;
+            } else if (creator != null) {
+                id = creator.get();
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Using MessageSelector[" + id + "]");
+            }
+            return id;
+        }
+    }
+
+    protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
+        DefaultMessageListenerContainer answer;
+
+        String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
+        if (replyToSelectorName != null) {
+            // 24 max char is what IBM WebSphereMQ supports in CorrelationIDs
+            // use a fixed selector name so we can select the replies which are intended for us
+            replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
+            String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
+            answer = new PersistentQueueMessageListenerContainer(fixedMessageSelector);
+        } else {
+            // use a dynamic message selector which will select the message we want to receive as reply
+            dynamicMessageSelector = new MessageSelectorCreator();
+            answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
+        }
+
+        answer.setConnectionFactory(endpoint.getListenerConnectionFactory());
+        DestinationResolver resolver = endpoint.getDestinationResolver();
+        if (resolver == null) {
+            resolver = answer.getDestinationResolver();
+        }
+        answer.setDestinationResolver(new DestinationResolverDelegate(resolver));
+        answer.setDestinationName(endpoint.getReplyTo());
+
+        answer.setAutoStartup(true);
+        answer.setMessageListener(this);
+        answer.setPubSubDomain(false);
+        answer.setSubscriptionDurable(false);
+        answer.setConcurrentConsumers(endpoint.getConcurrentConsumers());
+
+        ExceptionListener exceptionListener = endpoint.getExceptionListener();
+        if (exceptionListener != null) {
+            answer.setExceptionListener(exceptionListener);
+        }
+
+        answer.setSessionTransacted(endpoint.isTransacted());
+        if (endpoint.isTransacted()) {
+            answer.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
+        } else {
+            if (endpoint.getAcknowledgementMode() >= 0) {
+                answer.setSessionAcknowledgeMode(endpoint.getAcknowledgementMode());
+            } else if (endpoint.getAcknowledgementModeName() != null) {
+                answer.setSessionAcknowledgeModeName(endpoint.getAcknowledgementModeName());
+            }
+        }
+
+        answer.setConcurrentConsumers(1);
+        answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
+
+        if (endpoint.getReceiveTimeout() >= 0) {
+            answer.setReceiveTimeout(endpoint.getReceiveTimeout());
+        }
+        if (endpoint.getRecoveryInterval() >= 0) {
+            answer.setRecoveryInterval(endpoint.getRecoveryInterval());
+        }
+        TaskExecutor taskExecutor = endpoint.getTaskExecutor();
+        if (taskExecutor != null) {
+            answer.setTaskExecutor(taskExecutor);
+        }
+        PlatformTransactionManager tm = endpoint.getTransactionManager();
+        if (tm != null) {
+            answer.setTransactionManager(tm);
+        } else if (endpoint.isTransacted()) {
+            throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
+        }
+        if (endpoint.getTransactionName() != null) {
+            answer.setTransactionName(endpoint.getTransactionName());
+        }
+        if (endpoint.getTransactionTimeout() >= 0) {
+            answer.setTransactionTimeout(endpoint.getTransactionTimeout());
+        }
+
+        return answer;
+    }
+
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+/**
+ * Handles a reply.
+ *
+ * @version $Revision$
+ */
+public interface ReplyHandler {
+
+    /**
+     * Callback invoked when the reply message was received.
+     *
+     * @param correlationId  the correlation id
+     * @param reply  the reply message
+     */
+    void onReply(String correlationId, Message reply);
+
+    /**
+     * Callback invoked when the reply message was <b>not</b> received and a timeout triggered.
+     *
+     * @param correlationId  the correlation id
+     */
+    void onTimeout(String correlationId);
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used
+ * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback.
+ *
+ * @version $Revision$
+ */
+public class ReplyHolder {
+
+    private final Exchange exchange;
+    private final AsyncCallback callback;
+    private final Message message;
+    private final String originalCorrelationId;
+    private long timeout;
+
+    /**
+     * Constructor to use when a reply message was received
+     */
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, Message message) {
+        this.exchange = exchange;
+        this.callback = callback;
+        this.originalCorrelationId = originalCorrelationId;
+        this.message = message;
+    }
+
+    /**
+     * Constructor to use when a timeout occurred
+     */
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, long timeout) {
+        this(exchange, callback, originalCorrelationId, null);
+        this.timeout = timeout;
+    }
+
+    public Exchange getExchange() {
+        return exchange;
+    }
+
+    public AsyncCallback getCallback() {
+        return callback;
+    }
+
+    /**
+     * Gets the original correlation id, if one was set when sending the message.
+     * <p/>
+     * Some JMS brokers will mess with the correlation id and send back a different/empty correlation id.
+     * So we need to remember it so we can restore the correlation id.
+     */
+    public String getOriginalCorrelationId() {
+        return originalCorrelationId;
+    }
+
+    /**
+     * Gets the received message
+     *
+     * @return  the received message, or <tt>null</tt> if timeout occurred and no message has been received
+     * @see #isTimeout()
+     */
+    public Message getMessage() {
+        return message;
+    }
+
+    /**
+     * Whether timeout triggered or not.
+     * <p/>
+     * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been
+     * received within that time frame.
+     */
+    public boolean isTimeout() {
+        return message == null;
+    }
+
+    /**
+     * The timeout value
+     */
+    public long getRequestTimeout() {
+        return timeout;
+    }
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.JmsEndpoint;
+
+/**
+ * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a>
+ * over JMS.
+ *
+ * @version $Revision$
+ */
+public interface ReplyManager extends MessageListener {
+
+    /**
+     * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}.
+     *
+     * @param endpoint  the endpoint
+     */
+    void setEndpoint(JmsEndpoint endpoint);
+
+    /**
+     * Sets the reply to queue the manager should listen for replies.
+     * <p/>
+     * The queue is either a temporary or a persistent queue.
+     *
+     * @param replyTo  the reply to destination
+     */
+    void setReplyTo(Destination replyTo);
+
+    /**
+     * Sets the scheduled executor service to use when checking for timeouts
+     * (no reply received within a given time period)
+     *
+     * @param executorService  the scheduled executor service
+     */
+    void setScheduledExecutorService(ScheduledExecutorService executorService);
+
+    /**
+     * Gets the reply to queue being used
+     *
+     * @return the reply to destination
+     */
+    Destination getReplyTo();
+
+    /**
+     * To be used when a persistent reply queue with a custom JMS selector is being used.
+     *
+     * @param camelMessage  the Camel message
+     * @param jmsMessage    the JMS message
+     * @throws JMSException can be thrown
+     */
+    void setReplyToSelectorHeader(org.apache.camel.Message camelMessage, Message jmsMessage) throws JMSException;
+
+    /**
+     * Register a reply
+     *
+     * @param replyManager    the reply manager being used
+     * @param exchange        the exchange
+     * @param callback        the callback
+     * @param originalCorrelationId  an optional original correlation id
+     * @param correlationId   the correlation id to expect being used
+     * @param requestTimeout  an optional timeout
+     * @return the correlation id used
+     */
+    String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                         String originalCorrelationId, String correlationId, long requestTimeout);
+
+    /**
+     * Updates the correlation id to the new correlation id.
+     * <p/>
+     * This is only used when <tt>useMessageIDasCorrelationID</tt> option is used, which means a
+     * provisional correlation id is first used, then after the message has been sent, the real
+     * correlation id is known. This allows us then to update the internal mapping to expect the
+     * real correlation id.
+     *
+     * @param correlationId     the provisional correlation id
+     * @param newCorrelationId  the real correlation id
+     * @param requestTimeout    an optional timeout
+     */
+    void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout);
+
+    /**
+     * Process the reply
+     *
+     * @param holder  containing needed data to process the reply and continue routing
+     */
+    void processReply(ReplyHolder holder);
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import java.util.concurrent.ScheduledExecutorService;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.component.jms.JmsEndpoint;
+import org.apache.camel.component.jms.JmsMessage;
+import org.apache.camel.component.jms.JmsMessageHelper;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+
+/**
+ * Base class for {@link ReplyManager} implementations.
+ *
+ * @version $Revision$
+ */
+public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
+
+    // an early reply is looked up at most 50 times with 100 millis in between (= wait up till 5 seconds)
+    private static final int EARLY_REPLY_MAX_ATTEMPTS = 50;
+    private static final long EARLY_REPLY_WAIT_MILLIS = 100;
+
+    protected final Log log = LogFactory.getLog(getClass());
+    protected ScheduledExecutorService executorService;
+    protected JmsEndpoint endpoint;
+    protected Destination replyTo;
+    protected AbstractMessageListenerContainer listenerContainer;
+    protected long replyToResolverTimeout = 5000;
+    protected CorrelationMap correlation;
+
+    public void setScheduledExecutorService(ScheduledExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    public void setEndpoint(JmsEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Sets the reply to destination and wakes up any thread which is waiting in {@link #getReplyTo()}
+     * for the destination to be resolved.
+     */
+    public void setReplyTo(Destination replyTo) {
+        if (log.isTraceEnabled()) {
+            log.trace("ReplyTo destination: " + replyTo);
+        }
+        // assign under the same monitor getReplyTo() waits on, so the waiter is woken and sees the new value
+        synchronized (this) {
+            this.replyTo = replyTo;
+            notifyAll();
+        }
+    }
+
+    /**
+     * Gets the reply to destination, waiting up till <tt>replyToResolverTimeout</tt> millis for it
+     * to be resolved, if it has not been set yet.
+     * <p/>
+     * A timeout of <tt>0</tt> waits until the destination has been set, and a negative timeout does not wait.
+     * If the calling thread is interrupted while waiting, the wait is given up and the interrupt status
+     * of the thread is preserved.
+     *
+     * @return the reply to destination, or <tt>null</tt> if it was not resolved in time
+     */
+    public Destination getReplyTo() {
+        synchronized (this) {
+            long timeout = replyToResolverTimeout;
+            long started = System.currentTimeMillis();
+            try {
+                // wait for the reply to destination to be resolved
+                // (in a loop as Object.wait may wake up spuriously before the destination has been set)
+                while (replyTo == null) {
+                    long remaining = 0;
+                    if (timeout != 0) {
+                        remaining = timeout - (System.currentTimeMillis() - started);
+                        if (remaining <= 0) {
+                            // timed out (or a negative timeout was configured)
+                            break;
+                        }
+                    }
+                    wait(remaining);
+                }
+            } catch (InterruptedException e) {
+                // give up waiting, but keep the interrupt status so the caller can react to it
+                Thread.currentThread().interrupt();
+            }
+            return replyTo;
+        }
+    }
+
+    /**
+     * Receives a reply message from the JMS listener and hands it over to
+     * {@link #handleReplyMessage(String, javax.jms.Message)}.
+     * <p/>
+     * Messages without a correlation id is ignored, as they cannot be correlated to a request.
+     */
+    public void onMessage(Message message) {
+        String correlationID = null;
+        try {
+            correlationID = message.getJMSCorrelationID();
+        } catch (JMSException e) {
+            // cannot read the correlation id, so the message is treated as having no correlation id
+            if (log.isDebugEnabled()) {
+                log.debug("Cannot get JMSCorrelationID from message: " + message, e);
+            }
+        }
+        if (correlationID == null) {
+            log.warn("Ignoring message with no correlationID: " + message);
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Received reply message with correlationID: " + correlationID + " -> " + message);
+        }
+
+        // handle the reply message
+        handleReplyMessage(correlationID, message);
+    }
+
+    /**
+     * Sets the outcome on the {@link Exchange} (either the reply, a transferred exception or a
+     * timed out exception) and then continue routing by notifying the callback.
+     * <p/>
+     * Nothing is done if the holder is <tt>null</tt> or this manager is not allowed to run.
+     *
+     * @param holder  containing needed data to process the reply and continue routing
+     */
+    public void processReply(ReplyHolder holder) {
+        if (holder != null && isRunAllowed()) {
+            try {
+                Exchange exchange = holder.getExchange();
+                Message message = holder.getMessage();
+
+                boolean timeout = holder.isTimeout();
+                if (timeout) {
+                    // no response, so lets set a timed out exception
+                    exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
+                } else {
+                    JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+                    Object body = response.getBody();
+
+                    if (endpoint.isTransferException() && body instanceof Exception) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Reply received. Setting reply as an Exception: " + body);
+                        }
+                        // we got an exception back and endpoint was configured to transfer exception
+                        // therefore set response as exception
+                        exchange.setException((Exception) body);
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Reply received. Setting reply as OUT message: " + body);
+                        }
+                        // regular response
+                        exchange.setOut(response);
+                    }
+
+                    // restore correlation id in case the remote server messed with it
+                    if (holder.getOriginalCorrelationId() != null) {
+                        JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
+                        exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
+                    }
+                }
+            } finally {
+                // always notify callback, also if processing the reply failed,
+                // as otherwise the exchange would never continue being routed
+                AsyncCallback callback = holder.getCallback();
+                callback.done(false);
+            }
+        }
+    }
+
+    protected abstract void handleReplyMessage(String correlationID, Message message);
+
+    protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception;
+
+    /**
+     * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only
+     * testing using InOut over JMS. Its unlikely to happen in a real life situation with communication
+     * to a remote broker, which always will be slower to send back reply, before Camel had a chance
+     * to update its internal correlation map.
+     * <p/>
+     * If the calling thread is interrupted while waiting, then one last lookup is performed and the
+     * waiting is given up, with the interrupt status of the thread preserved.
+     *
+     * @param correlationID  the correlation id of the early reply
+     * @param message        the early reply message
+     * @return the handler for the reply, or <tt>null</tt> if no handler was found in time
+     */
+    protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
+        // race condition, when using messageID as correlationID then we store a provisional correlation id
+        // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely
+        // event that the reply comes back really really fast, and the correlation map hasn't yet been updated
+        // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again.
+        if (log.isWarnEnabled()) {
+            log.warn("Early reply received with correlationID [" + correlationID + "] -> " + message);
+        }
+
+        ReplyHandler answer = null;
+
+        // wait up till 5 seconds
+        boolean interrupted = false;
+        int counter = 0;
+        while (!interrupted && answer == null && counter++ < EARLY_REPLY_MAX_ATTEMPTS) {
+            if (log.isTraceEnabled()) {
+                log.trace("Early reply not found handler at attempt " + counter + ". Waiting a bit longer.");
+            }
+            try {
+                Thread.sleep(EARLY_REPLY_WAIT_MILLIS);
+            } catch (InterruptedException e) {
+                // stop waiting after this attempt, as further sleeps would just be interrupted again
+                interrupted = true;
+            }
+
+            // try again
+            answer = correlation.get(correlationID);
+
+            if (answer != null && log.isTraceEnabled()) {
+                log.trace("Early reply with correlationID [" + correlationID + "] has been matched after "
+                        + counter + " attempts and can be processed using handler: " + answer);
+            }
+        }
+
+        if (interrupted) {
+            // preserve the interrupt status for the caller
+            Thread.currentThread().interrupt();
+        }
+
+        return answer;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(executorService, "executorService", this);
+        ObjectHelper.notNull(endpoint, "endpoint", this);
+
+        // purge for timeout every second
+        correlation = new CorrelationMap(executorService, 1000);
+        ServiceHelper.startService(correlation);
+
+        // create JMS listener and start it
+        listenerContainer = createListenerContainer();
+        listenerContainer.afterPropertiesSet();
+        listenerContainer.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(correlation);
+
+        if (listenerContainer != null) {
+            listenerContainer.stop();
+            listenerContainer.destroy();
+            listenerContainer = null;
+        }
+    }
+
+}

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java?rev=978995&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java (added)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java Sun Jul 25 07:31:29 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import javax.jms.Message;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * {@link ReplyHandler} to handle processing replies when using temporary queues.
+ *
+ * @version $Revision$
+ */
+public class TemporaryQueueReplyHandler implements ReplyHandler {
+
+    // the reply manager which processes the holder created for the reply or timeout
+    protected final ReplyManager replyManager;
+    protected final Exchange exchange;
+    protected final AsyncCallback callback;
+    // the original correlation id is kept, as the server may send back a reply with a messed up correlation id
+    protected final String originalCorrelationId;
+    protected final long timeout;
+
+    public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
+                                      String originalCorrelationId, long timeout) {
+        this.replyManager = replyManager;
+        this.exchange = exchange;
+        this.callback = callback;
+        this.originalCorrelationId = originalCorrelationId;
+        this.timeout = timeout;
+    }
+
+    public void onReply(String correlationId, Message reply) {
+        // wrap the reply in a holder and let the reply manager process it, so the exchange
+        // can continue being routed using the async routing engine
+        replyManager.processReply(new ReplyHolder(exchange, callback, originalCorrelationId, reply));
+    }
+
+    public void onTimeout(String correlationId) {
+        // a holder created with the timeout value (and no reply message) signals that a timeout occurred
+        replyManager.processReply(new ReplyHolder(exchange, callback, originalCorrelationId, timeout));
+    }
+}