You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/05/03 10:31:49 UTC

svn commit: r653015 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ components/camel-...

Author: ningjiang
Date: Sat May  3 01:31:48 2008
New Revision: 653015

URL: http://svn.apache.org/viewvc?rev=653015&view=rev
Log:
CAMEL-490 applied the patch with thanks to Marat

Added:
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
    activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java Sat May  3 01:31:48 2008
@@ -440,6 +440,17 @@
         this.useEndpointCache = useEndpointCache;
     }
 
+    public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
+        Endpoint<?> e = null;
+        synchronized (endpointCache) {
+            e = endpointCache.get(endpointUri);
+        }
+        if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
+            return expectedClass.asSubclass(expectedClass).cast(e);
+        }
+        return null;
+    }
+    
     // Implementation methods
     // -----------------------------------------------------------------------
 

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Sat May  3 01:31:48 2008
@@ -35,6 +35,8 @@
  * A JMS {@link MessageListener} which can be used to delegate processing to a
  * Camel endpoint.
  *
+ * Note that instances of this class have to be thread safe (reentrant)
+ * 
  * @version $Revision$
  */
 public class EndpointMessageListener implements MessageListener {
@@ -111,7 +113,7 @@
         this.eagerLoadingOfProperties = eagerLoadingOfProperties;
     }
 
-    public JmsOperations getTemplate() {
+    public synchronized JmsOperations getTemplate() {
         if (template == null) {
             template = endpoint.createInOnlyTemplate();
         }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Sat May  3 01:31:48 2008
@@ -128,7 +128,7 @@
         boolean alwaysCopy = (endpoint != null) ? endpoint.getConfiguration().isAlwaysCopyMessage() : false;
         if (!alwaysCopy && camelMessage instanceof JmsMessage) {
             JmsMessage jmsMessage = (JmsMessage)camelMessage;
-            if (! jmsMessage.shouldCreateNewMessage()) {
+            if (!jmsMessage.shouldCreateNewMessage()) {
                 answer = jmsMessage.getJmsMessage();
             }
         }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Sat May  3 01:31:48 2008
@@ -49,11 +49,6 @@
  */
 public class JmsComponent extends DefaultComponent<JmsExchange> implements ApplicationContextAware {
 
-    public static final String QUEUE_PREFIX = "queue:";
-    public static final String TOPIC_PREFIX = "topic:";
-    public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
-    public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
-
     private static final transient Log LOG = LogFactory.getLog(JmsComponent.class);
     private static final String DEFAULT_QUEUE_BROWSE_STRATEGY = "org.apache.camel.component.jms.DefaultQueueBrowseStrategy";
     private JmsConfiguration configuration;
@@ -365,20 +360,20 @@
 
         boolean pubSubDomain = false;
         boolean tempDestination = false;
-        if (remaining.startsWith(QUEUE_PREFIX)) {
+        if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) {
             pubSubDomain = false;
-            remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
-        } else if (remaining.startsWith(TOPIC_PREFIX)) {
+            remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/');
+        } else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) {
             pubSubDomain = true;
-            remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
-        } else if (remaining.startsWith(TEMP_QUEUE_PREFIX)) {
+            remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/');
+        } else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) {
             pubSubDomain = false;
             tempDestination = true;
-            remaining = removeStartingCharacters(remaining.substring(TEMP_QUEUE_PREFIX.length()), '/');
-        } else if (remaining.startsWith(TEMP_TOPIC_PREFIX)) {
+            remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/');
+        } else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) {
             pubSubDomain = true;
             tempDestination = true;
-            remaining = removeStartingCharacters(remaining.substring(TEMP_TOPIC_PREFIX.length()), '/');
+            remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/');
         }
 
         final String subject = convertPathToActualDestination(remaining, parameters);

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sat May  3 01:31:48 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.jms;
 
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -57,10 +59,20 @@
  * @version $Revision$
  */
 public class JmsConfiguration implements Cloneable {
+    
+    public static final String QUEUE_PREFIX = "queue:";
+    public static final String TOPIC_PREFIX = "topic:";
+    public static final String TEMP_QUEUE_PREFIX = "temp:queue:";
+    public static final String TEMP_TOPIC_PREFIX = "temp:topic:";
+
     protected static final String TRANSACTED = "TRANSACTED";
     protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
     protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
     protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
+    protected static final String REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT = "component";
+    protected static final String REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT = "endpoint";
+    protected static final String REPLYTO_TEMP_DEST_AFFINITY_PER_PRODUCER = "producer";
+    
     private static final transient Log LOG = LogFactory.getLog(JmsConfiguration.class);
     private JmsOperations jmsOperations;
     private DestinationResolver destinationResolver;
@@ -87,6 +99,7 @@
     private String cacheLevelName;
     private long recoveryInterval = -1;
     private long receiveTimeout = -1;
+    private long requestTimeout = 20000L;
     private int idleTaskExecutionLimit = 1;
     private int maxConcurrentConsumers = 1;
     // JmsTemplate only
@@ -113,6 +126,14 @@
     private boolean useMessageIDAsCorrelationID;
     private JmsProviderMetadata providerMetadata = new JmsProviderMetadata();
     private JmsOperations metadataJmsOperations;
+    // defines the component created temporary replyTo destination sharing strategy:
+    // possible values are: "component", "endpoint", "producer"
+    // component - a single temp queue is shared among all producers for a given component instance
+    // endpoint - a single temp queue is shared among all producers for a given endpoint instance
+    // producer - a single temp queue is created per producer
+    private String replyToTempDestinationAffinity = REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT;
+    private String replyToDestination;
+    private String replyToDestinationSelectorName;
 
     public JmsConfiguration() {
     }
@@ -846,6 +867,10 @@
             }
         }
 
+        if (endpoint.getSelector() != null && endpoint.getSelector().length() != 0) {
+            container.setMessageSelector(endpoint.getSelector());
+        }
+        
         if (container instanceof DefaultMessageListenerContainer) {
             // this includes DefaultMessageListenerContainer102
             DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
@@ -932,7 +957,8 @@
             template.setDeliveryPersistent(isReplyToDeliveryPersistent());
         }
     }
-    protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
+    
+    public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
         // TODO we could allow a spring container to auto-inject these objects?
         switch (consumerType) {
         case Simple:
@@ -1038,4 +1064,48 @@
     public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
         this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
     }
+
+    public String getReplyToTempDestinationAffinity() {
+        return replyToTempDestinationAffinity;
+    }
+
+    public void setReplyToTempDestinationAffinity(
+            String replyToTempDestinationAffinity) {
+        this.replyToTempDestinationAffinity = replyToTempDestinationAffinity;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    public String getReplyTo() {
+        return replyToDestination;
+    }
+
+    public void setReplyTo(String replyToDestination) {
+        if (!replyToDestination.startsWith(QUEUE_PREFIX)) {
+            throw new IllegalArgumentException("ReplyTo destination value has to be of type queue; "
+                                              + "e.g: \"queue:replyQueue\"");
+        }
+        this.replyToDestination = 
+            removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()), '/');
+    }
+
+    public String getReplyToDestinationSelectorName() {
+        return replyToDestinationSelectorName;
+    }
+
+    public void setReplyToDestinationSelectorName(String replyToDestinationSelectorName) {
+        this.replyToDestinationSelectorName = replyToDestinationSelectorName;
+        // in case of consumer -> producer and a named replyTo correlation selector
+        // message passthrough is impossible as we need to set the value of selector into
+        // outgoing message, which would be read-only if passthrough were to remain enabled
+        if (replyToDestinationSelectorName != null) {
+            setAlwaysCopyMessage(true);
+        }
+    }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sat May  3 01:31:48 2008
@@ -42,7 +42,7 @@
     private String selector;
     private JmsConfiguration configuration;
     private Requestor requestor;
