You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/09/19 22:24:48 UTC

svn commit: r1172799 - in /camel/branches/camel-2.8.x: ./ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ components/camel-jms/src/test/java/org/apache/camel/co...

Author: dkulp
Date: Mon Sep 19 20:24:47 2011
New Revision: 1172799

URL: http://svn.apache.org/viewvc?rev=1172799&view=rev
Log:
Merged revisions 1157475,1158206,1158281 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1157475 | davsclaus | 2011-08-14 02:48:25 -0400 (Sun, 14 Aug 2011) | 1 line
  
  CAMEL-4202: Added ReplyToType option. Added new Exclusive replyToType to support per-producer exclusive persistent reply queues for request/reply over JMS. This allows request/reply to run faster than with the Shared option.
........
  r1158206 | davsclaus | 2011-08-16 07:24:27 -0400 (Tue, 16 Aug 2011) | 1 line
  
  CAMEL-4202: ReplyToType now compliant with JMX
........
  r1158281 | davsclaus | 2011-08-16 10:08:19 -0400 (Tue, 16 Aug 2011) | 1 line
  
  CAMEL-4202: ReplyToType now compliant with JMX
........

Added:
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/ReplyToType.java
      - copied unchanged from r1157475, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/ReplyToType.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
      - copied unchanged from r1157475, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
      - copied unchanged from r1157475, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
    camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToComponentTest.java
      - copied unchanged from r1157475, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToComponentTest.java
    camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java
      - copied unchanged from r1157475, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java
    camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplySharedReplyToTest.java
      - copied unchanged from r1157475, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplySharedReplyToTest.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
    svn:mergeinfo = /camel/trunk:1157475,1158206,1158281

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Mon Sep 19 20:24:47 2011
@@ -334,6 +334,14 @@ public class JmsComponent extends Defaul
         getConfiguration().setDestinationResolver(destinationResolver);
     }
 
+    public ReplyToType getReplyToType() {
+        return configuration.getReplyToType();
+    }
+
+    public void setReplyToType(ReplyToType replyToType) {
+        configuration.setReplyToType(replyToType);
+    }
+
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         this.applicationContext = applicationContext;
     }

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Sep 19 20:24:47 2011
@@ -124,6 +124,7 @@ public class JmsConfiguration implements
     private boolean forceSendOriginalMessage;
     // to force disabling time to live (works in both in-only or in-out mode)
     private boolean disableTimeToLive;
+    private ReplyToType replyToType;
 
     public JmsConfiguration() {
     }
@@ -1176,4 +1177,19 @@ public class JmsConfiguration implements
     public void setDisableTimeToLive(boolean disableTimeToLive) {
         this.disableTimeToLive = disableTimeToLive;
     }
+
+    /**
+     * Gets the reply to type.
+     * <p/>
+     * Will only return a value if this option has been explicitly configured.
+     *
+     * @return the reply type if configured, otherwise <tt>null</tt>
+     */
+    public ReplyToType getReplyToType() {
+        return replyToType;
+    }
+
+    public void setReplyToType(ReplyToType replyToType) {
+        this.replyToType = replyToType;
+    }
 }

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Sep 19 20:24:47 2011
@@ -983,6 +983,21 @@ public class JmsEndpoint extends Default
         configuration.setDisableTimeToLive(disableTimeToLive);
     }
 
+    @ManagedAttribute
+    public String getReplyToType() {
+        if (configuration.getReplyToType() != null) {
+            return configuration.getReplyToType().name();
+        } else {
+            return null;
+        }
+    }
+
+    @ManagedAttribute
+    public void setReplyToType(String replyToType) {
+        ReplyToType type = ReplyToType.valueOf(replyToType);
+        configuration.setReplyToType(type);
+    }
+
     @ManagedAttribute(description = "Camel id")
     public String getCamelId() {
         return getCamelContext().getName();
@@ -1022,6 +1037,4 @@ public class JmsEndpoint extends Default
         return super.createEndpointUri();
     }
 
