You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/12/03 18:16:53 UTC
svn commit: r600600 - in
/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms:
JmsConfiguration.java JmsEndpoint.java
Author: jstrachan
Date: Mon Dec 3 09:16:52 2007
New Revision: 600600
URL: http://svn.apache.org/viewvc?rev=600600&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/CAMEL-253
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=600600&r1=600599&r2=600600&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Dec 3 09:16:52 2007
@@ -73,7 +73,7 @@
private int maxMessagesPerTask = 1;
private ServerSessionFactory serverSessionFactory;
private int cacheLevel = -1;
- private String cacheLevelName = "CACHE_CONNECTION";
+ private String cacheLevelName;
private long recoveryInterval = -1;
private long receiveTimeout = -1;
private int idleTaskExecutionLimit = 1;
@@ -233,13 +233,13 @@
return template;
}
- public AbstractMessageListenerContainer createMessageListenerContainer() {
+ public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) {
AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
- configureMessageListenerContainer(container);
+ configureMessageListenerContainer(container, endpoint);
return container;
}
- protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
+ protected void configureMessageListenerContainer(AbstractMessageListenerContainer container, JmsEndpoint endpoint) {
container.setConnectionFactory(getListenerConnectionFactory());
if (destinationResolver != null) {
container.setDestinationResolver(destinationResolver);
@@ -294,6 +294,9 @@
else if (cacheLevelName != null) {
listenerContainer.setCacheLevelName(cacheLevelName);
}
+ else {
+ listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
+ }
if (idleTaskExecutionLimit >= 0) {
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
@@ -350,7 +353,6 @@
}
}
-
public void configure(EndpointMessageListener listener) {
if (isDisableReplyTo()) {
listener.setDisableReplyTo(true);
@@ -390,7 +392,7 @@
/**
* Sets the connection factory to be used for consuming messages via the
- * {@link #createMessageListenerContainer()}
+ * {@link #createMessageListenerContainer(JmsEndpoint)}
*
* @param listenerConnectionFactory the connection factory to use for
* consuming messages
@@ -729,6 +731,32 @@
}
}
+
+ /**
+ * Defaults the JMS cache level if none is explicitly specified.
+ *
+ * Note that due to this
+ * <a href="http://opensource.atlassian.com/projects/spring/browse/SPR-3890">Spring Bug</a>
+ * we cannot use CACHE_CONSUMER by default which we should do as its most efficient.
+ * Instead we use CACHE_CONNECTION - part from for non-durable topics which must use
+ * CACHE_CONSUMER to avoid missing messages (due to the consumer being created and destroyed per message).
+ *
+ * @return
+ * @param endpoint
+ */
+ protected int defaultCacheLevel(JmsEndpoint endpoint) {
+ if (endpoint.isPubSubDomain() && !isSubscriptionDurable()) {
+ // we must cache the consumer or we will miss messages
+ // see https://issues.apache.org/activemq/browse/CAMEL-253
+ return DefaultMessageListenerContainer.CACHE_CONSUMER;
+ }
+ else {
+ // to enable consuming and sending with a single JMS session (to avoid XA) we can only use CACHE_CONNECTION
+ // due to this bug : http://opensource.atlassian.com/projects/spring/browse/SPR-3890
+ return DefaultMessageListenerContainer.CACHE_CONNECTION;
+ }
+ }
+
/**
* Factory method which allows derived classes to customize the lazy
* creation
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=600600&r1=600599&r2=600600&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Dec 3 09:16:52 2007
@@ -69,7 +69,7 @@
}
public JmsConsumer createConsumer(Processor processor) throws Exception {
- AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer();
+ AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer(this);
return createConsumer(processor, listenerContainer);
}
@@ -183,6 +183,10 @@
*/
public void setRequestTimeout(long requestTimeout) {
this.requestTimeout = requestTimeout;
+ }
+
+ public boolean isPubSubDomain() {
+ return pubSubDomain;
}
// Implementation methods