-    private long requestTimeout = 20000L;
+    private long requestTimeout;
 
     public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
         super(uri, component);
@@ -50,6 +50,7 @@
         this.configuration = configuration;
         this.destination = destination;
         this.pubSubDomain = pubSubDomain;
+        this.requestTimeout = configuration.getRequestTimeout();
     }
 
     public JmsProducer createProducer() throws Exception {
@@ -86,9 +87,6 @@
     public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
         listenerContainer.setDestinationName(destination);
         listenerContainer.setPubSubDomain(pubSubDomain);
-        if (selector != null) {
-            listenerContainer.setMessageSelector(selector);
-        }
         return new JmsConsumer(this, processor, listenerContainer);
     }
 
@@ -165,7 +163,8 @@
 
     public synchronized Requestor getRequestor() throws Exception {
         if (requestor == null) {
-            requestor = component.getRequestor();
+            requestor = new Requestor(getConfiguration(), getExecutorService());
+            requestor.start();
         }
         return requestor;
     }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java Sat May  3 01:31:48 2008
@@ -107,6 +107,11 @@
     }
 
     public void setJmsMessage(Message jmsMessage) {
+        try {
+            setMessageId(jmsMessage.getJMSMessageID());
+        } catch (JMSException e) {
+            LOG.warn("Unable to retrieve JMSMessageID from JMS Message", e);
+        }
         this.jmsMessage = jmsMessage;
     }
 

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Sat May  3 01:31:48 2008
@@ -19,6 +19,7 @@
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -27,9 +28,12 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
 import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
+import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor;
 import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
 import org.apache.camel.component.jms.requestor.Requestor;
 import org.apache.camel.impl.DefaultProducer;
@@ -50,10 +54,32 @@
     private JmsOperations inOutTemplate;
     private UuidGenerator uuidGenerator;
     private DeferredRequestReplyMap deferredRequestReplyMap;
+    private Requestor requestor;
+    RequestorAffinity affinity;
+    private AtomicBoolean started = new AtomicBoolean(false);
+    
+    private enum RequestorAffinity {
+        PER_COMPONENT(0),
+        PER_ENDPOINT(1),
+        PER_PRODUCER(2);
+        private int value;
+        private RequestorAffinity(int value) {
+            this.value = value;
+        }
+    };
 
     public JmsProducer(JmsEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
+        JmsConfiguration c = endpoint.getConfiguration();
+        affinity = RequestorAffinity.PER_PRODUCER;
+        if (c.getReplyTo() != null) {
+            if (c.getReplyToTempDestinationAffinity().equals(c.REPLYTO_TEMP_DEST_AFFINITY_PER_ENDPOINT)) {
+                affinity = RequestorAffinity.PER_ENDPOINT;
+            } else if (c.getReplyToTempDestinationAffinity().equals(c.REPLYTO_TEMP_DEST_AFFINITY_PER_COMPONENT)) {
+                affinity = RequestorAffinity.PER_COMPONENT;
+            }
+        }
     }
 
     public long getRequestTimeout() {
@@ -62,12 +88,57 @@
 
     protected void doStart() throws Exception {
         super.doStart();
-        deferredRequestReplyMap = endpoint.getRequestor().getDeferredRequestReplyMap(this);
     }
 
+    protected void testAndSetRequestor() throws RuntimeCamelException {
+        if (started.get() == false) {
+            synchronized (this) {
+                if (started.get() == true) {
+                    return;
+                }
+                try {
+                    JmsConfiguration c = endpoint.getConfiguration();
+                    if (c.getReplyTo() != null) {
+                        requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), 
+                                                                   endpoint.getExecutorService());
+                        requestor.start();
+                    } else {
+                        if (affinity == RequestorAffinity.PER_PRODUCER) {
+                            requestor = new Requestor(endpoint.getConfiguration(), 
+                                                      endpoint.getExecutorService());
+                            requestor.start();
+                        } else if (affinity == RequestorAffinity.PER_ENDPOINT) {
+                            requestor = endpoint.getRequestor();
+                        } else if (affinity == RequestorAffinity.PER_COMPONENT) {
+                            requestor = ((JmsComponent)endpoint.getComponent()).getRequestor();
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new FailedToCreateProducerException(endpoint, e);
+                }
+                deferredRequestReplyMap = requestor.getDeferredRequestReplyMap(this);
+                started.set(true);
+            }
+        }
+    }
+    
+    protected void testAndUnsetRequestor() throws Exception  {
+        if (started.get() == true) {
+            synchronized (this) {
+                if (started.get() == false) {
+                    return;
+                }
+                requestor.removeDeferredRequestReplyMap(this);
+                if (affinity == RequestorAffinity.PER_PRODUCER) {
+                    requestor.stop();
+                }
+                started.set(false);
+            }
+        }
+    }
+    
     protected void doStop() throws Exception {
-        endpoint.getRequestor().removeDeferredRequestReplyMap(this);
-        deferredRequestReplyMap = null;
+        testAndUnsetRequestor();
         super.doStop();
     }
 
@@ -75,21 +146,20 @@
         final org.apache.camel.Message in = exchange.getIn();
 
         if (exchange.getPattern().isOutCapable()) {
-            // create a temporary queue and consumer for responses...
+            
+            testAndSetRequestor();
+            
             // note due to JMS transaction semantics we cannot use a single transaction
             // for sending the request and receiving the response
-            final Requestor requestor;
-            try {
-                requestor = endpoint.getRequestor();
-            } catch (Exception e) {
-                throw new RuntimeExchangeException(e, exchange);
-            }
-
             final Destination replyTo = requestor.getReplyTo();
-
+            
+            if (replyTo == null) {
+                throw new RuntimeExchangeException("Failed to resolve replyTo destination", exchange);
+            }
+            
             final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
             String correlationId = in.getHeader("JMSCorrelationID", String.class);
-
+            
             if (correlationId == null && !msgIdAsCorrId) {
                 in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
             }
@@ -102,6 +172,7 @@
                 public Message createMessage(Session session) throws JMSException {
                     Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
                     message.setJMSReplyTo(replyTo);
+                    requestor.setReplyToSelectorHeader(in, message);
 
                     FutureTask future = null;
                     future = (!msgIdAsCorrId)
@@ -116,6 +187,8 @@
                     return message;
                 }
             }, callback);
+            
+            setMessageId(exchange);
 
             // lets wait and return the response
             long requestTimeout = endpoint.getRequestTimeout();
@@ -159,9 +232,27 @@
                     return message;
                 }
             });
+            
+            setMessageId(exchange);            
         }
     }
 
