You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/12/03 18:16:53 UTC

svn commit: r600600 - in /activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms: JmsConfiguration.java JmsEndpoint.java

Author: jstrachan
Date: Mon Dec  3 09:16:52 2007
New Revision: 600600

URL: http://svn.apache.org/viewvc?rev=600600&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/CAMEL-253

Modified:
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=600600&r1=600599&r2=600600&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Mon Dec  3 09:16:52 2007
@@ -73,7 +73,7 @@
     private int maxMessagesPerTask = 1;
     private ServerSessionFactory serverSessionFactory;
     private int cacheLevel = -1;
-    private String cacheLevelName = "CACHE_CONNECTION";
+    private String cacheLevelName;
     private long recoveryInterval = -1;
     private long receiveTimeout = -1;
     private int idleTaskExecutionLimit = 1;
@@ -233,13 +233,13 @@
         return template;
     }
 
-    public AbstractMessageListenerContainer createMessageListenerContainer() {
+    public AbstractMessageListenerContainer createMessageListenerContainer(JmsEndpoint endpoint) {
         AbstractMessageListenerContainer container = chooseMessageListenerContainerImplementation();
-        configureMessageListenerContainer(container);
+        configureMessageListenerContainer(container, endpoint);
         return container;
     }
 
-    protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
+    protected void configureMessageListenerContainer(AbstractMessageListenerContainer container, JmsEndpoint endpoint) {
         container.setConnectionFactory(getListenerConnectionFactory());
         if (destinationResolver != null) {
             container.setDestinationResolver(destinationResolver);
@@ -294,6 +294,9 @@
             else if (cacheLevelName != null) {
                 listenerContainer.setCacheLevelName(cacheLevelName);
             }
+            else {
+                listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
+            }
 
             if (idleTaskExecutionLimit >= 0) {
                 listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
@@ -350,7 +353,6 @@
         }
     }
 
-
     public void configure(EndpointMessageListener listener) {
         if (isDisableReplyTo()) {
             listener.setDisableReplyTo(true);
@@ -390,7 +392,7 @@
 
     /**
      * Sets the connection factory to be used for consuming messages via the
-     * {@link #createMessageListenerContainer()}
+     * {@link #createMessageListenerContainer(JmsEndpoint)}
      *
      * @param listenerConnectionFactory the connection factory to use for
      *                                  consuming messages
@@ -729,6 +731,32 @@
         }
     }
 
+
+    /**
+     * Defaults the JMS cache level if none is explicitly specified.
+     *
+     * Note that due to this
+     * <a href="http://opensource.atlassian.com/projects/spring/browse/SPR-3890">Spring Bug</a>
+     * we cannot use CACHE_CONSUMER by default, which we should do as it is the most efficient.
+     * Instead we use CACHE_CONNECTION - apart from non-durable topics, which must use
+     * CACHE_CONSUMER to avoid missing messages (due to the consumer being created and destroyed per message).
+     *
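+     * @param endpoint the JMS endpoint whose pub/sub domain setting is used to pick the default
+     * @return CACHE_CONSUMER for a non-durable pub/sub (topic) endpoint, otherwise CACHE_CONNECTION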
+     */
+    protected int defaultCacheLevel(JmsEndpoint endpoint) {
+        if (endpoint.isPubSubDomain() && !isSubscriptionDurable()) {
+            // we must cache the consumer or we will miss messages
+            // see https://issues.apache.org/activemq/browse/CAMEL-253
+            return DefaultMessageListenerContainer.CACHE_CONSUMER;
+        }
+        else {
+            // to enable consuming and sending with a single JMS session (to avoid XA) we can only use CACHE_CONNECTION
+            // due to this bug : http://opensource.atlassian.com/projects/spring/browse/SPR-3890
+            return DefaultMessageListenerContainer.CACHE_CONNECTION;
+        }
+    }
+    
     /**
      * Factory method which allows derived classes to customize the lazy
      * creation

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=600600&r1=600599&r2=600600&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Dec  3 09:16:52 2007
@@ -69,7 +69,7 @@
     }
 
     public JmsConsumer createConsumer(Processor processor) throws Exception {
-        AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer();
+        AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer(this);
         return createConsumer(processor, listenerContainer);
     }
 
@@ -183,6 +183,10 @@
      */
     public void setRequestTimeout(long requestTimeout) {
         this.requestTimeout = requestTimeout;
+    }
+
+    public boolean isPubSubDomain() {
+        return pubSubDomain;
     }
 
     // Implementation methods