You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/19 11:12:09 UTC
svn commit: r1060739 - in /camel/trunk/components/camel-jms/src:
main/java/org/apache/camel/component/jms/
main/java/org/apache/camel/component/jms/reply/
test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Wed Jan 19 10:12:09 2011
New Revision: 1060739
URL: http://svn.apache.org/viewvc?rev=1060739&view=rev
Log:
CAMEL-3286: Spring 2.x task executor should be configurable on consumer and request/reply consumers as well.
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Wed Jan 19 10:12:09 2011
@@ -25,6 +25,7 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.PackageHelper;
import org.apache.commons.logging.Log;
@@ -354,7 +355,7 @@ public class JmsConfiguration implements
return template;
}
- public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) {
+ public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
configureMessageListenerContainer(container, endpoint);
return container;
@@ -824,7 +825,7 @@ public class JmsConfiguration implements
protected void configureMessageListenerContainer(AbstractMessageListenerContainer container,
- JmsEndpoint endpoint) {
+ JmsEndpoint endpoint) throws Exception {
container.setConnectionFactory(getListenerConnectionFactory());
if (endpoint instanceof DestinationEndpoint) {
container.setDestinationResolver(createDestinationResolver((DestinationEndpoint) endpoint));
@@ -915,6 +916,12 @@ public class JmsConfiguration implements
if (transactionTimeout >= 0) {
listenerContainer.setTransactionTimeout(transactionTimeout);
}
+ if (taskExecutor != null) {
+ listenerContainer.setTaskExecutor(taskExecutor);
+ } else if (taskExecutorSpring2 != null) {
+ // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
+ IntrospectionSupport.setProperty(listenerContainer, "taskExecutor", endpoint.getTaskExecutorSpring2());
+ }
} else if (container instanceof SimpleMessageListenerContainer) {
// this includes SimpleMessageListenerContainer102
SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
@@ -924,6 +931,9 @@ public class JmsConfiguration implements
listenerContainer.setPubSubNoLocal(pubSubNoLocal);
if (taskExecutor != null) {
listenerContainer.setTaskExecutor(taskExecutor);
+ } else if (taskExecutorSpring2 != null) {
+ // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
+ IntrospectionSupport.setProperty(listenerContainer, "taskExecutor", endpoint.getTaskExecutorSpring2());
}
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Wed Jan 19 10:12:09 2011
@@ -45,7 +45,7 @@ public class JmsConsumer extends Default
return (JmsEndpoint) super.getEndpoint();
}
- public AbstractMessageListenerContainer getListenerContainer() {
+ public AbstractMessageListenerContainer getListenerContainer() throws Exception {
if (listenerContainer == null) {
createMessageListenerContainer();
}
@@ -64,7 +64,7 @@ public class JmsConsumer extends Default
messageListener.setBinding(endpoint.getBinding());
}
- protected void createMessageListenerContainer() {
+ protected void createMessageListenerContainer() throws Exception {
listenerContainer = getEndpoint().createMessageListenerContainer();
getEndpoint().configureListenerContainer(listenerContainer);
listenerContainer.setMessageListener(getEndpointMessageListener());
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Jan 19 10:12:09 2011
@@ -150,7 +150,7 @@ public class JmsEndpoint extends Default
return createConsumer(processor, listenerContainer);
}
- public AbstractMessageListenerContainer createMessageListenerContainer() {
+ public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception {
return configuration.createMessageListenerContainer(this);
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Wed Jan 19 10:12:09 2011
@@ -170,7 +170,6 @@ public class PersistentQueueReplyManager
answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
}
- answer.setConnectionFactory(endpoint.getListenerConnectionFactory());
DestinationResolver resolver = endpoint.getDestinationResolver();
if (resolver == null) {
resolver = answer.getDestinationResolver();
@@ -183,6 +182,12 @@ public class PersistentQueueReplyManager
answer.setPubSubDomain(false);
answer.setSubscriptionDurable(false);
answer.setConcurrentConsumers(1);
+ answer.setConnectionFactory(endpoint.getConnectionFactory());
+ String clientId = endpoint.getClientId();
+ if (clientId != null) {
+ clientId += ".CamelReplyManager";
+ answer.setClientId(clientId);
+ }
// must use cache level session
answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
@@ -201,8 +206,7 @@ public class PersistentQueueReplyManager
}
if (endpoint.getTaskExecutor() != null) {
answer.setTaskExecutor(endpoint.getTaskExecutor());
- }
- if (endpoint.getTaskExecutorSpring2() != null) {
+ } else if (endpoint.getTaskExecutorSpring2() != null) {
// use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
IntrospectionSupport.setProperty(answer, "taskExecutor", endpoint.getTaskExecutorSpring2());
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Wed Jan 19 10:12:09 2011
@@ -26,7 +26,6 @@ import javax.jms.TemporaryQueue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.util.IntrospectionSupport;
-import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
@@ -102,24 +101,32 @@ public class TemporaryQueueReplyManager
answer.setSubscriptionDurable(false);
answer.setConcurrentConsumers(1);
answer.setConnectionFactory(endpoint.getConnectionFactory());
- answer.setSessionTransacted(false);
String clientId = endpoint.getClientId();
if (clientId != null) {
clientId += ".CamelReplyManager";
answer.setClientId(clientId);
}
- TaskExecutor taskExecutor = endpoint.getTaskExecutor();
- if (taskExecutor != null) {
- answer.setTaskExecutor(taskExecutor);
- }
- if (endpoint.getTaskExecutorSpring2() != null) {
+
+ // we cannot do request-reply over JMS with transaction
+ answer.setSessionTransacted(false);
+
+ // other optional properties
+ if (endpoint.getExceptionListener() != null) {
+ answer.setExceptionListener(endpoint.getExceptionListener());
+ }
+ if (endpoint.getReceiveTimeout() >= 0) {
+ answer.setReceiveTimeout(endpoint.getReceiveTimeout());
+ }
+ if (endpoint.getRecoveryInterval() >= 0) {
+ answer.setRecoveryInterval(endpoint.getRecoveryInterval());
+ }
+ if (endpoint.getTaskExecutor() != null) {
+ answer.setTaskExecutor(endpoint.getTaskExecutor());
+ } else if (endpoint.getTaskExecutorSpring2() != null) {
// use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
IntrospectionSupport.setProperty(answer, "taskExecutor", endpoint.getTaskExecutorSpring2());
}
- ExceptionListener exceptionListener = endpoint.getExceptionListener();
- if (exceptionListener != null) {
- answer.setExceptionListener(exceptionListener);
- }
+
return answer;
}
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java Wed Jan 19 10:12:09 2011
@@ -34,16 +34,32 @@ public final class CamelJmsTestHelper {
}
public static ConnectionFactory createConnectionFactory() {
+ return createConnectionFactory(null);
+ }
+
+ public static ConnectionFactory createConnectionFactory(String options) {
// using a unique broker name improves testing when running the entire test suite in the same JVM
int id = counter.incrementAndGet();
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false");
+ String url = "vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false";
+ if (options != null) {
+ url = url + "&" + options;
+ }
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
return connectionFactory;
}
public static ConnectionFactory createPersistentConnectionFactory() {
+ return createPersistentConnectionFactory(null);
+ }
+
+ public static ConnectionFactory createPersistentConnectionFactory(String options) {
// using a unique broker name improves testing when running the entire test suite in the same JVM
int id = counter.incrementAndGet();
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test-broker-" + id + "?broker.persistent=true&broker.useJmx=false");
+ String url = "vm://test-broker-" + id + "?broker.persistent=true&broker.useJmx=false";
+ if (options != null) {
+ url = url + "&" + options;
+ }
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
return connectionFactory;
}
}
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java Wed Jan 19 10:12:09 2011
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms;
import javax.jms.ConnectionFactory;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -66,8 +65,9 @@ public class JmsRequestReplyProcessRepli
public void configure() throws Exception {
from("seda:start")
.inOut().to("activemq:queue:foo")
- .threads(5)
.log("reply - ${body}")
+ .threads(5)
+ .log("delay - ${body}")
.delay(2000)
.log("done - ${body}")
.to("mock:result");