You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/19 11:12:09 UTC

svn commit: r1060739 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ main/java/org/apache/camel/component/jms/reply/ test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Wed Jan 19 10:12:09 2011
New Revision: 1060739

URL: http://svn.apache.org/viewvc?rev=1060739&view=rev
Log:
CAMEL-3286: Spring 2.x task executor should be configurable on consumer and request/reply consumers as well.

Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Wed Jan 19 10:12:09 2011
@@ -25,6 +25,7 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.PackageHelper;
 import org.apache.commons.logging.Log;
@@ -354,7 +355,7 @@ public class JmsConfiguration implements
         return template;
     }
 
-    public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) {
+    public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
         AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
         configureMessageListenerContainer(container, endpoint);
         return container;
@@ -824,7 +825,7 @@ public class JmsConfiguration implements
 
 
     protected void configureMessageListenerContainer(AbstractMessageListenerContainer container,
-                                                     JmsEndpoint endpoint) {
+                                                     JmsEndpoint endpoint) throws Exception {
         container.setConnectionFactory(getListenerConnectionFactory());
         if (endpoint instanceof DestinationEndpoint) {
             container.setDestinationResolver(createDestinationResolver((DestinationEndpoint) endpoint));
@@ -915,6 +916,12 @@ public class JmsConfiguration implements
             if (transactionTimeout >= 0) {
                 listenerContainer.setTransactionTimeout(transactionTimeout);
             }
+            if (taskExecutor != null) {
+                listenerContainer.setTaskExecutor(taskExecutor);
+            } else if (taskExecutorSpring2 != null) {
+                // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
+                IntrospectionSupport.setProperty(listenerContainer, "taskExecutor", endpoint.getTaskExecutorSpring2());
+            }
         } else if (container instanceof SimpleMessageListenerContainer) {
             // this includes SimpleMessageListenerContainer102
             SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
@@ -924,6 +931,9 @@ public class JmsConfiguration implements
             listenerContainer.setPubSubNoLocal(pubSubNoLocal);
             if (taskExecutor != null) {
                 listenerContainer.setTaskExecutor(taskExecutor);
+            } else if (taskExecutorSpring2 != null) {
+                // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
+                IntrospectionSupport.setProperty(listenerContainer, "taskExecutor", endpoint.getTaskExecutorSpring2());
             }
         }
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Wed Jan 19 10:12:09 2011
@@ -45,7 +45,7 @@ public class JmsConsumer extends Default
         return (JmsEndpoint) super.getEndpoint();
     }
 
-    public AbstractMessageListenerContainer getListenerContainer() {
+    public AbstractMessageListenerContainer getListenerContainer() throws Exception {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
@@ -64,7 +64,7 @@ public class JmsConsumer extends Default
         messageListener.setBinding(endpoint.getBinding());
     }
 
-    protected void createMessageListenerContainer() {
+    protected void createMessageListenerContainer() throws Exception {
         listenerContainer = getEndpoint().createMessageListenerContainer();
         getEndpoint().configureListenerContainer(listenerContainer);
         listenerContainer.setMessageListener(getEndpointMessageListener());

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Jan 19 10:12:09 2011
@@ -150,7 +150,7 @@ public class JmsEndpoint extends Default
         return createConsumer(processor, listenerContainer);
     }
 
-    public AbstractMessageListenerContainer createMessageListenerContainer() {
+    public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception {
         return configuration.createMessageListenerContainer(this);
     }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Wed Jan 19 10:12:09 2011
@@ -170,7 +170,6 @@ public class PersistentQueueReplyManager
             answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
         }
 
-        answer.setConnectionFactory(endpoint.getListenerConnectionFactory());
         DestinationResolver resolver = endpoint.getDestinationResolver();
         if (resolver == null) {
             resolver = answer.getDestinationResolver();
@@ -183,6 +182,12 @@ public class PersistentQueueReplyManager
         answer.setPubSubDomain(false);
         answer.setSubscriptionDurable(false);
         answer.setConcurrentConsumers(1);
+        answer.setConnectionFactory(endpoint.getConnectionFactory());
+        String clientId = endpoint.getClientId();
+        if (clientId != null) {
+            clientId += ".CamelReplyManager";
+            answer.setClientId(clientId);
+        }
         // must use cache level session
         answer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
 
@@ -201,8 +206,7 @@ public class PersistentQueueReplyManager
         }
         if (endpoint.getTaskExecutor() != null) {
             answer.setTaskExecutor(endpoint.getTaskExecutor());
-        }
-        if (endpoint.getTaskExecutorSpring2() != null) {
+        } else if (endpoint.getTaskExecutorSpring2() != null) {
             // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
             IntrospectionSupport.setProperty(answer, "taskExecutor", endpoint.getTaskExecutorSpring2());
         }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Wed Jan 19 10:12:09 2011
@@ -26,7 +26,6 @@ import javax.jms.TemporaryQueue;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.util.IntrospectionSupport;
-import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
@@ -102,24 +101,32 @@ public class TemporaryQueueReplyManager 
         answer.setSubscriptionDurable(false);
         answer.setConcurrentConsumers(1);
         answer.setConnectionFactory(endpoint.getConnectionFactory());
-        answer.setSessionTransacted(false);
         String clientId = endpoint.getClientId();
         if (clientId != null) {
             clientId += ".CamelReplyManager";
             answer.setClientId(clientId);
         }
-        TaskExecutor taskExecutor = endpoint.getTaskExecutor();
-        if (taskExecutor != null) {
-            answer.setTaskExecutor(taskExecutor);
-        }
-        if (endpoint.getTaskExecutorSpring2() != null) {
+
+        // we cannot do request-reply over JMS with transaction
+        answer.setSessionTransacted(false);
+
+        // other optional properties
+        if (endpoint.getExceptionListener() != null) {
+            answer.setExceptionListener(endpoint.getExceptionListener());
+        }
+        if (endpoint.getReceiveTimeout() >= 0) {
+            answer.setReceiveTimeout(endpoint.getReceiveTimeout());
+        }
+        if (endpoint.getRecoveryInterval() >= 0) {
+            answer.setRecoveryInterval(endpoint.getRecoveryInterval());
+        }
+        if (endpoint.getTaskExecutor() != null) {
+            answer.setTaskExecutor(endpoint.getTaskExecutor());
+        } else if (endpoint.getTaskExecutorSpring2() != null) {
             // use reflection to invoke to support spring 2 when JAR is compiled with Spring 3.0
             IntrospectionSupport.setProperty(answer, "taskExecutor", endpoint.getTaskExecutorSpring2());
         }
-        ExceptionListener exceptionListener = endpoint.getExceptionListener();
-        if (exceptionListener != null) {
-            answer.setExceptionListener(exceptionListener);
-        }
+
         return answer;
     }
 

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/CamelJmsTestHelper.java Wed Jan 19 10:12:09 2011
@@ -34,16 +34,32 @@ public final class CamelJmsTestHelper {
     }
 
     public static ConnectionFactory createConnectionFactory() {
+        return createConnectionFactory(null);
+    }
+
+    public static ConnectionFactory createConnectionFactory(String options) {
         // using a unique broker name improves testing when running the entire test suite in the same JVM
         int id = counter.incrementAndGet();
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false");
+        String url = "vm://test-broker-" + id + "?broker.persistent=false&broker.useJmx=false";
+        if (options != null) {
+            url = url + "&" + options;
+        }
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
         return connectionFactory;
     }
 
     public static ConnectionFactory createPersistentConnectionFactory() {
+        return createPersistentConnectionFactory(null);
+    }
+
+    public static ConnectionFactory createPersistentConnectionFactory(String options) {
         // using a unique broker name improves testing when running the entire test suite in the same JVM
         int id = counter.incrementAndGet();
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test-broker-" + id + "?broker.persistent=true&broker.useJmx=false");
+        String url = "vm://test-broker-" + id + "?broker.persistent=true&broker.useJmx=false";
+        if (options != null) {
+            url = url + "&" + options;
+        }
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
         return connectionFactory;
     }
 }

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java?rev=1060739&r1=1060738&r2=1060739&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyProcessRepliesConcurrentUsingThreadsTest.java Wed Jan 19 10:12:09 2011
@@ -18,7 +18,6 @@ package org.apache.camel.component.jms;
 
 import javax.jms.ConnectionFactory;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -66,8 +65,9 @@ public class JmsRequestReplyProcessRepli
             public void configure() throws Exception {
                 from("seda:start")
                     .inOut().to("activemq:queue:foo")
-                    .threads(5)
                     .log("reply   - ${body}")
+                    .threads(5)
+                    .log("delay   - ${body}")
                     .delay(2000)
                     .log("done    - ${body}")
                     .to("mock:result");