You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/22 13:32:28 UTC

svn commit: r1412525 - in /camel/branches/camel-2.10.x: ./ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ components/camel-jms/src/test/java/org/apache/camel/c...

Author: davsclaus
Date: Thu Nov 22 12:32:27 2012
New Revision: 1412525

URL: http://svn.apache.org/viewvc?rev=1412525&view=rev
Log:
CAMEL-5809: camel-jms - Request/reply over JMS now allows the concurrentConsumers/maxConcurrentConsumers options to be used.

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
    camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
    camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
    camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
    camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1412524

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1412525&r1=1412524&r2=1412525&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java Thu Nov 22 12:32:27 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.component.jms;
 
+import org.apache.camel.util.concurrent.CamelThreadFactory;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
 /**
@@ -40,4 +43,22 @@ public class DefaultJmsMessageListenerCo
         // do not run if we have been stopped
         return endpoint.isRunning();
     }
+
+    /**
+     * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
+     * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
+     * with the specified bean name and using Camel's {@link org.apache.camel.spi.ExecutorServiceManager}
+     * to resolve the thread name.
+     * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
+     */
+    @Override
+    protected TaskExecutor createDefaultTaskExecutor() {
+        String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
+        String beanName = getBeanName();
+
+        SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
+        answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+        return answer;
+    }
+
 }

Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java?rev=1412525&r1=1412524&r2=1412525&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java Thu Nov 22 12:32:27 2012
@@ -16,7 +16,8 @@
  */
 package org.apache.camel.component.jms.reply;
 
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
+import org.apache.camel.component.jms.JmsEndpoint;
 
 /**
  * This {@link org.springframework.jms.listener.DefaultMessageListenerContainer} is used for persistent reply queues
@@ -31,8 +32,11 @@ import org.springframework.jms.listener.
  *
  * @see SharedPersistentQueueMessageListenerContainer
  */
-public class ExclusivePersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
+public class ExclusivePersistentQueueMessageListenerContainer extends DefaultJmsMessageListenerContainer {
 
     // no need to override any methods currently
 
+    public ExclusivePersistentQueueMessageListenerContainer(JmsEndpoint endpoint) {
+        super(endpoint);
+    }
 }

Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1412525&r1=1412524&r2=1412525&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Thu Nov 22 12:32:27 2012
@@ -132,14 +132,14 @@ public class PersistentQueueReplyManager
                 // create a random selector value we will use for the persistent reply queue
                 replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
                 String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
-                answer = new SharedPersistentQueueMessageListenerContainer(fixedMessageSelector);
+                answer = new SharedPersistentQueueMessageListenerContainer(endpoint, fixedMessageSelector);
                 // must use cache level consumer for fixed message selector
                 answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
                 log.debug("Using shared queue: " + endpoint.getReplyTo() + " with fixed message selector [" + fixedMessageSelector + "] as reply listener: " + answer);
             } else {
                 // use a dynamic message selector which will select the message we want to receive as reply
                 dynamicMessageSelector = new MessageSelectorCreator(correlation);
-                answer = new SharedPersistentQueueMessageListenerContainer(dynamicMessageSelector);
+                answer = new SharedPersistentQueueMessageListenerContainer(endpoint, dynamicMessageSelector);
                 // must use cache level session for dynamic message selector,
                 // as otherwise the dynamic message selector will not be updated on-the-fly
                 answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
@@ -149,7 +149,7 @@ public class PersistentQueueReplyManager
             log.warn("{} is using a shared reply queue, which is not as fast as alternatives."
                     + " See more detail at the section 'Request-reply over JMS' at http://camel.apache.org/jms", endpoint);
         } else if (ReplyToType.Exclusive == type) {
-            answer = new ExclusivePersistentQueueMessageListenerContainer();
+            answer = new ExclusivePersistentQueueMessageListenerContainer(endpoint);
             // must use cache level consumer for exclusive as there is no message selector
             answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
             log.debug("Using exclusive queue:" + endpoint.getReplyTo() + " as reply listener: " + answer);
@@ -208,12 +208,17 @@ public class PersistentQueueReplyManager
         if (endpoint.getRecoveryInterval() >= 0) {
             answer.setRecoveryInterval(endpoint.getRecoveryInterval());
         }
-        // do not use a task executor for reply as we are are always a single threaded task
+        // set task executor
+        if (endpoint.getTaskExecutor() != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
+            }
+            answer.setTaskExecutor(endpoint.getTaskExecutor());
+        }
 
         // setup a bean name which is used ny Spring JMS as the thread name
         String name = "PersistentQueueReplyManager[" + answer.getDestinationName() + "]";
