You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/09/19 22:16:37 UTC

svn commit: r1172794 - in /camel/branches/camel-2.8.x: ./ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/test/java/org/apache/camel/component/jms/

Author: dkulp
Date: Mon Sep 19 20:16:36 2011
New Revision: 1172794

URL: http://svn.apache.org/viewvc?rev=1172794&view=rev
Log:
Merged revisions 1156306 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1156306 | davsclaus | 2011-08-10 14:43:07 -0400 (Wed, 10 Aug 2011) | 1 line
  
  CAMEL-3938: Reintroduced consumerType option. Thanks to Joshua for the patch. Disabled the async listener destroy as it's a bit unstable.
........

Added:
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
      - copied unchanged from r1156306, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/SimpleJmsMessageListenerContainer.java
      - copied unchanged from r1156306, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/SimpleJmsMessageListenerContainer.java
Removed:
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java
Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Sep 19 20:16:36 2011
@@ -35,7 +35,9 @@ import org.springframework.jms.core.JmsO
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
@@ -65,6 +67,7 @@ public class JmsConfiguration implements
     private String acknowledgementModeName;
     // Used to configure the spring Container
     private ExceptionListener exceptionListener;
+    private ConsumerType consumerType = ConsumerType.Default;
     private ErrorHandler errorHandler;    
     private boolean autoStartup = true;
     private boolean acceptMessagesWhileStopping;
@@ -359,15 +362,33 @@ public class JmsConfiguration implements
         return template;
     }
 
-    public DefaultMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
-        DefaultMessageListenerContainer container = new JmsMessageListenerContainer(endpoint);
+    public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
+        AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(endpoint);
         configureMessageListenerContainer(container, endpoint);
         return container;
     }
 
+    public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation(JmsEndpoint endpoint) {
+        switch (consumerType) {
+        case Simple:
+            return new SimpleJmsMessageListenerContainer(endpoint);
+        case Default:
+            return new DefaultJmsMessageListenerContainer(endpoint);
+        default:
+            throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
+        }
+    }
 
     // Properties
     // -------------------------------------------------------------------------