+    protected void setMessageId(Exchange exchange) {
+        if (!(exchange instanceof JmsExchange)) {
+            return;
+        }
+        try {
+            JmsExchange jmsExchange = JmsExchange.class.cast(exchange); 
+            JmsMessage out = jmsExchange.getOut(false);
+            if (out != null) {
+                out.setMessageId(out.getJmsMessage().getJMSMessageID());
+            }
+        } catch (JMSException e) {
+            LOG.warn("Unable to retrieve JMSMessageID from outgoing JMS Message and " 
+                     + "set it into Camel's MessageId", e);
+        }
+    }
+    
     /**
      * Preserved for backwards compatibility.
      *

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/DeferredRequestReplyMap.java Sat May  3 01:31:48 2008
@@ -38,6 +38,7 @@
     public static class DeferredMessageSentCallback implements MessageSentCallback {
         private DeferredRequestReplyMap map;
         private String transitionalID;
+        private Message message;
         private Object monitor;
 
         public DeferredMessageSentCallback(DeferredRequestReplyMap map, UuidGenerator uuidGenerator, Object monitor) {
@@ -54,7 +55,12 @@
             return transitionalID;
         }
 
+        public Message getMessage() {
+            return message;
+        }
+        
         public void sent(Message message) {
+            this.message = message;
             map.processDeferredReplies(monitor, getID(), message);
         }
     }
@@ -124,6 +130,7 @@
                 }
                 deferredRequestMap.remove(transitionalID);
                 String correlationID = outMessage.getJMSMessageID();
+                // System.out.println("DeferredRequestReplyMap.processDeferredReplies: sent messageID = " + correlationID);
                 Object in = deferredReplyMap.get(correlationID);
 
                 if (in != null && in instanceof Message) {

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java Sat May  3 01:31:48 2008
@@ -28,9 +28,10 @@
  *
  * @version $Revision$
  */
