You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/01/05 21:39:47 UTC

svn commit: r366274 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/

Author: chirino
Date: Thu Jan  5 12:39:41 2006
New Revision: 366274

URL: http://svn.apache.org/viewcvs?rev=366274&view=rev
Log:
We now optimize out persisting persistent messages to a topic if it does not have any durable consumers.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=366274&r1=366273&r2=366274&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Jan  5 12:39:41 2006
@@ -120,7 +120,7 @@
                 recovered=true;
                 for (Iterator iter = destinations.iterator(); iter.hasNext();) {
                     Topic topic = (Topic) iter.next();
-                    topic.recover(this, false);
+                    topic.recover(context, this, false);
                 }
             } else {
                 if( !isFull() ) {                            

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=366274&r1=366273&r2=366274&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Jan  5 12:39:41 2006
@@ -21,9 +21,9 @@
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
-import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -43,6 +43,7 @@
 import org.apache.activemq.util.SubscriptionKey;
 
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The Topic is a destination that sends a copy of a message to every active
@@ -60,10 +61,11 @@
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
 
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
-    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
+    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
-
+    private AtomicInteger durableSubscriberCounter = new AtomicInteger();
+    
     public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory) {
 
@@ -90,19 +92,54 @@
         destinationStatistics.getConsumers().increment();
         sub.add(context, this);
         if (sub.getConsumerInfo().isDurable()) {
-            recover((DurableTopicSubscription) sub, true);
+            recover(context, (DurableTopicSubscription) sub, true);
         }
         else {
-            if (sub.getConsumerInfo().isRetroactive()) {
+            recover(context, sub);
+        }
+    }
+
+    /**
+     * Used to recover the message list non durable subscriptions.  Recovery only happens if the consumer is
+     * retroactive.
+     * 
+     * @param context
+     * @param sub
+     * @throws Throwable
+     */
+    private void recover(ConnectionContext context, final Subscription sub) throws Throwable {
+        if (sub.getConsumerInfo().isRetroactive()) {
+            
+            // synchronize with dispatch method so that no new messages are sent
+            // while we are recovering a subscription to avoid out of order messages.
+            dispatchValve.turnOff();
+            try {
+                
+                synchronized(consumers) {
+                    consumers.add(sub);
+                }
                 subscriptionRecoveryPolicy.recover(context, this, sub);
+                
+            } finally {
+                dispatchValve.turnOn();
             }
+            
+        } else {
             synchronized(consumers) {
                 consumers.add(sub);
             }
         }
     }
 
-    public void recover(final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
+    /**
+     * Used to recover the message list for a durable subscription.
+     * 
+     * @param context
+     * @param sub
+     * @param initialActivation
+     * @throws Throwable
+     */
+    public void recover(ConnectionContext context, final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
 
         // synchronize with dispatch method so that no new messages are sent
         // while
@@ -110,9 +147,11 @@
         dispatchValve.turnOff();
         try {
 
+            boolean persistenceWasOptimized = canOptimizeOutPersistence();
             if (initialActivation) {
-                synchronized(consumers) {
+                synchronized(consumers) {                    
                     consumers.add(sub);
+                    durableSubscriberCounter.incrementAndGet();
                 }
             }
 
@@ -160,6 +199,16 @@
                             throw new RuntimeException("Should not be called.");
                         }
                     });
+                    
+                    if( initialActivation && sub.getConsumerInfo().isRetroactive() ) {
+                        // Then use the subscriptionRecoveryPolicy since there will not be any messages in the persistent store.
+                        if( persistenceWasOptimized ) {
+                            subscriptionRecoveryPolicy.recover(context, this, sub);
+                        } else {
+                            // TODO: implement something like
+                            // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
+                        }
+                    }
                 }
             }
 
@@ -173,6 +222,9 @@
         destinationStatistics.getConsumers().decrement();
         synchronized(consumers) {
             consumers.remove(sub);
+            if( sub.getConsumerInfo().isDurable() ) {
+                durableSubscriberCounter.decrementAndGet();
+            }
         }
         sub.remove(context, this);
     }
@@ -184,7 +236,7 @@
 
         message.setRegionDestination(this);
 
-        if (store != null && message.isPersistent())
+        if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
             store.addMessage(context, message);
 
         message.incrementReferenceCount();
@@ -207,6 +259,10 @@
             message.decrementReferenceCount();
         }
 
+    }
+
+    private boolean canOptimizeOutPersistence() {
+        return durableSubscriberCounter.get()==0;
     }
 
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?rev=366274&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Thu Jan  5 12:39:41 2006
@@ -0,0 +1,96 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * This implementation of {@link SubscriptionRecoveryPolicy} will only keep 
+ * the last message.
+ * 
+ * @org.xbean.XBean
+ * 
+ * @version $Revision$
+ */
+public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
+
+    volatile private MessageReference messages[];
+    private int maximumSize=100;
+    private int tail=0;
+
+    synchronized public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
+        messages[tail++] = node;
+        if( tail >= messages.length )
+            tail = 0;
+        return true;
+    }
+
+    synchronized public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
+        // Re-dispatch the last message seen.
+        int t = tail;
+        // The buffer may not have rolled over yet..., start from the front
+        if( messages[t]==null )
+            t=0;
+        // Well the buffer is really empty then.
+        if( messages[t]==null )
+            return;
+        
+        // Keep dispatching until t hit's tail again.
+        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
+        do {            
+             MessageReference node = messages[t];
+             try {
+                 msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
+                 msgContext.setMessageReference(node);                        
+                 if (sub.matches(node, msgContext)) {
+                     sub.add(node);
+                 }
+             } finally {
+                 msgContext.clear();
+             }
+             t++;
+             if( t >= messages.length )
+                 t = 0;
+        } while( t!=tail );
+        
+    }
+
+    public void start() throws Exception {
+        messages = new MessageReference[maximumSize];
+    }
+
+    public void stop() throws Exception {
+        messages = null;
+    }
+    
+    public int getMaximumSize() {
+        return maximumSize;
+    }
+
+    /**
+     * Sets the maximum number of messages that this destination will hold around in RAM
+     */
+    public void setMaximumSize(int maximumSize) {
+        this.maximumSize = maximumSize;
+    }
+
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?rev=366274&r1=366273&r2=366274&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Thu Jan  5 12:39:41 2006
@@ -518,27 +518,19 @@
         
         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+
+        // the behavior is VERY dependent on the recovery policy used.
+        // But the default broker settings try to make it as consistent as possible
         
-        if( deliveryMode == DeliveryMode.NON_PERSISTENT && durableConsumer ) {
-            // Durable subs don't keep non persistent messages around!
-            for( int i=0; i < 2 ; i++ ) {
-                Message m2 = receiveMessage(connection1);
-                assertNotNull(m2);
-            }
-            
-        } else {
-            
-            // Subscription should see all messages sent.
-            Message m2 = receiveMessage(connection1);
+        // Subscription should see all messages sent.
+        Message m2 = receiveMessage(connection1);
+        assertNotNull(m2);
+        assertEquals(m.getMessageId(), m2.getMessageId());
+        for( int i=0; i < 2 ; i++ ) {
+            m2 = receiveMessage(connection1);
             assertNotNull(m2);
-            assertEquals(m.getMessageId(), m2.getMessageId());
-            for( int i=0; i < 2 ; i++ ) {
-                m2 = receiveMessage(connection1);
-                assertNotNull(m2);
-            }
-            
         }
-        
+            
         assertNoMessagesLeft(connection1);
     }