-        String beanName = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName(name);
-        answer.setBeanName(beanName);
+        answer.setBeanName(name);
 
         if (answer.getConcurrentConsumers() > 1) {
             if (ReplyToType.Shared == type) {

Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java?rev=1412525&r1=1412524&r2=1412525&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java Thu Nov 22 12:32:27 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.jms.reply;
 
+import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
+import org.apache.camel.component.jms.JmsEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -32,7 +34,7 @@ import org.springframework.jms.listener.
  *
  * @see ExclusivePersistentQueueMessageListenerContainer
  */
-public class SharedPersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
+public class SharedPersistentQueueMessageListenerContainer extends DefaultJmsMessageListenerContainer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SharedPersistentQueueMessageListenerContainer.class);
 
@@ -42,18 +44,22 @@ public class SharedPersistentQueueMessag
     /**
      * Use a fixed JMS message selector
      *
+     * @param endpoint the endpoint
      * @param fixedMessageSelector the fixed selector
      */
-    public SharedPersistentQueueMessageListenerContainer(String fixedMessageSelector) {
+    public SharedPersistentQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) {
+        super(endpoint);
         this.fixedMessageSelector = fixedMessageSelector;
     }
 
     /**
      * Use a dynamic JMS message selector
      *
+     * @param endpoint the endpoint
      * @param creator the create to create the dynamic selector
      */
-    public SharedPersistentQueueMessageListenerContainer(MessageSelectorCreator creator) {
+    public SharedPersistentQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) {
+        super(endpoint);
         this.creator = creator;
     }
 

Modified: camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1412525&r1=1412524&r2=1412525&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Thu Nov 22 12:32:27 2012
@@ -25,6 +25,7 @@ import javax.jms.TemporaryQueue;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -86,7 +87,7 @@ public class TemporaryQueueReplyManager 
     @Override
     protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
         // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
-        DefaultMessageListenerContainer answer = new DefaultMessageListenerContainer();
+        DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint);
 
         answer.setDestinationName("temporary");
         answer.setDestinationResolver(new DestinationResolver() {
@@ -136,12 +137,16 @@ public class TemporaryQueueReplyManager 
         if (endpoint.getRecoveryInterval() >= 0) {
             answer.setRecoveryInterval(endpoint.getRecoveryInterval());
         }
-        // do not use a task executor for reply as we are are always a single threaded task
+        if (endpoint.getTaskExecutor() != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
+            }
+            answer.setTaskExecutor(endpoint.getTaskExecutor());
+        }
 
         // setup a bean name which is used ny Spring JMS as the thread name
         String name = "TemporaryQueueReplyManager[" + answer.getDestinationName() + "]";
-        String beanName = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName(name);
-        answer.setBeanName(beanName);
+        answer.setBeanName(name);
 
         if (answer.getConcurrentConsumers() > 1) {
             // log that we are using concurrent consumers

Modified: camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java?rev=1412525&r1=1412524&r2=1412525&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java Thu Nov 22 12:32:27 2012
@@ -81,11 +81,11 @@ public class JmsRequestReplyExclusiveRep
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive&concurrentConsumers=5&maxConcurrentConsumers=10")
+                    .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive&concurrentConsumers=5&maxConcurrentConsumers=10&maxMessagesPerTask=100")
                     .to("log:reply")
                     .to("mock:reply");
 
-                from("activemq:queue:foo?concurrentConsumers=5&maxConcurrentConsumers=10")
+                from("activemq:queue:foo?concurrentConsumers=5&maxConcurrentConsumers=10&maxMessagesPerTask=100")
                     .transform(body().prepend("Hello "));
             }
         };