You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2008/04/17 18:08:08 UTC

svn commit: r649165 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/util/ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ components/camel-jms/src/test/java/org/apache/camel/component/jms/

Author: hadrian
Date: Thu Apr 17 09:08:02 2008
New Revision: 649165

URL: http://svn.apache.org/viewvc?rev=649165&view=rev
Log:
CAMEL-469.  Patch applied with thanks!

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Apr 17 09:08:02 2008
@@ -93,10 +93,16 @@
         Object[] keys = null;
         synchronized (map) {
             Set keySet = map.keySet();
-            keys = new String[keySet.size()];
+            keys = new Object[keySet.size()];
             keySet.toArray(keys);
         }
         return keys;
+    }
+    
+    public int size() {
+        synchronized (map) {
+            return map.size();
+        }
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Thu Apr 17 09:08:02 2008
@@ -39,6 +39,7 @@
      */
     Object[] getKeys();
 
+    int size();
     /**
      * Adds the key value pair into the map such that some time after the given
      * timeout the entry will be evicted

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Thu Apr 17 09:08:02 2008
@@ -88,7 +88,7 @@
     // -------------------------------------------------------------------------
     public JmsBinding getBinding() {
         if (binding == null) {
-            binding = new JmsBinding();
+            binding = new JmsBinding(endpoint);
         }
         return binding;
     }
@@ -161,10 +161,14 @@
             public Message createMessage(Session session) throws JMSException {
                 Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session);
 
-                // lets preserve any correlation ID
-                String correlationID = message.getJMSCorrelationID();
-                if (correlationID != null) {
-                    reply.setJMSCorrelationID(correlationID);
+                if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
+                    String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
+                	reply.setJMSCorrelationID(messageID);
+                } else {
+                    String correlationID = message.getJMSCorrelationID();
+                    if (correlationID != null) {
+                        reply.setJMSCorrelationID(correlationID);
+                    }
                 }
 
                 if (LOG.isDebugEnabled()) {

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Apr 17 09:08:02 2008
@@ -56,9 +56,16 @@
  */
 public class JmsBinding {
     private static final transient Log LOG = LogFactory.getLog(JmsBinding.class);
+    private JmsEndpoint endpoint;
     private Set<String> ignoreJmsHeaders;
     private XmlConverter xmlConverter = new XmlConverter();
 
+    public JmsBinding() {
+    }
+
+    public JmsBinding(JmsEndpoint endpoint) {
+    	this.endpoint = endpoint;
+    }
     /**
      * Extracts the body from the JMS message
      *
@@ -118,7 +125,8 @@
     public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session)
         throws JMSException {
         Message answer = null;
-        if (camelMessage instanceof JmsMessage) {
+        boolean alwaysCopy = (endpoint != null) ? endpoint.getConfiguration().isAlwaysCopyMessage() : false; 
+        if (!alwaysCopy && camelMessage instanceof JmsMessage) {
             JmsMessage jmsMessage = (JmsMessage)camelMessage;
             answer = jmsMessage.getJmsMessage();
         }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Thu Apr 17 09:08:02 2008
@@ -241,6 +241,14 @@
         getConfiguration().setMessageTimestampEnabled(messageTimestampEnabled);
     }
 
+    public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
+    	getConfiguration().setAlwaysCopyMessage(alwaysCopyMessage);
+    }
+    
+    public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
+        getConfiguration().setUseMessageIDAsCorrelationID(useMessageIDAsCorrelationID);
+    }
+
     public void setPriority(int priority) {
         getConfiguration().setPriority(priority);
     }
@@ -301,7 +309,7 @@
         getConfiguration().setDestinationResolver(destinationResolver);
     }
 
-    public Requestor getRequestor() throws Exception {
+    public synchronized Requestor getRequestor() throws Exception {
         if (requestor == null) {
             requestor = new Requestor(getConfiguration(), getExecutorService());
             requestor.start();
@@ -361,7 +369,7 @@
             remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
         }
 
-        final String subject = convertPathToActualDestination(remaining);
+        final String subject = convertPathToActualDestination(remaining, parameters);
 
         // lets make sure we copy the configuration as each endpoint can
         // customize its own version
@@ -386,7 +394,7 @@
      * A strategy method allowing the URI destination to be translated into the
      * actual JMS destination name (say by looking up in JNDI or something)
      */
-    protected String convertPathToActualDestination(String path) {
+    protected String convertPathToActualDestination(String path, Map parameters) {
         return path;
     }
 

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Thu Apr 17 09:08:02 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -26,12 +27,18 @@
 import javax.jms.TopicPublisher;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.jms.requestor.Requestor;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.PackageHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.JmsException;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.JmsTemplate102;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer102;
@@ -40,14 +47,18 @@
 import org.springframework.jms.listener.serversession.ServerSessionFactory;
 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer;
 import org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer102;
+import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.util.Assert;
+
 
 /**
  * @version $Revision$
  */
 public class JmsConfiguration implements Cloneable {
+    private static final transient Log LOG = LogFactory.getLog(JmsConfiguration.class);
     protected static final String TRANSACTED = "TRANSACTED";
     protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
     protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
@@ -83,6 +94,7 @@
     private boolean useVersion102;
     private Boolean explicitQosEnabled;
     private boolean deliveryPersistent = true;
+    private boolean replyToDeliveryPersistent = true;
     private long timeToLive = -1;
     private MessageConverter messageConverter;
     private boolean messageIdEnabled = true;
@@ -97,6 +109,9 @@
     private long requestMapPurgePollTimeMillis = 1000L;
     private boolean disableReplyTo;
     private boolean eagerLoadingOfProperties;
+    // Always make a JMS message copy when it's passed to Producer
+    private boolean alwaysCopyMessage = false;
+    private boolean useMessageIDAsCorrelationID = false;
 
     public JmsConfiguration() {
     }
@@ -129,69 +144,156 @@
         }
         return answer;
     }
-
-    public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
-
-        if (jmsOperations != null) {
-            return jmsOperations;
-        }
-
-        ConnectionFactory factory = getTemplateConnectionFactory();
-
-        // I whish the spring templates had built in support for preserving the
-        // message
-        // qos when doing a send. :(
-        JmsTemplate template = useVersion102 ? new JmsTemplate102(factory, pubSubDomain) {
-            /**
-             * Override so we can support preserving the Qos settings that have
-             * been set on the message.
-             */
-            @Override
-            protected void doSend(MessageProducer producer, Message message) throws JMSException {
-                if (preserveMessageQos) {
-                    long ttl = message.getJMSExpiration();
-                    if (ttl != 0) {
-                        ttl = ttl - System.currentTimeMillis();
-                        // Message had expired.. so set the ttl as small as
-                        // possible
-                        if (ttl <= 0) {
-                            ttl = 1;
+    
+    public static interface MessageSentCallback {
+        public void sent(Message message);
+    }
+    
+    public static class CamelJmsTemplate extends JmsTemplate {
+        private JmsConfiguration config;
+        
+        public CamelJmsTemplate(JmsConfiguration config, ConnectionFactory connectionFactory) {
+            super(connectionFactory);
+            this.config = config;
+        }
+        
+        public void send(final String destinationName, 
+                         final MessageCreator messageCreator, 
+                         final MessageSentCallback callback) throws JmsException {
+            execute(new SessionCallback() {
+                public Object doInJms(Session session) throws JMSException {
+                    Destination destination = resolveDestinationName(session, destinationName);
+                    Assert.notNull(messageCreator, "MessageCreator must not be null");
+                    MessageProducer producer = createProducer(session, destination);
+                    Message message = null;
+                    try {
+                        message = messageCreator.createMessage(session);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Sending created message: " + message);
+                        }
+                        doSend(producer, message);
+                        // Check commit - avoid commit call within a JTA transaction.
+                        if (session.getTransacted() && isSessionLocallyTransacted(session)) {
+                            // Transacted session created by this template -> commit.
+                            JmsUtils.commitIfNecessary(session);
                         }
                     }
-                    if (isPubSubDomain()) {
-                        ((TopicPublisher)producer).publish(message, message.getJMSDeliveryMode(), message
-                            .getJMSPriority(), ttl);
-                    } else {
-                        ((QueueSender)producer).send(message, message.getJMSDeliveryMode(), message
-                            .getJMSPriority(), ttl);
+                    finally {
+                        JmsUtils.closeMessageProducer(producer);
+                    }
+                    if (message != null && callback != null) {
+                        callback.sent(message);
+                    }
+                    return null;
+                }
+            }, false);
+        }
+
+        /**
+         * Override so we can support preserving the Qos settings that have
+         * been set on the message.
+         */
+        @Override
+        protected void doSend(MessageProducer producer, Message message) throws JMSException {
+            if (config.isPreserveMessageQos()) {
+                long ttl = message.getJMSExpiration();
+                if (ttl != 0) {
+                    ttl = ttl - System.currentTimeMillis();
+                    // Message had expired.. so set the ttl as small as
+                    // possible
+                    if (ttl <= 0) {
+                        ttl = 1;
                     }
-                } else {
-                    super.doSend(producer, message);
                 }
+                producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
             }
-        } : new JmsTemplate(factory) {
-            /**
-             * Override so we can support preserving the Qos settings that have
-             * been set on the message.
-             */
-            @Override
-            protected void doSend(MessageProducer producer, Message message) throws JMSException {
-                if (preserveMessageQos) {
-                    long ttl = message.getJMSExpiration();
-                    if (ttl != 0) {
-                        ttl = ttl - System.currentTimeMillis();
-                        // Message had expired.. so set the ttl as small as
-                        // possible
-                        if (ttl <= 0) {
-                            ttl = 1;
-                        }
+            else {
+                super.doSend(producer, message);
+            }
+        }
+    }
+    
+    public static class CamelJmsTeemplate102 extends JmsTemplate102 {
+        private JmsConfiguration config;
+        
+        public CamelJmsTeemplate102(JmsConfiguration config, ConnectionFactory connectionFactory, boolean pubSubDomain) {
+            super(connectionFactory, pubSubDomain);
+            this.config = config;
+        }
+        
+        public void send(final String destinationName, 
+                final MessageCreator messageCreator, 
+                final MessageSentCallback callback) throws JmsException {
+           execute(new SessionCallback() {
+               public Object doInJms(Session session) throws JMSException {
+                   Destination destination = resolveDestinationName(session, destinationName);
+                   Assert.notNull(messageCreator, "MessageCreator must not be null");
+                   MessageProducer producer = createProducer(session, destination);
+                   Message message = null;
+                   try {
+                       message = messageCreator.createMessage(session);
+                       if (logger.isDebugEnabled()) {
+                           logger.debug("Sending created message: " + message);
+                       }
+                       doSend(producer, message);
+                       // Check commit - avoid commit call within a JTA transaction.
+                       if (session.getTransacted() && isSessionLocallyTransacted(session)) {
+                           // Transacted session created by this template -> commit.
+                           JmsUtils.commitIfNecessary(session);
+                       }
+                   }
+                   finally {
+                       JmsUtils.closeMessageProducer(producer);
+                   }
+                   if (message != null && callback != null) {
+                       callback.sent(message);
+                   }
+                   return null;
+               }
+           }, false);
+        }
+        
+        /**
+         * Override so we can support preserving the Qos settings that have
+         * been set on the message.
+         */
+        @Override
+        protected void doSend(MessageProducer producer, Message message) throws JMSException {
+            if (config.isPreserveMessageQos()) {
+                long ttl = message.getJMSExpiration();
+                if (ttl != 0) {
+                    ttl = ttl - System.currentTimeMillis();
+                    // Message had expired.. so set the ttl as small as
+                    // possible
+                    if (ttl <= 0) {
+                        ttl = 1;
                     }
-                    producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
+                }
+                if (isPubSubDomain()) {
+                    ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(), 
+                                                        message.getJMSPriority(), ttl);
                 } else {
-                    super.doSend(producer, message);
+                    ((QueueSender) producer).send(message, message.getJMSDeliveryMode(), 
+                                                  message.getJMSPriority(), ttl);
                 }
             }
-        };
+            else {
+                super.doSend(producer, message);
+            }
+        }
+    }
+    
+    public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
+
+        if (jmsOperations != null) {
+            return jmsOperations;
+        }
+
+        ConnectionFactory factory = getTemplateConnectionFactory();
+
+        JmsTemplate template = useVersion102 ? 
+                new CamelJmsTeemplate102(this, factory, pubSubDomain) 
+                : new CamelJmsTemplate(this, factory);
 
         template.setPubSubDomain(pubSubDomain);
         if (destinationResolver != null) {
@@ -499,6 +601,14 @@
         configuredQoS();
     }
 
+    public boolean isReplyToDeliveryPersistent() {
+        return replyToDeliveryPersistent;
+    }
+
+    public void setReplyToDeliveryPersistent(boolean replyToDeliveryPersistent) {
+        this.replyToDeliveryPersistent = replyToDeliveryPersistent;
+    }
+
     public long getTimeToLive() {
         return timeToLive;
     }
@@ -531,7 +641,7 @@
     public void setMessageTimestampEnabled(boolean messageTimestampEnabled) {
         this.messageTimestampEnabled = messageTimestampEnabled;
     }
-
+   
     public int getPriority() {
         return priority;
     }
@@ -760,6 +870,15 @@
         if (isEagerLoadingOfProperties()) {
             listener.setEagerLoadingOfProperties(true);
         }
+        // REVISIT: We really ought to change the model and let JmsProducer 
+        // and JmsConsumer have their own JmsConfiguration instance
+        // This way producer's and consumer's QoS can differ and be 
+        // independently configured 
+        JmsOperations operations = listener.getTemplate();
+        if (operations instanceof JmsTemplate) {
+            JmsTemplate template = (JmsTemplate)operations;
+            template.setDeliveryPersistent(isReplyToDeliveryPersistent());
+        }
     }
     protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
         // TODO we could allow a spring container to auto-inject these objects?
@@ -851,4 +970,20 @@
         }
     }
 
+
+	public boolean isAlwaysCopyMessage() {
+		return alwaysCopyMessage;
+	}
+
+	public void setAlwaysCopyMessage(boolean alwaysCopyMessage) {
+		this.alwaysCopyMessage = alwaysCopyMessage;
+	}
+
+	public boolean isUseMessageIDAsCorrelationID() {
+		return useMessageIDAsCorrelationID;
+	}
+
+	public void setUseMessageIDAsCorrelationID(boolean useMessageIDAsCorrelationID) {
+		this.useMessageIDAsCorrelationID = useMessageIDAsCorrelationID;
+	}
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Thu Apr 17 09:08:02 2008
@@ -30,12 +30,13 @@
  */
 public class JmsConsumer extends DefaultConsumer<JmsExchange> {
     private final AbstractMessageListenerContainer listenerContainer;
+    private EndpointMessageListener messageListener;
 
     public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
         super(endpoint, processor);
         this.listenerContainer = listenerContainer;
 
-        MessageListener messageListener = createMessageListener(endpoint, processor);
+        createMessageListener(endpoint, processor);
         this.listenerContainer.setMessageListener(messageListener);
     }
 
@@ -43,10 +44,13 @@
         return listenerContainer;
     }
 
-    protected MessageListener createMessageListener(JmsEndpoint endpoint, Processor processor) {
-        EndpointMessageListener messageListener = new EndpointMessageListener(endpoint, processor);
-        messageListener.setBinding(endpoint.getBinding());
+    public EndpointMessageListener getEndpointMessageListener() {
         return messageListener;
+    }
+    
+    protected void createMessageListener(JmsEndpoint endpoint, Processor processor) {
+        messageListener = new EndpointMessageListener(endpoint, processor);
+        messageListener.setBinding(endpoint.getBinding());
     }
 
     @Override

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu Apr 17 09:08:02 2008
@@ -123,7 +123,7 @@
     // -------------------------------------------------------------------------
     public JmsBinding getBinding() {
         if (binding == null) {
-            binding = new JmsBinding();
+            binding = new JmsBinding(this);
         }
         return binding;
     }
@@ -161,7 +161,7 @@
         return false;
     }
 
-    public Requestor getRequestor() throws Exception {
+    public synchronized Requestor getRequestor() throws Exception {
         if (requestor == null) {
             requestor = component.getRequestor();
         }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Thu Apr 17 09:08:02 2008
@@ -28,13 +28,21 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
+import org.apache.camel.component.jms.JmsConfiguration.MessageSentCallback;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap;
+import org.apache.camel.component.jms.requestor.FailedToProcessResponse;
 import org.apache.camel.component.jms.requestor.Requestor;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.Out;
 import org.apache.camel.util.UuidGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
 
 /**
  * @version $Revision$
@@ -45,12 +53,28 @@
     private JmsOperations inOnlyTemplate;
     private JmsOperations inOutTemplate;
     private UuidGenerator uuidGenerator;
+    private DeferredRequestReplyMap deferredRequestReplyMap;
 
     public JmsProducer(JmsEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
     }
 
+    public long getRequestTimeout() {
+        return endpoint.getRequestTimeout();
+    }
+    
+    protected void doStart() throws Exception {
+        super.doStart();
+        deferredRequestReplyMap = endpoint.getRequestor().getDeferredRequestReplyMap(this);
+    }
+
+    protected void doStop() throws Exception {
+        endpoint.getRequestor().removeDeferredRequestReplyMap(this);
+        deferredRequestReplyMap = null;
+        super.doStop();
+    }
+
     public void process(final Exchange exchange) {
         final org.apache.camel.Message in = exchange.getIn();
 
@@ -58,7 +82,7 @@
             // create a temporary queue and consumer for responses...
             // note due to JMS transaction semantics we cannot use a single transaction
             // for sending the request and receiving the response
-            Requestor requestor;
+            final Requestor requestor;
             try {
                 requestor = endpoint.getRequestor();
             } catch (Exception e) {
@@ -67,36 +91,47 @@
 
             final Destination replyTo = requestor.getReplyTo();
 
+            final boolean msgIdAsCorrId = endpoint.getConfiguration().isUseMessageIDAsCorrelationID();
             String correlationId = in.getHeader("JMSCorrelationID", String.class);
-            if (correlationId == null) {
-                correlationId = getUuidGenerator().generateId();
-                in.setHeader("JMSCorrelationID", correlationId);
+            
+            if (correlationId == null && !msgIdAsCorrId) {
+                in.setHeader("JMSCorrelationID", getUuidGenerator().generateId());
             }
+            
+            final Out<FutureTask> futureHolder = new Out<FutureTask>();
+            final DeferredMessageSentCallback callback = (msgIdAsCorrId) ?
+                        deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
 
-            // lets register the future object before we try send just in case
-            long requestTimeout = endpoint.getRequestTimeout();
-            FutureTask future = requestor.getReceiveFuture(correlationId, requestTimeout);
-
-            getInOutTemplate().send(endpoint.getDestination(), new MessageCreator() {
+            final CamelJmsTemplate template = (CamelJmsTemplate)getInOutTemplate();
+            template.send(endpoint.getDestination(), new MessageCreator() {
                 public Message createMessage(Session session) throws JMSException {
                     Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
                     message.setJMSReplyTo(replyTo);
+                    
+                    FutureTask future = null;
+                    future = (!msgIdAsCorrId) ? 
+                             requestor.getReceiveFuture(message.getJMSCorrelationID(), 
+                                                        endpoint.getRequestTimeout()) 
+                             : requestor.getReceiveFuture(callback);
 
+                    futureHolder.set(future);
+                    
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(endpoint + " sending JMS message: " + message);
                     }
                     return message;
                 }
-            });
-
+            }, callback);
+            
             // lets wait and return the response
+            long requestTimeout = endpoint.getRequestTimeout();
             try {
                 Message message = null;
                 try {
                     if (requestTimeout < 0) {
-                        message = (Message)future.get();
+                        message = (Message)futureHolder.get().get();
                     } else {
-                        message = (Message)future.get(requestTimeout, TimeUnit.MILLISECONDS);
+                        message = (Message)futureHolder.get().get(requestTimeout, TimeUnit.MILLISECONDS);
                     }
                 } catch (InterruptedException e) {
                     if (LOG.isDebugEnabled()) {
@@ -109,7 +144,12 @@
                 }
                 if (message != null) {
                     exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
-                } else {
+                    if (correlationId != null) {
+                        message.setJMSCorrelationID(correlationId);
+                        exchange.getOut(false).setHeader("JMSCorrelationID", correlationId);
+                    } 
+                }
+                else {
                     // lets set a timed out exception
                     exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
                 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Thu Apr 17 09:08:02 2008
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.component.jms.requestor;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -31,6 +34,9 @@
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.DefaultTimeoutMap;
 import org.apache.camel.util.TimeoutMap;
+import org.apache.camel.util.UuidGenerator;
+import org.apache.camel.component.jms.JmsProducer;
+import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.core.task.TaskExecutor;
@@ -45,26 +51,70 @@
 public class Requestor extends ServiceSupport implements MessageListener {
     private static final transient Log LOG = LogFactory.getLog(Requestor.class);
     private final JmsConfiguration configuration;
+    private ScheduledExecutorService executorService;
     private AbstractMessageListenerContainer listenerContainer;
     private TimeoutMap requestMap;
+    private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap;
+    private TimeoutMap deferredRequestMap;
+    private TimeoutMap deferredReplyMap;
     private Destination replyTo;
-
+    private long maxRequestTimeout = -1;
+    private static UuidGenerator uuidGenerator;
+    
     public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
         this.configuration = configuration;
+        this.executorService = executorService;
         requestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+        producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>();
+        deferredRequestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+        deferredReplyMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+    }
+    
+    public synchronized DeferredRequestReplyMap getDeferredRequestReplyMap(JmsProducer producer) {
+        DeferredRequestReplyMap map = producerDeferredRequestReplyMap.get(producer);
+        if (map == null) {
+            map = new DeferredRequestReplyMap(this, producer, deferredRequestMap, deferredReplyMap); 
+            producerDeferredRequestReplyMap.put(producer, map);
+        }
+        if (maxRequestTimeout == -1) {
+            maxRequestTimeout = producer.getRequestTimeout();
+        } else if (maxRequestTimeout < producer.getRequestTimeout()) {
+            maxRequestTimeout = producer.getRequestTimeout();
+        }
+        return map;
+    }
+    
+    public synchronized void removeDeferredRequestReplyMap(JmsProducer producer) {
+        producerDeferredRequestReplyMap.remove(producer);
+        if (maxRequestTimeout == producer.getRequestTimeout()) {
+            long max = -1;
+            for (Map.Entry<JmsProducer, DeferredRequestReplyMap> entry : producerDeferredRequestReplyMap.entrySet()) {
+                if (max < entry.getKey().getRequestTimeout()) {
+                    max = entry.getKey().getRequestTimeout();
+                }
+            }
+            maxRequestTimeout = max;
+        }
+    }
+    
+    public synchronized long getMaxRequestTimeout() {
+        return maxRequestTimeout;
+    }
+    
+    public TimeoutMap getRequestMap() {
+        return requestMap;
+    }
+    
+    public TimeoutMap getDeferredRequestMap() {
+        return deferredRequestMap;
+    }
+    
+    public TimeoutMap getDeferredReplyMap() {
+        return deferredReplyMap;
     }
 
     public FutureTask getReceiveFuture(String correlationID, long requestTimeout) {
         FutureTask future = null;
-/*
-            // Deal with async handlers...
-
-            Object currentHandler = requestMap.get(correlationID);
-            if (currentHandler instanceof AsyncReplyHandler) {
-                AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
-                future = handler.newResultHandler();
-            }
-*/
 
         if (future == null) {
             FutureHandler futureHandler = new FutureHandler();
@@ -74,6 +124,13 @@
         return future;
     }
 
+    public FutureTask getReceiveFuture(DeferredMessageSentCallback callback) {
+        FutureTask future = new FutureHandler();
+        DeferredRequestReplyMap map = callback.getDeferredRequestReplyMap();
+        map.put(callback, future);
+        return future;
+    }
+
     public void onMessage(Message message) {
         try {
             String correlationID = message.getJMSCorrelationID();
@@ -81,23 +138,24 @@
                 LOG.warn("Ignoring message with no correlationID! " + message);
                 return;
             }
-
             // lets notify the monitor for this response
             Object handler = requestMap.get(correlationID);
-            if (handler == null) {
-                LOG.warn("Response received for unknown correlationID: " + correlationID + " request: "
-                         + message);
-            } else if (handler instanceof ReplyHandler) {
-                ReplyHandler replyHandler = (ReplyHandler)handler;
+            if (handler != null && handler instanceof ReplyHandler) {
+                ReplyHandler replyHandler = (ReplyHandler) handler;
                 boolean complete = replyHandler.handle(message);
                 if (complete) {
                     requestMap.remove(correlationID);
                 }
+            } else {
+                DeferredRequestReplyMap.processDeferredRequests(
+                        this, deferredRequestMap, deferredReplyMap, 
+                        correlationID, getMaxRequestTimeout(), message);
             }
         } catch (JMSException e) {
             throw new FailedToProcessResponse(message, e);
         }
     }
+    
 
     public AbstractMessageListenerContainer getListenerContainer() {
         if (listenerContainer == null) {
@@ -166,5 +224,12 @@
             answer.setExceptionListener(exceptionListener);
         }
         return answer;
+    }
+    
+    public static synchronized UuidGenerator getUuidGenerator() {
+        if (uuidGenerator == null) {
+            uuidGenerator = new UuidGenerator();
+        }
+        return uuidGenerator;
     }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=649165&r1=649164&r2=649165&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Thu Apr 17 09:08:02 2008
@@ -18,12 +18,15 @@
 
 
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
@@ -83,6 +86,20 @@
     public void testCacheConsumerEnabledForTopic() throws Exception {
         JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:topic:Foo.Bar");
         assertCacheLevel(endpoint, DefaultMessageListenerContainer.CACHE_CONSUMER);
+    }
+    
+    public void testReplyToPesistentDelivery() throws Exception {
+        JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo");
+        endpoint.getConfiguration().setDeliveryPersistent(true);
+        endpoint.getConfiguration().setReplyToDeliveryPersistent(false);
+        JmsProducer producer = endpoint.createProducer();
+        JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
+        JmsOperations operations = consumer.getEndpointMessageListener().getTemplate();
+        assertTrue(operations instanceof JmsTemplate);
+        JmsTemplate template = (JmsTemplate)operations;
+        assertTrue("Wrong delivery mode on reply template; expected  " 
+                     + " DeliveryMode.NON_PERSISTENT but was DeliveryMode.PERSISTENT", 
+                     template.getDeliveryMode() == DeliveryMode.NON_PERSISTENT);
     }
 
     protected void assertCacheLevel(JmsEndpoint endpoint, int expected) throws Exception {