You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/22 13:31:09 UTC
svn commit: r1412524 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
main/java/org/apache/camel/component/jms/reply/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Thu Nov 22 12:31:07 2012
New Revision: 1412524
URL: http://svn.apache.org/viewvc?rev=1412524&view=rev
Log:
CAMEL-5809: camel-jms request/reply over JMS allow to use concurrentConsumers/maxConcurrentConsumers options.
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java?rev=1412524&r1=1412523&r2=1412524&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java Thu Nov 22 12:31:07 2012
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.jms;
+import org.apache.camel.util.concurrent.CamelThreadFactory;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
/**
@@ -40,4 +43,22 @@ public class DefaultJmsMessageListenerCo
// do not run if we have been stopped
return endpoint.isRunning();
}
+
+ /**
+ * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
+ * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
+ * with the specified bean name and using Camel's {@link org.apache.camel.spi.ExecutorServiceManager}
+ * to resolve the thread name.
+ * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
+ */
+ @Override
+ protected TaskExecutor createDefaultTaskExecutor() {
+ String pattern = endpoint.getCamelContext().getExecutorServiceManager().getThreadNamePattern();
+ String beanName = getBeanName();
+
+ SimpleAsyncTaskExecutor answer = new SimpleAsyncTaskExecutor(beanName);
+ answer.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+ return answer;
+ }
+
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java?rev=1412524&r1=1412523&r2=1412524&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusivePersistentQueueMessageListenerContainer.java Thu Nov 22 12:31:07 2012
@@ -16,7 +16,8 @@
*/
package org.apache.camel.component.jms.reply;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
+import org.apache.camel.component.jms.JmsEndpoint;
/**
* This {@link org.springframework.jms.listener.DefaultMessageListenerContainer} is used for persistent reply queues
@@ -31,8 +32,11 @@ import org.springframework.jms.listener.
*
* @see SharedPersistentQueueMessageListenerContainer
*/
-public class ExclusivePersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
+public class ExclusivePersistentQueueMessageListenerContainer extends DefaultJmsMessageListenerContainer {
// no need to override any methods currently
+ public ExclusivePersistentQueueMessageListenerContainer(JmsEndpoint endpoint) {
+ super(endpoint);
+ }
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1412524&r1=1412523&r2=1412524&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Thu Nov 22 12:31:07 2012
@@ -132,14 +132,14 @@ public class PersistentQueueReplyManager
// create a random selector value we will use for the persistent reply queue
replyToSelectorValue = "ID:" + new BigInteger(24 * 8, new Random()).toString(16);
String fixedMessageSelector = replyToSelectorName + "='" + replyToSelectorValue + "'";
- answer = new SharedPersistentQueueMessageListenerContainer(fixedMessageSelector);
+ answer = new SharedPersistentQueueMessageListenerContainer(endpoint, fixedMessageSelector);
// must use cache level consumer for fixed message selector
answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
log.debug("Using shared queue: " + endpoint.getReplyTo() + " with fixed message selector [" + fixedMessageSelector + "] as reply listener: " + answer);
} else {
// use a dynamic message selector which will select the message we want to receive as reply
dynamicMessageSelector = new MessageSelectorCreator(correlation);
- answer = new SharedPersistentQueueMessageListenerContainer(dynamicMessageSelector);
+ answer = new SharedPersistentQueueMessageListenerContainer(endpoint, dynamicMessageSelector);
// must use cache level session for dynamic message selector,
// as otherwise the dynamic message selector will not be updated on-the-fly
answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
@@ -149,7 +149,7 @@ public class PersistentQueueReplyManager
log.warn("{} is using a shared reply queue, which is not as fast as alternatives."
+ " See more detail at the section 'Request-reply over JMS' at http://camel.apache.org/jms", endpoint);
} else if (ReplyToType.Exclusive == type) {
- answer = new ExclusivePersistentQueueMessageListenerContainer();
+ answer = new ExclusivePersistentQueueMessageListenerContainer(endpoint);
// must use cache level consumer for exclusive as there is no message selector
answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
log.debug("Using exclusive queue:" + endpoint.getReplyTo() + " as reply listener: " + answer);
@@ -208,12 +208,17 @@ public class PersistentQueueReplyManager
if (endpoint.getRecoveryInterval() >= 0) {
answer.setRecoveryInterval(endpoint.getRecoveryInterval());
}
- // do not use a task executor for reply as we are are always a single threaded task
+ // set task executor
+ if (endpoint.getTaskExecutor() != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
+ }
+ answer.setTaskExecutor(endpoint.getTaskExecutor());
+ }
// setup a bean name which is used ny Spring JMS as the thread name
String name = "PersistentQueueReplyManager[" + answer.getDestinationName() + "]";
- String beanName = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName(name);
- answer.setBeanName(beanName);
+ answer.setBeanName(name);
if (answer.getConcurrentConsumers() > 1) {
if (ReplyToType.Shared == type) {
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java?rev=1412524&r1=1412523&r2=1412524&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedPersistentQueueMessageListenerContainer.java Thu Nov 22 12:31:07 2012
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.jms.reply;
+import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
+import org.apache.camel.component.jms.JmsEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -32,7 +34,7 @@ import org.springframework.jms.listener.
*
* @see ExclusivePersistentQueueMessageListenerContainer
*/
-public class SharedPersistentQueueMessageListenerContainer extends DefaultMessageListenerContainer {
+public class SharedPersistentQueueMessageListenerContainer extends DefaultJmsMessageListenerContainer {
private static final Logger LOG = LoggerFactory.getLogger(SharedPersistentQueueMessageListenerContainer.class);
@@ -42,18 +44,22 @@ public class SharedPersistentQueueMessag
/**
* Use a fixed JMS message selector
*
+ * @param endpoint the endpoint
* @param fixedMessageSelector the fixed selector
*/
- public SharedPersistentQueueMessageListenerContainer(String fixedMessageSelector) {
+ public SharedPersistentQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) {
+ super(endpoint);
this.fixedMessageSelector = fixedMessageSelector;
}
/**
* Use a dynamic JMS message selector
*
+ * @param endpoint the endpoint
* @param creator the create to create the dynamic selector
*/
- public SharedPersistentQueueMessageListenerContainer(MessageSelectorCreator creator) {
+ public SharedPersistentQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) {
+ super(endpoint);
this.creator = creator;
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1412524&r1=1412523&r2=1412524&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Thu Nov 22 12:31:07 2012
@@ -25,6 +25,7 @@ import javax.jms.TemporaryQueue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
import org.apache.camel.component.jms.DefaultSpringErrorHandler;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -86,7 +87,7 @@ public class TemporaryQueueReplyManager
@Override
protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
// Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
- DefaultMessageListenerContainer answer = new DefaultMessageListenerContainer();
+ DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint);
answer.setDestinationName("temporary");
answer.setDestinationResolver(new DestinationResolver() {
@@ -136,12 +137,16 @@ public class TemporaryQueueReplyManager
if (endpoint.getRecoveryInterval() >= 0) {
answer.setRecoveryInterval(endpoint.getRecoveryInterval());
}
- // do not use a task executor for reply as we are are always a single threaded task
+ if (endpoint.getTaskExecutor() != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
+ }
+ answer.setTaskExecutor(endpoint.getTaskExecutor());
+ }
// setup a bean name which is used ny Spring JMS as the thread name
String name = "TemporaryQueueReplyManager[" + answer.getDestinationName() + "]";
- String beanName = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName(name);
- answer.setBeanName(beanName);
+ answer.setBeanName(name);
if (answer.getConcurrentConsumers() > 1) {
// log that we are using concurrent consumers
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java?rev=1412524&r1=1412523&r2=1412524&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToConcurrentTest.java Thu Nov 22 12:31:07 2012
@@ -81,11 +81,11 @@ public class JmsRequestReplyExclusiveRep
@Override
public void configure() throws Exception {
from("direct:start")
- .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive&concurrentConsumers=5&maxConcurrentConsumers=10")
+ .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive&concurrentConsumers=5&maxConcurrentConsumers=10&maxMessagesPerTask=100")
.to("log:reply")
.to("mock:reply");
- from("activemq:queue:foo?concurrentConsumers=5&maxConcurrentConsumers=10")
+ from("activemq:queue:foo?concurrentConsumers=5&maxConcurrentConsumers=10&maxMessagesPerTask=100")
.transform(body().prepend("Hello "));
}
};