-
-
 }

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Sep 19 20:24:47 2011
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -67,6 +66,15 @@ public class JmsProducer extends Default
                     return;
                 }
                 try {
+                    // validate that replyToType and replyTo are configured accordingly
+                    if (endpoint.getReplyToType() != null) {
+                        // setting temporary with a fixed replyTo is not supported
+                        if (endpoint.getReplyTo() != null && endpoint.getReplyToType().equals(ReplyToType.Temporary.name())) {
+                            throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary
+                                    + " is not supported when replyTo " + endpoint.getReplyTo() + " is also configured.");
+                        }
+                    }
+
                     if (endpoint.getReplyTo() != null) {
                         replyManager = endpoint.getReplyManager(endpoint.getReplyTo());
                         LOG.info("Using JmsReplyManager: " + replyManager + " to process replies from: " + endpoint.getReplyTo());

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1172799&r1=1172798&r2=1172799&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Mon Sep 19 20:24:47 2011
@@ -25,6 +25,7 @@ import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.ReplyToType;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
@@ -124,45 +125,35 @@ public class PersistentQueueReplyManager
         }
     };
 
-    private final class PersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
-
-        private String fixedMessageSelector;
-        private MessageSelectorCreator creator;
-
-        private PersistentQueueMessageListenerContainer(String fixedMessageSelector) {
-            this.fixedMessageSelector = fixedMessageSelector;
-        }
-
-        private PersistentQueueMessageListenerContainer(MessageSelectorCreator creator) {
-            this.creator = creator;
-        }
-
-        @Override
-        public String getMessageSelector() {
-            String id = null;
-            if (fixedMessageSelector != null) {
-                id = fixedMessageSelector;
-            } else if (creator != null) {
-                id = creator.get();
-            }
-            log.trace("Using MessageSelector[{}]", id);
-            return id;
-        }
-    }
-
     protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
         DefaultMessageListenerContainer answer;
 
-        String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
-        if (replyToSelectorName != null) {
-            // create a random selector value we will use for the persistent reply queue
-            replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
-            String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
-            answer = new PersistentQueueMessageListenerContainer(fixedMessageSelector);
+        ReplyToType type = endpoint.getConfiguration().getReplyToType();
+        if (type == null) {
+            // use shared by default for persistent reply queues
+            type = ReplyToType.Shared;
+        }
+
+        if (ReplyToType.Shared == type) {
+            // shared reply to queues support either a fixed or dynamic JMS message selector
+            String replyToSelectorName = endpoint.getReplyToDestinationSelectorName();
+            if (replyToSelectorName != null) {
+                // create a random selector value we will use for the persistent reply queue
+                replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
+                String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
+                answer = new SharedPersistentQueueMessageListenerContainer(fixedMessageSelector);
+                log.debug("Using shared queue: " + endpoint.getReplyTo() + " with fixed message selector [" + fixedMessageSelector + "] as reply listener: " + answer);
+            } else {
+                // use a dynamic message selector which will select the message we want to receive as reply
+                dynamicMessageSelector = new MessageSelectorCreator();
+                answer = new SharedPersistentQueueMessageListenerContainer(dynamicMessageSelector);
+                log.debug("Using shared queue: " + endpoint.getReplyTo() + " with dynamic message selector as reply listener: " + answer);
+            }
+        } else if (ReplyToType.Exclusive == type) {
+            answer = new ExclusivePersistentQueueMessageListenerContainer();
+            log.debug("Using exclusive queue:" + endpoint.getReplyTo() + " as reply listener: " + answer);
         } else {
-            // use a dynamic message selector which will select the message we want to receive as reply
-            dynamicMessageSelector = new MessageSelectorCreator();
-            answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
+            throw new IllegalArgumentException("ReplyToType " + type + " is not supported for persistent reply queues");
         }
 
         DestinationResolver resolver = endpoint.getDestinationResolver();