+    
+    public ConsumerType getConsumerType(){
+    	return consumerType;
+    }
+    
+    public void setConsumerType(ConsumerType consumerType) {
+        this.consumerType = consumerType;
+    }
 
     public ConnectionFactory getConnectionFactory() {
         if (connectionFactory == null) {
@@ -827,7 +848,7 @@ public class JmsConfiguration implements
     }
 
 
-    protected void configureMessageListenerContainer(DefaultMessageListenerContainer container,
+    protected void configureMessageListenerContainer(AbstractMessageListenerContainer container,
                                                      JmsEndpoint endpoint) throws Exception {
         container.setConnectionFactory(getListenerConnectionFactory());
         if (endpoint instanceof DestinationEndpoint) {
@@ -852,7 +873,7 @@ public class JmsConfiguration implements
         if (errorHandler != null) {
             container.setErrorHandler(errorHandler);
         }
-        
+
         container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
         container.setExposeListenerSession(exposeListenerSession);
         container.setSessionTransacted(transacted);
@@ -870,6 +891,33 @@ public class JmsConfiguration implements
             container.setMessageSelector(endpoint.getSelector());
         }
 
+        if (container instanceof DefaultMessageListenerContainer) {
+            DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container;
+            configureDefaultMessageListenerContainer(endpoint, listenerContainer);
+        } else if (container instanceof SimpleMessageListenerContainer) {
+        	SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
+        	configureSimpleMessageListenerContainer(listenerContainer); 
+        }
+    }
+
+    private void configureSimpleMessageListenerContainer(SimpleMessageListenerContainer listenerContainer) {
+        if (maxConcurrentConsumers > 0) {
+            if (maxConcurrentConsumers < concurrentConsumers) {
+                throw new IllegalArgumentException("Property maxConcurrentConsumers: " + maxConcurrentConsumers + " must be higher than concurrentConsumers: "
+                        + concurrentConsumers);
+            }
+            listenerContainer.setConcurrency(concurrentConsumers + "-" + maxConcurrentConsumers);
+        } else if (concurrentConsumers >= 0) {
+            listenerContainer.setConcurrentConsumers(concurrentConsumers);
+        }
+
+        listenerContainer.setPubSubNoLocal(pubSubNoLocal);
+        if (taskExecutor != null) {
+            listenerContainer.setTaskExecutor(taskExecutor);
+        }
+    }
+
+    private void configureDefaultMessageListenerContainer(JmsEndpoint endpoint, DefaultMessageListenerContainer container) {
         if (concurrentConsumers >= 0) {
             container.setConcurrentConsumers(concurrentConsumers);
         }

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Mon Sep 19 20:16:36 2011
@@ -22,20 +22,23 @@ import org.apache.camel.FailedToCreateCo
 import org.apache.camel.Processor;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.DefaultConsumer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
 
 /**
- * A {@link org.apache.camel.Consumer} which uses Spring's {@link DefaultMessageListenerContainer} implementations to consume JMS messages
+ * A {@link org.apache.camel.Consumer} which uses Spring's {@link AbstractMessageListenerContainer} implementations
+ * to consume JMS messages.
  *
- * @version 
+ * @version
+ * @see DefaultJmsMessageListenerContainer
+ * @see SimpleJmsMessageListenerContainer
  */
 public class JmsConsumer extends DefaultConsumer implements SuspendableService {
-    private DefaultMessageListenerContainer listenerContainer;
+    private AbstractMessageListenerContainer listenerContainer;
     private EndpointMessageListener messageListener;
     private volatile boolean initialized;
 
-    public JmsConsumer(JmsEndpoint endpoint, Processor processor, DefaultMessageListenerContainer listenerContainer) {
+    public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
         super(endpoint, processor);
         this.listenerContainer = listenerContainer;
         this.listenerContainer.setMessageListener(getEndpointMessageListener());
@@ -45,7 +48,7 @@ public class JmsConsumer extends Default
         return (JmsEndpoint) super.getEndpoint();
     }
 
-    public DefaultMessageListenerContainer getListenerContainer() throws Exception {
+    public AbstractMessageListenerContainer getListenerContainer() throws Exception {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Sep 19 20:16:36 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,7 +54,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -155,15 +158,15 @@ public class JmsEndpoint extends Default
     }
 
     public JmsConsumer createConsumer(Processor processor) throws Exception {
-        DefaultMessageListenerContainer listenerContainer = createMessageListenerContainer();
+        AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer();
         return createConsumer(processor, listenerContainer);
     }
 
-    public DefaultMessageListenerContainer createMessageListenerContainer() throws Exception {
+    public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception {
         return configuration.createMessageListenerContainer(this);
     }
 
-    public void configureListenerContainer(DefaultMessageListenerContainer listenerContainer, JmsConsumer consumer) {
+    public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer, JmsConsumer consumer) {
         if (destinationName != null) {
             listenerContainer.setDestinationName(destinationName);
         } else if (destination != null) {
@@ -182,13 +185,21 @@ public class JmsEndpoint extends Default
             if (log.isDebugEnabled()) {
                 log.debug("Using custom TaskExecutor: {} on listener container: {}", configuration.getTaskExecutor(), listenerContainer);
             }
-            listenerContainer.setTaskExecutor(configuration.getTaskExecutor());
+            setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor());
         } else {
             // include destination name as part of thread name
             String name = "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
             // use a cached pool as DefaultMessageListenerContainer will throttle pool sizing
             ExecutorService executor = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(consumer, name);
-            listenerContainer.setTaskExecutor(executor);
+            setContainerTaskExecutor(listenerContainer, executor);
+        }
+    }
+
+    private void setContainerTaskExecutor(AbstractMessageListenerContainer listenerContainer, Executor executor) {
+        if (listenerContainer instanceof SimpleMessageListenerContainer) {
+            ((SimpleMessageListenerContainer) listenerContainer).setTaskExecutor(executor);
+        } else if (listenerContainer instanceof DefaultMessageListenerContainer) {
+            ((DefaultMessageListenerContainer) listenerContainer).setTaskExecutor(executor);
         }
     }
 
@@ -210,7 +221,7 @@ public class JmsEndpoint extends Default
      * @return a newly created consumer
      * @throws Exception if the consumer cannot be created
      */
-    public JmsConsumer createConsumer(Processor processor, DefaultMessageListenerContainer listenerContainer) throws Exception {
+    public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
         JmsConsumer consumer = new JmsConsumer(this, processor, listenerContainer);
         configureListenerContainer(listenerContainer, consumer);
         return consumer;
@@ -753,7 +764,7 @@ public class JmsEndpoint extends Default
     public void setErrorHandler(ErrorHandler errorHandler) {
         getConfiguration().setErrorHandler(errorHandler);
     }
-    
+
     @ManagedAttribute
     public void setExplicitQosEnabled(boolean explicitQosEnabled) {
         getConfiguration().setExplicitQosEnabled(explicitQosEnabled);

Modified: camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Mon Sep 19 20:16:36 2011
@@ -36,6 +36,7 @@ import org.springframework.jms.core.JmsO
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
 import org.springframework.jms.support.converter.SimpleMessageConverter;
 import org.springframework.util.ErrorHandler;
 
@@ -129,6 +130,15 @@ public class JmsEndpointConfigurationTes
     }
 
     @Test
+    public void testCreateSimpleMessageListener() throws Exception{
+    	 JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:Foo.Bar?consumerType=Simple");
+         JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
+
+         AbstractMessageListenerContainer container = consumer.getListenerContainer();
+         assertTrue("Should have been a SimpleMessageListenerContainer",container instanceof SimpleMessageListenerContainer);
+    }
+
+    @Test
     public void testCacheConsumerEnabledForQueue() throws Exception {
         JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:Foo.Bar");
         assertCacheLevel(endpoint, DefaultMessageListenerContainer.CACHE_AUTO);
@@ -161,6 +171,12 @@ public class JmsEndpointConfigurationTes
         JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?maxConcurrentConsumers=5");
         assertEquals(5, endpoint.getMaxConcurrentConsumers());
     }
+    
+    @Test
+    public void testMaxConcurrentConsumersForSimpleConsumer() throws Exception {
+        JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?maxConcurrentConsumers=5&consumerType=Simple");
+        assertEquals(5, endpoint.getMaxConcurrentConsumers());
+    }
 
     @Test
     public void testInvalidMaxConcurrentConsumers() throws Exception {
@@ -172,14 +188,40 @@ public class JmsEndpointConfigurationTes
             assertEquals("Property maxConcurrentConsumers: 2 must be higher than concurrentConsumers: 5", e.getMessage());
         }
     }
+    
+    @Test
+    public void testInvalidMaxConcurrentConsumersForSimpleConsumer() throws Exception {
+    	JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?concurrentConsumers=5&maxConcurrentConsumers=2&consumerType=Simple");
+        
+    	try {
+            endpoint.createConsumer(new CamelLogger());
+            fail("Should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Property maxConcurrentConsumers: 2 must be higher than concurrentConsumers: 5", e.getMessage());
+        }
+    }
 
     @Test
     public void testConcurrentConsumers() throws Exception {
         JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?concurrentConsumers=4");
         assertEquals(4, endpoint.getConcurrentConsumers());
     }
+    
+    @Test
+    public void testConcurrentConsumersForSimpleConsumer() throws Exception {
+    	JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?concurrentConsumers=4&consumerType=Simple");
+        
+        assertEquals(4, endpoint.getConcurrentConsumers());
+    }
 
     @Test
+    public void testPubSubNoLocalForSimpleConsumer() throws Exception{
+    	JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?pubSubNoLocal=true&consumerType=Simple");
+      
+        assertTrue("PubSubNoLocal should be true", endpoint.isPubSubNoLocal());
+    }
+    
+    @Test
     public void testIdleTaskExecutionLimit() throws Exception {
         JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?idleTaskExecutionLimit=50");
         assertEquals(50, endpoint.getIdleTaskExecutionLimit());