-public class FutureHandler extends FutureTask implements ReplyHandler {
-    private static final Callable EMPTY_CALLABLE = new Callable() {
-        public Object call() throws Exception {
+public class FutureHandler extends FutureTask<Message> implements ReplyHandler {
+    
+    private static final Callable<Message> EMPTY_CALLABLE = new Callable<Message>() {
+        public Message call() throws Exception {
             return null;
         }
     };
@@ -39,7 +40,7 @@
         super(EMPTY_CALLABLE);
     }
 
-    public synchronized void set(Object result) {
+    public synchronized void set(Message result) {
         super.set(result);
     }
 

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java Sat May  3 01:31:48 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MessageSelectorProvider {
+    protected Map<String, String> correlationIds;
+    protected boolean dirty = true;
+    protected StringBuilder expression;
+
+    public MessageSelectorProvider() {
+        correlationIds = new HashMap<String, String>();
+    }
+
+    public synchronized void addCorrelationID(String id) {
+        correlationIds.put(id, id);
+        dirty = true;
+    }
+
+    public synchronized void removeCorrelationID(String id) {
+        correlationIds.remove(id);
+        dirty = true;
+    }
+
+    public synchronized String get() {
+        if (!dirty) {
+            return expression.toString();
+        }
+        expression = new StringBuilder("JMSCorrelationID='");
+        boolean first = true;
+        for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+            if (!first) {
+                expression.append(" OR JMSCorrelationID='");
+            }
+            expression.append(entry.getValue()).append("'");
+            if (first) {
+                first = false;
+            }
+        }
+        dirty = false;
+        return expression.toString();
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/MessageSelectorProvider.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java Sat May  3 01:31:48 2008
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.JmsProducer;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
+import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor.MessageSelectorComposer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * A {@link FutureHandler} used by {@link PersistentReplyToRequestor}. While a caller is blocked in
+ * {@code get}, this request's ID is registered with the requestor's listener container (which must
+ * implement {@link MessageSelectorComposer}); the ID is removed again when the wait ends, whether
+ * it returned normally or threw.
+ */
+public class PersistentReplyToFutureHandler extends FutureHandler {
+
+    private static final transient Log LOG = LogFactory.getLog(PersistentReplyToFutureHandler.class);
+    protected PersistentReplyToRequestor requestor;
+    protected DeferredMessageSentCallback callback;
+    protected String correlationID;
+
+    /**
+     * Creates a handler that registers the given correlation ID.
+     *
+     * @param requestor owner of the listener container whose selector is adjusted
+     * @param correlationID the ID added to the selector while a caller waits
+     */
+    public PersistentReplyToFutureHandler(PersistentReplyToRequestor requestor,
+                                          String correlationID) {
+        super();
+        this.requestor = requestor;
+        this.correlationID = correlationID;
+    }
+
+    /**
+     * Creates a handler that registers the JMSMessageID of the callback's message.
+     *
+     * @param requestor owner of the listener container whose selector is adjusted
+     * @param callback its {@code getMessage().getJMSMessageID()} is read on each add and remove
+     */
+    public PersistentReplyToFutureHandler(PersistentReplyToRequestor requestor,
+                                          DeferredMessageSentCallback callback) {
+        super();
+        this.requestor = requestor;
+        this.callback = callback;
+    }
+
+    /**
+     * Waits for the reply with this request's ID registered in the selector.
+     *
+     * @throws ExecutionException if the wait fails, or if adding/removing the ID fails
+     */
+    @Override
+    public Message get() throws InterruptedException, ExecutionException {
+        boolean completed = false;
+        try {
+            updateSelector();
+            Message result = super.get();
+            completed = true;
+            return result;
+        } finally {
+            revertSelector(completed);
+        }
+    }
+
+    /**
+     * Timed variant of {@link #get()}; the ID is removed again even when the wait throws.
+     *
+     * @throws TimeoutException as thrown by the superclass wait
+     */
+    @Override
+    public Message get(long timeout, TimeUnit unit) throws InterruptedException,
+                                                           ExecutionException,
+                                                           TimeoutException {
+        boolean completed = false;
+        try {
+            updateSelector();
+            Message result = super.get(timeout, unit);
+            completed = true;
+            return result;
+        } finally {
+            revertSelector(completed);
+        }
+    }
+
+    /** Adds this request's ID to the container's selector; any failure is wrapped in an ExecutionException. */
+    protected void updateSelector() throws ExecutionException {
+        try {
+            MessageSelectorComposer composer = (MessageSelectorComposer)requestor.getListenerContainer();
+            composer.addCorrelationID((correlationID != null) ? correlationID : callback.getMessage().getJMSMessageID());
+        } catch (Exception e) {
+            throw new ExecutionException(e);
+        }
+    }
+
+    /** Removes this request's ID from the container's selector; any failure is wrapped in an ExecutionException. */
+    protected void revertSelector() throws ExecutionException {
+        try {
+            MessageSelectorComposer composer = (MessageSelectorComposer)requestor.getListenerContainer();
+            composer.removeCorrelationID((correlationID != null) ? correlationID : callback.getMessage().getJMSMessageID());
+        } catch (Exception e) {
+            throw new ExecutionException(e);
+        }
+    }
+
+    /**
+     * Runs {@link #revertSelector()} from a finally block. When the wait itself already failed, a
+     * cleanup failure is only logged, so it cannot replace the exception the caller needs to see.
+     */
+    private void revertSelector(boolean waitSucceeded) throws ExecutionException {
+        try {
+            revertSelector();
+        } catch (ExecutionException e) {
+            if (waitSucceeded) {
+                throw e;
+            }
+            LOG.warn("Failed to remove the correlation ID from the reply selector", e);
+        }
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToFutureHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java Sat May  3 01:31:48 2008
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.math.BigInteger;
+import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.transaction.PlatformTransactionManager;
+
+public class PersistentReplyToRequestor extends Requestor { // listens for replies on the configured replyTo destination name, filtered by a message selector
+    private String replyToSelectorValue; // random "ID:<hex>" token; assigned only when a selector name is configured
+    /** Resolves the reply destination on first use, hands it to setReplyTo() and notifies threads waiting on the requestor. */
+    public class DestinationResolverDelegate implements DestinationResolver {
+        private DestinationResolver delegate;
+        private Destination destination;
+
+        public DestinationResolverDelegate(DestinationResolver delegate) {
+            this.delegate = delegate;
+        }
+
+        public Destination resolveDestinationName(Session session, String destinationName,
+                                                  boolean pubSubDomain) throws JMSException {
+            synchronized (getOutterInstance()) {
+                try {
+                    if (destination == null) { // only the first call resolves; later calls return the cached destination
+                        destination = delegate.resolveDestinationName(session, destinationName, pubSubDomain);
+                        setReplyTo(destination);
+                    }
+                } finally {
+                    getOutterInstance().notifyAll(); // in finally, so it runs even when resolution throws
+                }
+            }
+            return destination;
+        }
+    };
+    /** Implemented by the listener containers below, whose selector is built from a changing set of correlation IDs. */
+    public static interface MessageSelectorComposer {
+        void addCorrelationID(String id);
+        void removeCorrelationID(String id);
+    }
+    /** DefaultMessageListenerContainer102 variant whose selector is computed by a MessageSelectorProvider rather than assigned. */
+    public static class CamelDefaultMessageListenerContainer102 extends DefaultMessageListenerContainer102
+                                                                implements MessageSelectorComposer {
+        MessageSelectorProvider provider = new MessageSelectorProvider();
+
+        public void addCorrelationID(String id) {
+            provider.addCorrelationID(id);
+        }
+
+        public void removeCorrelationID(String id) {
+            provider.removeCorrelationID(id);
+        }
+
+        @Override
+        public void setMessageSelector(String messageSelector) {
+            throw new UnsupportedOperationException(); // the selector is derived from the registered IDs and cannot be assigned
+        }
+
+        @Override
+        public String getMessageSelector() {
+            return provider.get();
+        }
+    }
+    /** DefaultMessageListenerContainer variant whose selector is computed by a MessageSelectorProvider rather than assigned. */
+    public static class CamelDefaultMessageListenerContainer extends DefaultMessageListenerContainer
+                                                             implements MessageSelectorComposer {
+
+        MessageSelectorProvider provider = new MessageSelectorProvider();
+
+        public void addCorrelationID(String id) {
+            provider.addCorrelationID(id);
+        }
+
+        public void removeCorrelationID(String id) {
+            provider.removeCorrelationID(id);
+        }
+
+        @Override
+        public void setMessageSelector(String messageSelector) {
+            throw new UnsupportedOperationException(); // the selector is derived from the registered IDs and cannot be assigned
+        }
+
+        @Override
+        public String getMessageSelector() {
+            return provider.get();
+        }
+    }
+
+    public PersistentReplyToRequestor(JmsConfiguration configuration,
+                                      ScheduledExecutorService executorService) {
+        super(configuration, executorService);
+    }
+
+    /** Returns a PersistentReplyToFutureHandler when no selector name is configured; otherwise a plain FutureHandler. */
+    @Override
+    protected FutureHandler createFutureHandler(String correlationID) {
+        boolean dynamicSelector = (getConfiguration().getReplyToDestinationSelectorName() == null);
+        if (dynamicSelector) {
+            return new PersistentReplyToFutureHandler(this, correlationID);
+        }
+        return new FutureHandler();
+    }
+    /** Callback-based counterpart: same choice of handler, keyed by the deferred-send callback instead of an ID. */
+    @Override
+    protected FutureHandler createFutureHandler(DeferredMessageSentCallback callback) {
+        boolean dynamicSelector = (getConfiguration().getReplyToDestinationSelectorName() == null);
+        if (dynamicSelector) {
+            return new PersistentReplyToFutureHandler(this, callback);
+        }
+        return new FutureHandler();
+    }
+    /** Builds and configures the listener container that consumes replies from the configured replyTo destination. */
+    @Override
+    public AbstractMessageListenerContainer createListenerContainer() {
+        JmsConfiguration config = getConfiguration();
+        String replyToSelectorName = getConfiguration().getReplyToDestinationSelectorName();
+        // With a selector name the stock Spring container is used; without one, the Camel subclass that composes the selector.
+        AbstractMessageListenerContainer container =
+            config.isUseVersion102() ?
+                    (replyToSelectorName != null) ? new DefaultMessageListenerContainer102()
+                           : new CamelDefaultMessageListenerContainer102()
+                    : (replyToSelectorName != null) ? new DefaultMessageListenerContainer()
+                           : new CamelDefaultMessageListenerContainer();
+
+        container.setConnectionFactory(config.getListenerConnectionFactory());
+
+        DestinationResolver resolver = config.getDestinationResolver();
+        if (resolver == null) {
+            resolver = container.getDestinationResolver();
+        }
+
+        container.setDestinationResolver(new DestinationResolverDelegate(resolver));
+        container.setDestinationName(getConfiguration().getReplyTo());
+        // Named selector: one random token for this requestor. Otherwise seed the composer with a random ID (presumably so its selector is never built from an empty set - TODO confirm).
+        if (replyToSelectorName != null) {
+            replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
+            container.setMessageSelector(replyToSelectorName + "='" + replyToSelectorValue + "'");
+        } else {
+            ((MessageSelectorComposer)container).addCorrelationID("ID:" + new BigInteger(24 * 8, new Random()).toString(16));
+        }
+
+        container.setAutoStartup(true);
+        container.setMessageListener(this);
+        container.setPubSubDomain(false);
+        container.setSubscriptionDurable(false);
+        ExceptionListener exceptionListener = config.getExceptionListener();
+        if (exceptionListener != null) {
+            container.setExceptionListener(exceptionListener);
+        }
+        container.setSessionTransacted(config.isTransacted());
+        if (config.isTransacted()) {
+            container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
+        } else {
+            if (config.getAcknowledgementMode() >= 0) {
+                container.setSessionAcknowledgeMode(config.getAcknowledgementMode());
+            } else if (config.getAcknowledgementModeName() != null) {
+                container.setSessionAcknowledgeModeName(config.getAcknowledgementModeName());
+            }
+        }
+        if (container instanceof DefaultMessageListenerContainer) {
+            DefaultMessageListenerContainer defContainer = (DefaultMessageListenerContainer)container;
+            defContainer.setConcurrentConsumers(1);
+            defContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
+
+            if (config.getReceiveTimeout() >= 0) {
+                defContainer.setReceiveTimeout(config.getReceiveTimeout());
+            }
+            if (config.getRecoveryInterval() >= 0) {
+                defContainer.setRecoveryInterval(config.getRecoveryInterval());
+            }
+            TaskExecutor taskExecutor = config.getTaskExecutor();
+            if (taskExecutor != null) {
+                defContainer.setTaskExecutor(taskExecutor);
+            }
+            PlatformTransactionManager tm = config.getTransactionManager();
+            if (tm != null) {
+                defContainer.setTransactionManager(tm);
+            } else if (config.isTransacted()) {
+                throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
+            }
+            if (config.getTransactionName() != null) {
+                defContainer.setTransactionName(config.getTransactionName());
+            }
+            if (config.getTransactionTimeout() >= 0) {
+                defContainer.setTransactionTimeout(config.getTransactionTimeout());
+            }
+        }
+        return container;
+    }
+    /** Sets the generated selector token as a Camel header and a JMS string property; does nothing when no token was generated. */
+    @Override
+    public void setReplyToSelectorHeader(org.apache.camel.Message in, Message jmsIn) throws JMSException {
+        String replyToSelectorName = getConfiguration().getReplyToDestinationSelectorName();
+        if (replyToSelectorValue != null) {
+            in.setHeader(replyToSelectorName, replyToSelectorValue);
+            jmsIn.setStringProperty(replyToSelectorName, replyToSelectorValue);
+        }
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/PersistentReplyToRequestor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Sat May  3 01:31:48 2008
@@ -56,9 +56,10 @@
     private TimeoutMap requestMap;
     private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap;
     private TimeoutMap deferredRequestMap;
-    private TimeoutMap deferredReplyMap;
+    private TimeoutMap deferredReplyMap;    
     private Destination replyTo;
     private long maxRequestTimeout = -1;
+    private long replyToResolverTimeout = 5000;
 
 
     public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
@@ -75,17 +76,21 @@
         if (map == null) {
             map = new DeferredRequestReplyMap(this, producer, deferredRequestMap, deferredReplyMap);
             producerDeferredRequestReplyMap.put(producer, map);
-        }
-        if (maxRequestTimeout == -1) {
-            maxRequestTimeout = producer.getRequestTimeout();
-        } else if (maxRequestTimeout < producer.getRequestTimeout()) {
-            maxRequestTimeout = producer.getRequestTimeout();
+            if (maxRequestTimeout == -1) {
+                maxRequestTimeout = producer.getRequestTimeout();
+            } else if (maxRequestTimeout < producer.getRequestTimeout()) {
+                maxRequestTimeout = producer.getRequestTimeout();
+            }
         }
         return map;
     }
 
     public synchronized void removeDeferredRequestReplyMap(JmsProducer producer) {
-        producerDeferredRequestReplyMap.remove(producer);
+        DeferredRequestReplyMap map = producerDeferredRequestReplyMap.remove(producer);
+        if (map == null) {
+            // already removed;
+            return;
+        }
         if (maxRequestTimeout == producer.getRequestTimeout()) {
             long max = -1;
             for (Map.Entry<JmsProducer, DeferredRequestReplyMap> entry : producerDeferredRequestReplyMap.entrySet()) {
@@ -114,26 +119,30 @@
     }
 
     public FutureTask getReceiveFuture(String correlationID, long requestTimeout) {
-        FutureTask future = null;
-
-        if (future == null) {
-            FutureHandler futureHandler = new FutureHandler();
-            future = futureHandler;
-            requestMap.put(correlationID, futureHandler, requestTimeout);
-        }
+        FutureHandler future = createFutureHandler(correlationID);
+        requestMap.put(correlationID, future, requestTimeout);
         return future;
     }
 
     public FutureTask getReceiveFuture(DeferredMessageSentCallback callback) {
-        FutureTask future = new FutureHandler();
+        FutureHandler future = createFutureHandler(callback);
         DeferredRequestReplyMap map = callback.getDeferredRequestReplyMap();
         map.put(callback, future);
         return future;
     }
+    
+    protected FutureHandler createFutureHandler(String correlationID) {
+        return new FutureHandler();
+    }
+
+    protected FutureHandler createFutureHandler(DeferredMessageSentCallback callback) {
+        return new FutureHandler();
+    }
 
     public void onMessage(Message message) {
         try {
             String correlationID = message.getJMSCorrelationID();
+            // System.out.println("Requestor.onMessage: correlationID " + correlationID);
             if (correlationID == null) {
                 LOG.warn("Ignoring message with no correlationID! " + message);
                 return;
@@ -169,6 +178,15 @@
     }
 
     public Destination getReplyTo() {
+        synchronized(this) {
+            try {
+                if (replyTo == null) {
+                    wait(replyToResolverTimeout); // bounded wait for the destination resolver to publish replyTo and notify
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the interrupt for the caller instead of silently eating it
+            }
+        }
         return replyTo;
     }
 
@@ -179,11 +197,13 @@
     // Implementation methods
     //-------------------------------------------------------------------------
 
+    @Override
     protected void doStart() throws Exception {
         AbstractMessageListenerContainer container = getListenerContainer();
         container.afterPropertiesSet();
     }
 
+    @Override
     protected void doStop() throws Exception {
         if (listenerContainer != null) {
             listenerContainer.stop();
@@ -191,6 +211,10 @@
         }
     }
 
+    protected Requestor getOutterInstance() {
+        return this;
+    }
+    
     protected AbstractMessageListenerContainer createListenerContainer() {
         SimpleMessageListenerContainer answer = configuration.isUseVersion102()
             ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
@@ -199,8 +223,15 @@
 
             public Destination resolveDestinationName(Session session, String destinationName,
                                                       boolean pubSubDomain) throws JMSException {
-                TemporaryQueue queue = session.createTemporaryQueue();
-                replyTo = queue;
+                TemporaryQueue queue = null;
+                synchronized (getOutterInstance()) {
+                    try {
+                        queue = session.createTemporaryQueue();
+                        setReplyTo(queue);
+                    } finally {
+                        getOutterInstance().notifyAll();                        
+                    }
+                }
                 return queue;
             }
         });
@@ -232,4 +263,12 @@
         }
         return uuidGenerator;
     }
+
+    protected JmsConfiguration getConfiguration() {
+        return configuration;
+    }
+    
+    public void setReplyToSelectorHeader(org.apache.camel.Message in, Message jmsIn) throws JMSException {
+        // complete
+    }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java Sat May  3 01:31:48 2008
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.jms;
 
-import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,9 +27,9 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
 
@@ -38,15 +37,20 @@
  * @version $Revision$
  */
 public class JmsRouteRequestReplyTest extends ContextTestSupport {
+    protected static String REPLY_TO_DESTINATION_SELECTOR_NAME = "camelProducer";
     protected static String componentName = "amq";
     protected static String componentName1 = "amq1";
     protected static String endpoingUriA = componentName + ":queue:test.a";
     protected static String endpointUriB = componentName + ":queue:test.b";
     protected static String endpointUriB1 = componentName1 + ":queue:test.b";
+    // note that the replyTo both A and B endpoints share the persistent replyTo queue, 
+    // which is one more way to verify that reply listeners of A and B endpoints don't steal each other messages
+    protected static String endpoingtReplyToUriA = componentName + ":queue:test.a?replyTo=queue:test.a.reply";
+    protected static String endpoingtReplyToUriB = componentName + ":queue:test.b?replyTo=queue:test.a.reply";
     protected static String request = "Hello World";
     protected static String expectedReply = "Re: " + request;
     protected static int maxTasks = 100;
-    protected static int maxServerTasks = maxTasks / 5;
+    protected static int maxServerTasks = 1/*maxTasks / 5*/;
     protected static int maxCalls = 10;
     protected static AtomicBoolean inited = new AtomicBoolean(false);
     protected static Map<String, ContextBuilder> contextBuilders = new HashMap<String, ContextBuilder>();
@@ -55,7 +59,7 @@
     private interface ContextBuilder {
         CamelContext buildContext(CamelContext context) throws Exception;
     }
-
+    
     public static class SingleNodeDeadEndRouteBuilder extends RouteBuilder {
         public void configure() throws Exception {
             from(endpoingUriA).process(new Processor() {
@@ -89,6 +93,22 @@
         }
     };
 
+    public static class MultiNodeReplyToRouteBuilder extends RouteBuilder {
+        public void configure() throws Exception {
+            from(endpoingtReplyToUriA).to(endpoingtReplyToUriB);
+            from(endpointUriB).process(new Processor() {
+                public void process(Exchange e) {
+                    Message in = e.getIn();
+                    Message out = e.getOut(true);
+                    String selectorValue = in.getHeader(REPLY_TO_DESTINATION_SELECTOR_NAME, String.class);
+                    String request = in.getBody(String.class);                    
+                    out.setHeader(REPLY_TO_DESTINATION_SELECTOR_NAME, selectorValue);
+                    out.setBody(expectedReply + request.substring(request.indexOf('-')));
+                }
+            });
+        }
+    };
+
     public static class MultiNodeDiffCompRouteBuilder extends RouteBuilder {
         public void configure() throws Exception {
             from(endpoingUriA).to(endpointUriB1);
@@ -100,34 +120,88 @@
             });
         }
     };
+    
+    public static class ContextBuilderMessageID implements ContextBuilder {
+        public CamelContext buildContext(CamelContext context) throws Exception {
+            ConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+            JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+            jmsComponent.setUseMessageIDAsCorrelationID(true);
+            jmsComponent.setConcurrentConsumers(maxServerTasks);
+            /*
+            jmsComponent.getConfiguration().setRequestTimeout(600000);
+            jmsComponent.getConfiguration().setRequestMapPurgePollTimeMillis(30000);
+             */
+            context.addComponent(componentName, jmsComponent);
+            return context;
+        }        
+    };
+    
+    public static class ContextBuilderMessageIDReplyToTempDestinationAffinity extends ContextBuilderMessageID {
+        private String affinity;
+        public ContextBuilderMessageIDReplyToTempDestinationAffinity(String affinity) {
+            this.affinity = affinity;
+        }
+        public CamelContext buildContext(CamelContext context) throws Exception {
+            super.buildContext(context);
+            JmsComponent component = context.getComponent(componentName, JmsComponent.class);
+            component.getConfiguration().setReplyToTempDestinationAffinity(affinity);
+            return context;
+        }
+    }
 
     protected static void init() {
         if (inited.compareAndSet(false, true)) {
 
-            ContextBuilder contextBuilderMessageID = new ContextBuilder() {
+            ContextBuilder contextBuilderMessageID = new ContextBuilderMessageID();
+            ContextBuilder contextBuilderMessageIDReplyToTempDestinationPerComponent = 
+                new ContextBuilderMessageIDReplyToTempDestinationAffinity("component");
+            ContextBuilder contextBuilderMessageIDReplyToTempDestinationPerProducer = 
+                new ContextBuilderMessageIDReplyToTempDestinationAffinity("producer");
+
+            ContextBuilder contextBuilderCorrelationID = new ContextBuilder() {
                 public CamelContext buildContext(CamelContext context) throws Exception {
                     ConnectionFactory connectionFactory =
                         new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
                     JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
-                    jmsComponent.setUseMessageIDAsCorrelationID(true);
+                    jmsComponent.setUseMessageIDAsCorrelationID(false);
                     jmsComponent.setConcurrentConsumers(maxServerTasks);
+                    /*
+                    jmsComponent.getConfiguration().setRequestTimeout(600000);
+                    jmsComponent.getConfiguration().setRequestMapPurgePollTimeMillis(60000);
+                    */
                     context.addComponent(componentName, jmsComponent);
                     return context;
                 }
             };
 
-            ContextBuilder contextBuilderCorrelationID = new ContextBuilder() {
+            ContextBuilder contextBuilderMessageIDNamedReplyToSelector = new ContextBuilder() {
+                public CamelContext buildContext(CamelContext context) throws Exception {
+                    ConnectionFactory connectionFactory =
+                        new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+                    JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+                    jmsComponent.setUseMessageIDAsCorrelationID(true);
+                    jmsComponent.setConcurrentConsumers(maxServerTasks);
+                    jmsComponent.getConfiguration().setReplyToDestinationSelectorName(REPLY_TO_DESTINATION_SELECTOR_NAME);
+                    context.addComponent(componentName, jmsComponent);
+                    return context;
+                }
+            };
+            
+            ContextBuilder contextBuilderCorrelationIDNamedReplyToSelector = new ContextBuilder() {
                 public CamelContext buildContext(CamelContext context) throws Exception {
                     ConnectionFactory connectionFactory =
                         new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
                     JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
                     jmsComponent.setUseMessageIDAsCorrelationID(false);
                     jmsComponent.setConcurrentConsumers(maxServerTasks);
+                    jmsComponent.getConfiguration().setReplyToDestinationSelectorName(REPLY_TO_DESTINATION_SELECTOR_NAME);
                     context.addComponent(componentName, jmsComponent);
                     return context;
                 }
             };
 
+            
             ContextBuilder contextBuilderCorrelationIDDiffComp = new ContextBuilder() {
                 public CamelContext buildContext(CamelContext context) throws Exception {
                     ConnectionFactory connectionFactory =
@@ -144,20 +218,71 @@
                 }
             };
 
+            ContextBuilder contextBuilderMessageIDDiffComp = new ContextBuilder() {
+                public CamelContext buildContext(CamelContext context) throws Exception {
+                    ConnectionFactory connectionFactory =
+                        new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+                    JmsComponent jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+                    jmsComponent.setUseMessageIDAsCorrelationID(true);
+                    jmsComponent.setConcurrentConsumers(maxServerTasks);
+                    context.addComponent(componentName, jmsComponent);
+                    jmsComponent = jmsComponentClientAcknowledge(connectionFactory);
+                    jmsComponent.setUseMessageIDAsCorrelationID(true);
+                    jmsComponent.setConcurrentConsumers(maxServerTasks);
+                    context.addComponent(componentName1, jmsComponent);
+                    return context;
+                }
+            };
+
 
             contextBuilders.put("testUseMessageIDAsCorrelationID", contextBuilderMessageID);
+            contextBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerComponent",
+                                 contextBuilderMessageIDReplyToTempDestinationPerComponent);
+            contextBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerProducer", 
+                                 contextBuilderMessageIDReplyToTempDestinationPerProducer);
+            
             contextBuilders.put("testUseCorrelationID", contextBuilderCorrelationID);
             contextBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", contextBuilderMessageID);
             contextBuilders.put("testUseCorrelationIDMultiNode", contextBuilderCorrelationID);
+
+            contextBuilders.put("testUseMessageIDAsCorrelationIDPersistReplyToMultiNode", contextBuilderMessageID);
+            contextBuilders.put("testUseCorrelationIDPersistReplyToMultiNode", contextBuilderCorrelationID);
+            
+            contextBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToMultiNode", contextBuilderMessageID);
+            contextBuilders.put("testUseCorrelationIDPersistMultiReplyToMultiNode", contextBuilderCorrelationID);
+            
+            contextBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode", 
+                                 contextBuilderMessageIDNamedReplyToSelector);
+            
+            contextBuilders.put("testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode", 
+                                 contextBuilderCorrelationIDNamedReplyToSelector);
+            
             contextBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", contextBuilderCorrelationIDDiffComp);
+            contextBuilders.put("testUseMessageIDAsCorrelationIDMultiNodeDiffComponents", contextBuilderMessageIDDiffComp);
             contextBuilders.put("testUseMessageIDAsCorrelationIDTimeout", contextBuilderMessageID);
             contextBuilders.put("testUseCorrelationIDTimeout", contextBuilderMessageID);
 
             routeBuilders.put("testUseMessageIDAsCorrelationID", new SingleNodeRouteBuilder());
+            routeBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerComponent", new SingleNodeRouteBuilder());
+            routeBuilders.put("testUseMessageIDAsCorrelationIDReplyToTempDestinationPerProducer", new SingleNodeRouteBuilder());
             routeBuilders.put("testUseCorrelationID", new SingleNodeRouteBuilder());
             routeBuilders.put("testUseMessageIDAsCorrelationIDMultiNode", new MultiNodeRouteBuilder());
             routeBuilders.put("testUseCorrelationIDMultiNode", new MultiNodeRouteBuilder());
+            
+            routeBuilders.put("testUseMessageIDAsCorrelationIDPersistReplyToMultiNode", new MultiNodeRouteBuilder());
+            routeBuilders.put("testUseCorrelationIDPersistReplyToMultiNode", new MultiNodeRouteBuilder());
+            
+            routeBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToMultiNode", new MultiNodeReplyToRouteBuilder());
+            routeBuilders.put("testUseCorrelationIDPersistMultiReplyToMultiNode", new MultiNodeReplyToRouteBuilder());
+            
+            routeBuilders.put("testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode", 
+                               new MultiNodeReplyToRouteBuilder());
+            
+            routeBuilders.put("testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode", 
+                               new MultiNodeReplyToRouteBuilder());
+            
             routeBuilders.put("testUseCorrelationIDMultiNodeDiffComponents", new MultiNodeDiffCompRouteBuilder());
+            routeBuilders.put("testUseMessageIDAsCorrelationIDMultiNodeDiffComponents", new MultiNodeDiffCompRouteBuilder());
             routeBuilders.put("testUseMessageIDAsCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
             routeBuilders.put("testUseCorrelationIDTimeout", new SingleNodeDeadEndRouteBuilder());
         }
@@ -165,17 +290,19 @@
 
     public class Task extends Thread {
         private AtomicInteger counter;
+        private String fromUri;
         private boolean ok = true;
         private String message = "";
 
-        public Task(AtomicInteger counter) {
+        public Task(AtomicInteger counter, String fromUri) {
             this.counter = counter;
+            this.fromUri = fromUri;
         }
 
         public void run() {
             for (int i = 0; i < maxCalls; i++) {
                 int callId = counter.incrementAndGet();
-                Object reply = template.requestBody(endpoingUriA, request + "-" + callId);
+                Object reply = template.requestBody(fromUri, request + "-" + callId);
                 if (!reply.equals(expectedReply + "-" + callId)) {
                     ok = false;
                     message = "Unexpected reply. Expected: '" + expectedReply  + "-" + callId
@@ -195,61 +322,131 @@
     }
 
     public void testUseMessageIDAsCorrelationID() throws Exception {
-        runRequestReplyThreaded();
+        runRequestReplyThreaded(endpoingUriA);
+    }
+
+    public void testUseMessageIDAsCorrelationIDReplyToTempDestinationPerComponent() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
+    }
+    
+    public void testUseMessageIDAsCorrelationIDReplyToTempDestinationPerProducer() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
     }
 
     public void testUseCorrelationID() throws Exception {
-        runRequestReplyThreaded();
+        runRequestReplyThreaded(endpoingUriA);
     }
 
     public void testUseMessageIDAsCorrelationIDMultiNode() throws Exception {
-        runRequestReplyThreaded();
+        runRequestReplyThreaded(endpoingUriA);
+    }
+
+    public void testUseCorrelationIDMultiNode() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
+    }
+
+    public void testUseMessageIDAsCorrelationIDPersistReplyToMultiNode() throws Exception {
+        runRequestReplyThreaded(endpoingtReplyToUriA);
+    }
+
+    public void testUseCorrelationIDPersistReplyToMultiNode() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
     }
 
+    // (1)
+    // Note: this is an inefficient way of correlating replies on a persistent queue,
+    // because a consumer has to be created for each reply message.
+    // See testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode
+    // or testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode
+    // for a faster way to do this. Note, however, that in that case the message copy has to occur
+    // between consumer -> producer, as the selector value needs to be propagated to the ultimate
+    // destination, which in turn will copy this value back into the reply message.
+    public void testUseMessageIDAsCorrelationIDPersistMultiReplyToMultiNode() throws Exception {
+        int oldMaxTasks = maxTasks;
+        int oldMaxServerTasks = maxServerTasks;
+        int oldMaxCalls = maxCalls;
+        
+        maxTasks = 10;
+        maxServerTasks = 1;
+        maxCalls = 2;
+        
+        try {
+            runRequestReplyThreaded(endpoingUriA);
+        } finally {
+            maxTasks = oldMaxTasks;
+            maxServerTasks = oldMaxServerTasks;
+            maxCalls = oldMaxCalls;
+        }
+    }
+
+    // see (1)
+    public void testUseCorrelationIDPersistMultiReplyToMultiNode() throws Exception {
+        int oldMaxTasks = maxTasks;
+        int oldMaxServerTasks = maxServerTasks;
+        int oldMaxCalls = maxCalls;
+        
+        maxTasks = 10;
+        maxServerTasks = 1;
+        maxCalls = 2;
+
+        try {
+            runRequestReplyThreaded(endpoingUriA);
+        } finally {
+            maxTasks = oldMaxTasks;
+            maxServerTasks = oldMaxServerTasks;
+            maxCalls = oldMaxCalls;
+        }
+    }
+
+    public void testUseMessageIDAsCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
+    }
+
+    public void testUseCorrelationIDPersistMultiReplyToWithNamedSelectorMultiNode() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
+    }
+    
     public void testUseCorrelationIDTimeout() throws Exception {
+        JmsComponent c = (JmsComponent)context.getComponent(componentName);
+        c.getConfiguration().setRequestTimeout(1000);
+        c.getConfiguration().setRequestMapPurgePollTimeMillis(1000);
+
         Object reply = template.requestBody(endpoingUriA, request);
         assertEquals(reply, request);
-        JmsComponent c = (JmsComponent)context.getComponent(componentName);
+        
+        JmsEndpoint endpoint = template.getResolvedEndpoint(endpoingUriA, JmsEndpoint.class);
         // Wait 1 extra purge cycle to make sure that TimeoutMap had a chance to cleanup
-        Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
-        assertTrue(c.getRequestor().getRequestMap().size() == 0);
+        Thread.sleep(endpoint.getConfiguration().getRequestMapPurgePollTimeMillis());
+        assertTrue(endpoint.getRequestor().getRequestMap().size() == 0);
     }
 
     public void testUseMessageIDAsCorrelationIDTimeout() throws Exception {
         JmsComponent c = (JmsComponent)context.getComponent(componentName);
+        c.getConfiguration().setRequestTimeout(1000);
+        c.getConfiguration().setRequestMapPurgePollTimeMillis(1000);
+        
         Object reply = template.requestBody(endpoingUriA, request);
         assertEquals(reply, request);
+        
+        JmsEndpoint endpoint = template.getResolvedEndpoint(endpoingUriA, JmsEndpoint.class);
         // Wait 1 extra purge cycle to make sure that TimeoutMap had a chance to cleanup
-        Thread.sleep(c.getConfiguration().getRequestMapPurgePollTimeMillis());
-        assertTrue(c.getRequestor().getDeferredRequestMap().size() == 0);
+        Thread.sleep(endpoint.getConfiguration().getRequestMapPurgePollTimeMillis());
+        assertTrue(endpoint.getRequestor().getDeferredRequestMap().size() == 0);
     }
 
     public void testUseCorrelationIDMultiNodeDiffComponents() throws Exception {
-        runRequestReplyThreaded();
+        runRequestReplyThreaded(endpoingUriA);
     }
 
-    /*
-     * REVISIT: This currently fails because there is a single instance of Requestor per JmsComponent
-     * which shares requestMap amongst JmsProducers. This is a problem in case where the same correlationID
-     * value travels between nodes serviced by the same JmsComponent:
-     * client -> producer1 -> corrId -> consumer1 -> producer2 -> corrId -> consumer
-     * producer1 (Bum! @) <- corrId <- consumer1 <- producer2 <- corrId <- reply
-     *
-     * @ - The request entry for corrId was already removed from JmsProducer shared requestMap
-     *
-     * Possible ways to solve this: Each JmsProducer gets its own replyTo destination
-     *
-
-        public void testUseCorrelationIDMultiNode() throws Exception {
-            runRequestReplyThreaded();
-        }
-    */
-
-    protected void runRequestReplyThreaded() throws Exception {
+    public void testUseMessageIDAsCorrelationIDMultiNodeDiffComponents() throws Exception {
+        runRequestReplyThreaded(endpoingUriA);
+    }
+    
+    protected void runRequestReplyThreaded(String fromUri) throws Exception {
         final AtomicInteger counter = new AtomicInteger(-1);
         Task[] tasks = new Task[maxTasks];
         for (int i = 0; i < maxTasks; ++i) {
-            Task task = new Task(counter);
+            Task task = new Task(counter, fromUri);
             tasks[i] = task;
             task.start();
         }

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/ConditionalExceptionProcessor.java Sat May  3 01:31:48 2008
@@ -30,9 +30,15 @@
     private Logger log = Logger.getLogger(getClass());
     private int count;
 
+    public ConditionalExceptionProcessor() {
+        
+    }
+    
     public void process(Exchange exchange) throws Exception {
 
         setCount(getCount() + 1);
+        
+        // System.out.println(this + "; getCount() = " + getCount());
 
         AbstractTransactionTest
             .assertTrue(

Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java?rev=653015&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java Sat May  3 01:31:48 2008
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.tx;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jms.JmsComponent;
+import org.apache.camel.spi.Policy;
+import org.apache.camel.spring.SpringRouteBuilder;
+import org.apache.camel.spring.spi.SpringTransactionPolicy;
+import org.apache.log4j.Logger;
+
+/**
+ * Test case derived from:
+ * http://activemq.apache.org/camel/transactional-client.html and Martin
+ * Krasser's sample:
+ * http://www.nabble.com/JMS-Transactions---How-To-td15168958s22882.html#a15198803
+ * NOTE: this had to be split into separate test classes because I was unable to
+ * fully tear down and isolate the test cases. I'm not sure why, but as soon as we
+ * know, the Transaction test classes can be joined into one.
+ *
+ * @author Kevin Ross
+ */
+public class QueueToQueueRequestReplyTransactionTest extends AbstractTransactionTest {
+
+    private Logger log = Logger.getLogger(getClass());
+
+    public void testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector() throws Exception {
+
+        JmsComponent c = (JmsComponent)context.getComponent("activemq");
+        // c.getConfiguration().setRequestTimeout(600000);
+        JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
+        
+        context.addRoutes(new SpringRouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED_POLICY");
+                from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+                from("activemq-1:queue:bar").process(new Processor() {
+                    public void process(Exchange e) {
+                        String request = e.getIn().getBody(String.class);
+                        Message out = e.getOut(true);
+                        String selectorValue = e.getIn().getHeader("camelProvider", String.class);
+                        if (selectorValue != null) {
+                            out.setHeader("camelProvider", selectorValue);
+                        }
+                        out.setBody("Re: " + request);
+                    }
+                });
+            }
+        });
+
+        Object reply = template.requestBody("activemq:queue:foo", "blah");
+        assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
+    }
+/*
+ * This is a working test, but it is commented out because there is a bug: the ConditionalExceptionProcessor
+ * somehow gets reused among different tests, which it should not, and then the second test always gets its
+ * request flow rolled back.
+ *
+ * I didn't split this test into two separate tests, as I think this will be a good reminder of the problem that
+ * needs fixing.
+ *
+ * The log below clearly shows the same processor being reused between tests:
+ *  testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector()
+ *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 1
+ *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 2
+ *       
+ *  testRollbackUsingXmlQueueToQueueRequestReplyUsingMessageSelectorPerProducer()
+ *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 3
+ *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() = 4
+*/
+    /*
+    public void testRollbackUsingXmlQueueToQueueRequestReplyUsingMessageSelectorPerProducer() throws Exception {
+
+        JmsComponent c = (JmsComponent)context.getComponent("activemq");
+        c.getConfiguration().setReplyToDestinationSelectorName("camelProvider");
+        JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
+        c1.getConfiguration().setReplyToDestinationSelectorName("camelProvider");
+        
+        context.addRoutes(new SpringRouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED_POLICY");
+                from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+                from("activemq-1:queue:bar").process(new Processor() {
+                    public void process(Exchange e) {
+                        System.out.println(e);
+                        String request = e.getIn().getBody(String.class);
+                        Message out = e.getOut(true);
+                        String selectorValue = e.getIn().getHeader("camelProvider", String.class);
+                        System.out.println("selectorValue = " + selectorValue);
+                        out.setHeader("camelProvider", selectorValue);
+                        out.setBody("Re: " + request);
+                    }
+                });
+            }
+        });
+
+        Object reply = template.requestBody("activemq:queue:foo", "blah");
+        assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
+    }
+    */
+
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml?rev=653015&r1=653014&r2=653015&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/AbstractTransactionTest.xml Sat May  3 01:31:48 2008
@@ -25,10 +25,18 @@
         <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
     </bean>
 
+    <bean id="jmsConnectionFactory-1" class="org.apache.activemq.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
+    </bean>
+
     <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
         <property name="connectionFactory" ref="jmsConnectionFactory"/>
     </bean>
 
+    <bean id="jmsTransactionManager-1" class="org.springframework.jms.connection.JmsTransactionManager">
+        <property name="connectionFactory" ref="jmsConnectionFactory-1"/>
+    </bean>
+
     <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
         <property name="connectionFactory" ref="jmsConnectionFactory"/>
         <property name="transactionManager" ref="jmsTransactionManager"/>
@@ -36,10 +44,21 @@
         <property name="concurrentConsumers" value="1"/>
     </bean>
 
+    <bean id="jmsConfig-1" class="org.apache.camel.component.jms.JmsConfiguration">
+        <property name="connectionFactory" ref="jmsConnectionFactory-1"/>
+        <property name="transactionManager" ref="jmsTransactionManager-1"/>
+        <property name="transacted" value="true"/>
+        <property name="concurrentConsumers" value="1"/>
+    </bean>
+
     <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
         <property name="configuration" ref="jmsConfig"/>
     </bean>
 
+    <bean id="activemq-1" class="org.apache.camel.component.jms.JmsComponent">
+        <property name="configuration" ref="jmsConfig-1"/>
+    </bean>
+
     <bean id="PROPAGATION_REQUIRED_POLICY" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
         <constructor-arg>
             <bean class="org.springframework.transaction.support.TransactionTemplate">