You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2011/09/19 22:16:37 UTC
svn commit: r1172794 - in /camel/branches/camel-2.8.x: ./
components/camel-jms/src/main/java/org/apache/camel/component/jms/
components/camel-jms/src/test/java/org/apache/camel/component/jms/
Author: dkulp
Date: Mon Sep 19 20:16:36 2011
New Revision: 1172794
URL: http://svn.apache.org/viewvc?rev=1172794&view=rev
Log:
Merged revisions 1156306 via svnmerge from
https://svn.apache.org/repos/asf/camel/trunk
........
r1156306 | davsclaus | 2011-08-10 14:43:07 -0400 (Wed, 10 Aug 2011) | 1 line
CAMEL-3938: Reintroduced consumerType option. Thanks to Joshua for the patch. Disable the async listener destroy as its a bit unstable.
........
Added:
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
- copied unchanged from r1156306, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/SimpleJmsMessageListenerContainer.java
- copied unchanged from r1156306, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/SimpleJmsMessageListenerContainer.java
Removed:
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageListenerContainer.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Sep 19 20:16:36 2011
@@ -35,7 +35,9 @@ import org.springframework.jms.core.JmsO
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
@@ -65,6 +67,7 @@ public class JmsConfiguration implements
private String acknowledgementModeName;
// Used to configure the spring Container
private ExceptionListener exceptionListener;
+ private ConsumerType consumerType = ConsumerType.Default;
private ErrorHandler errorHandler;
private boolean autoStartup = true;
private boolean acceptMessagesWhileStopping;
@@ -359,15 +362,33 @@ public class JmsConfiguration implements
return template;
}
- public DefaultMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
- DefaultMessageListenerContainer container = new JmsMessageListenerContainer(endpoint);
+ public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) throws Exception {
+ AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation(endpoint);
configureMessageListenerContainer(container, endpoint);
return container;
}
+ public AbstractMessageListenerContainer chooseMessageListenerContainerImplementation(JmsEndpoint endpoint) {
+ switch (consumerType) {
+ case Simple:
+ return new SimpleJmsMessageListenerContainer(endpoint);
+ case Default:
+ return new DefaultJmsMessageListenerContainer(endpoint);
+ default:
+ throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
+ }
+ }
// Properties
// -------------------------------------------------------------------------
+
+ public ConsumerType getConsumerType(){
+ return consumerType;
+ }
+
+ public void setConsumerType(ConsumerType consumerType) {
+ this.consumerType = consumerType;
+ }
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
@@ -827,7 +848,7 @@ public class JmsConfiguration implements
}
- protected void configureMessageListenerContainer(DefaultMessageListenerContainer container,
+ protected void configureMessageListenerContainer(AbstractMessageListenerContainer container,
JmsEndpoint endpoint) throws Exception {
container.setConnectionFactory(getListenerConnectionFactory());
if (endpoint instanceof DestinationEndpoint) {
@@ -852,7 +873,7 @@ public class JmsConfiguration implements
if (errorHandler != null) {
container.setErrorHandler(errorHandler);
}
-
+
container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
container.setExposeListenerSession(exposeListenerSession);
container.setSessionTransacted(transacted);
@@ -870,6 +891,33 @@ public class JmsConfiguration implements
container.setMessageSelector(endpoint.getSelector());
}
+ if (container instanceof DefaultMessageListenerContainer) {
+ DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container;
+ configureDefaultMessageListenerContainer(endpoint, listenerContainer);
+ } else if (container instanceof SimpleMessageListenerContainer) {
+ SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
+ configureSimpleMessageListenerContainer(listenerContainer);
+ }
+ }
+
+ private void configureSimpleMessageListenerContainer(SimpleMessageListenerContainer listenerContainer) {
+ if (maxConcurrentConsumers > 0) {
+ if (maxConcurrentConsumers < concurrentConsumers) {
+ throw new IllegalArgumentException("Property maxConcurrentConsumers: " + maxConcurrentConsumers + " must be higher than concurrentConsumers: "
+ + concurrentConsumers);
+ }
+ listenerContainer.setConcurrency(concurrentConsumers + "-" + maxConcurrentConsumers);
+ } else if (concurrentConsumers >= 0) {
+ listenerContainer.setConcurrentConsumers(concurrentConsumers);
+ }
+
+ listenerContainer.setPubSubNoLocal(pubSubNoLocal);
+ if (taskExecutor != null) {
+ listenerContainer.setTaskExecutor(taskExecutor);
+ }
+ }
+
+ private void configureDefaultMessageListenerContainer(JmsEndpoint endpoint, DefaultMessageListenerContainer container) {
if (concurrentConsumers >= 0) {
container.setConcurrentConsumers(concurrentConsumers);
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Mon Sep 19 20:16:36 2011
@@ -22,20 +22,23 @@ import org.apache.camel.FailedToCreateCo
import org.apache.camel.Processor;
import org.apache.camel.SuspendableService;
import org.apache.camel.impl.DefaultConsumer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
/**
- * A {@link org.apache.camel.Consumer} which uses Spring's {@link DefaultMessageListenerContainer} implementations to consume JMS messages
+ * A {@link org.apache.camel.Consumer} which uses Spring's {@link AbstractMessageListenerContainer} implementations
+ * to consume JMS messages.
*
- * @version
+ * @version
+ * @see DefaultJmsMessageListenerContainer
+ * @see SimpleJmsMessageListenerContainer
*/
public class JmsConsumer extends DefaultConsumer implements SuspendableService {
- private DefaultMessageListenerContainer listenerContainer;
+ private AbstractMessageListenerContainer listenerContainer;
private EndpointMessageListener messageListener;
private volatile boolean initialized;
- public JmsConsumer(JmsEndpoint endpoint, Processor processor, DefaultMessageListenerContainer listenerContainer) {
+ public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
super(endpoint, processor);
this.listenerContainer = listenerContainer;
this.listenerContainer.setMessageListener(getEndpointMessageListener());
@@ -45,7 +48,7 @@ public class JmsConsumer extends Default
return (JmsEndpoint) super.getEndpoint();
}
- public DefaultMessageListenerContainer getListenerContainer() throws Exception {
+ public AbstractMessageListenerContainer getListenerContainer() throws Exception {
if (listenerContainer == null) {
createMessageListenerContainer();
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Sep 19 20:16:36 2011
@@ -18,6 +18,7 @@ package org.apache.camel.component.jms;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,7 +54,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -155,15 +158,15 @@ public class JmsEndpoint extends Default
}
public JmsConsumer createConsumer(Processor processor) throws Exception {
- DefaultMessageListenerContainer listenerContainer = createMessageListenerContainer();
+ AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer();
return createConsumer(processor, listenerContainer);
}
- public DefaultMessageListenerContainer createMessageListenerContainer() throws Exception {
+ public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception {
return configuration.createMessageListenerContainer(this);
}
- public void configureListenerContainer(DefaultMessageListenerContainer listenerContainer, JmsConsumer consumer) {
+ public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer, JmsConsumer consumer) {
if (destinationName != null) {
listenerContainer.setDestinationName(destinationName);
} else if (destination != null) {
@@ -182,13 +185,21 @@ public class JmsEndpoint extends Default
if (log.isDebugEnabled()) {
log.debug("Using custom TaskExecutor: {} on listener container: {}", configuration.getTaskExecutor(), listenerContainer);
}
- listenerContainer.setTaskExecutor(configuration.getTaskExecutor());
+ setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor());
} else {
// include destination name as part of thread name
String name = "JmsConsumer[" + getEndpointConfiguredDestinationName() + "]";
// use a cached pool as DefaultMessageListenerContainer will throttle pool sizing
ExecutorService executor = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(consumer, name);
- listenerContainer.setTaskExecutor(executor);
+ setContainerTaskExecutor(listenerContainer, executor);
+ }
+ }
+
+ private void setContainerTaskExecutor(AbstractMessageListenerContainer listenerContainer, Executor executor) {
+ if (listenerContainer instanceof SimpleMessageListenerContainer) {
+ ((SimpleMessageListenerContainer) listenerContainer).setTaskExecutor(executor);
+ } else if (listenerContainer instanceof DefaultMessageListenerContainer) {
+ ((DefaultMessageListenerContainer) listenerContainer).setTaskExecutor(executor);
}
}
@@ -210,7 +221,7 @@ public class JmsEndpoint extends Default
* @return a newly created consumer
* @throws Exception if the consumer cannot be created
*/
- public JmsConsumer createConsumer(Processor processor, DefaultMessageListenerContainer listenerContainer) throws Exception {
+ public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
JmsConsumer consumer = new JmsConsumer(this, processor, listenerContainer);
configureListenerContainer(listenerContainer, consumer);
return consumer;
@@ -753,7 +764,7 @@ public class JmsEndpoint extends Default
public void setErrorHandler(ErrorHandler errorHandler) {
getConfiguration().setErrorHandler(errorHandler);
}
-
+
@ManagedAttribute
public void setExplicitQosEnabled(boolean explicitQosEnabled) {
getConfiguration().setExplicitQosEnabled(explicitQosEnabled);
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=1172794&r1=1172793&r2=1172794&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Mon Sep 19 20:16:36 2011
@@ -36,6 +36,7 @@ import org.springframework.jms.core.JmsO
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.util.ErrorHandler;
@@ -129,6 +130,15 @@ public class JmsEndpointConfigurationTes
}
@Test
+ public void testCreateSimpleMessageListener() throws Exception{
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:Foo.Bar?consumerType=Simple");
+ JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
+
+ AbstractMessageListenerContainer container = consumer.getListenerContainer();
+ assertTrue("Should have been a SimpleMessageListenerContainer",container instanceof SimpleMessageListenerContainer);
+ }
+
+ @Test
public void testCacheConsumerEnabledForQueue() throws Exception {
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:Foo.Bar");
assertCacheLevel(endpoint, DefaultMessageListenerContainer.CACHE_AUTO);
@@ -161,6 +171,12 @@ public class JmsEndpointConfigurationTes
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?maxConcurrentConsumers=5");
assertEquals(5, endpoint.getMaxConcurrentConsumers());
}
+
+ @Test
+ public void testMaxConcurrentConsumersForSimpleConsumer() throws Exception {
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?maxConcurrentConsumers=5&consumerType=Simple");
+ assertEquals(5, endpoint.getMaxConcurrentConsumers());
+ }
@Test
public void testInvalidMaxConcurrentConsumers() throws Exception {
@@ -172,14 +188,40 @@ public class JmsEndpointConfigurationTes
assertEquals("Property maxConcurrentConsumers: 2 must be higher than concurrentConsumers: 5", e.getMessage());
}
}
+
+ @Test
+ public void testInvalidMaxConcurrentConsumersForSimpleConsumer() throws Exception {
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?concurrentConsumers=5&maxConcurrentConsumers=2&consumerType=Simple");
+
+ try {
+ endpoint.createConsumer(new CamelLogger());
+ fail("Should have thrown exception");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Property maxConcurrentConsumers: 2 must be higher than concurrentConsumers: 5", e.getMessage());
+ }
+ }
@Test
public void testConcurrentConsumers() throws Exception {
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?concurrentConsumers=4");
assertEquals(4, endpoint.getConcurrentConsumers());
}
+
+ @Test
+ public void testConcurrentConsumersForSimpleConsumer() throws Exception {
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?concurrentConsumers=4&consumerType=Simple");
+
+ assertEquals(4, endpoint.getConcurrentConsumers());
+ }
@Test
+ public void testPubSubNoLocalForSimpleConsumer() throws Exception{
+ JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?pubSubNoLocal=true&consumerType=Simple");
+
+ assertTrue("PubSubNoLocal should be true", endpoint.isPubSubNoLocal());
+ }
+
+ @Test
public void testIdleTaskExecutionLimit() throws Exception {
JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo?idleTaskExecutionLimit=50");
assertEquals(50, endpoint.getIdleTaskExecutionLimit());