You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/17 17:32:30 UTC

svn commit: r394707 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: AbstractRegion.java DurableTopicSubscription.java PrefetchSubscription.java QueueSubscription.java Subscription.java TopicSubscription.java

Author: chirino
Date: Mon Apr 17 08:32:28 2006
New Revision: 394707

URL: http://svn.apache.org/viewcvs?rev=394707&view=rev
Log:
If a topic consumer was hung up, it would eventually stop the producers since the broker memory limit would be reached.
The problem was if the consumer was killed, the broker memory would not get freed up and so the producer would remain blocked.
When a subscription is removed, the memory of the pending messages are now released.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Apr 17 08:32:28 2006
@@ -180,9 +180,11 @@
         }
         
         destroySubscription(sub);
+        
     }
 
     protected void destroySubscription(Subscription sub) {        
+        sub.destroy();
     }
 
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Apr 17 08:32:28 2006
@@ -181,5 +181,24 @@
     public SubscriptionKey getSubscriptionKey() {
         return subscriptionKey;
     }
+    
+    /**
+     * Release any references that we are holding.
+     */
+    synchronized public void destroy() {
+        
+        for (Iterator iter = pending.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            node.decrementReferenceCount();
+        }
+        pending.clear();
+        
+        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            node.decrementReferenceCount();
+        }
+        dispatched.clear();
+        
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Apr 17 08:32:28 2006
@@ -372,5 +372,4 @@
     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
                     throws IOException{}
 
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Mon Apr 17 08:32:28 2006
@@ -28,6 +28,7 @@
 import javax.jms.InvalidSelectorException;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 public class QueueSubscription extends PrefetchSubscription implements LockOwner {
     
@@ -184,4 +185,9 @@
         }
     }
     
+    /**
+     */
+    synchronized public void destroy() {        
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Mon Apr 17 08:32:28 2006
@@ -170,4 +170,10 @@
      *
      */
     public void optimizePrefetch();
+    
+    /**
+     * Called when the subscription is destroyed.
+     */
+    public void destroy();
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Apr 17 08:32:28 2006
@@ -53,7 +53,8 @@
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
     private int discarded = 0;
     private final Object matchedListMutex=new Object();
-    long enqueueCounter;
+    private final AtomicLong enqueueCounter = new AtomicLong(0);
+    private final AtomicLong dequeueCounter = new AtomicLong(0);
     
     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
                     throws InvalidSelectorException{
@@ -62,8 +63,10 @@
     }
 
     public void add(MessageReference node) throws InterruptedException,IOException{
-        enqueueCounter++;
+        
+        enqueueCounter.incrementAndGet();        
         node.incrementReferenceCount();
+        
         if(!isFull()&&!isSlaveBroker()){
             optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages which
@@ -131,6 +134,7 @@
     }
 
     synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
+        
         // Handle the standard acknowledgment case.
         boolean wasFull=isFull();
         if(ack.isStandardAck()||ack.isPoisonAck()){
@@ -138,11 +142,13 @@
                 delivered.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization(){
                     public void afterCommit() throws Exception{
+                        dequeueCounter.addAndGet(ack.getMessageCount());
                         dispatched.addAndGet(-ack.getMessageCount());
                         delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
                     }
                 });
             }else{
+                dequeueCounter.addAndGet(ack.getMessageCount());
                 dispatched.addAndGet(-ack.getMessageCount());
                 delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
             }
@@ -178,14 +184,13 @@
 	}
 
 	public long getEnqueueCounter() {
-		return enqueueCounter;
+		return enqueueCounter.get();
 	}
+    
     public long getDequeueCounter(){
-        return delivered.get();
+        return dequeueCounter.get();
     }
 
-
-
     /**
      * @return the number of messages discarded due to being a slow consumer
      */
@@ -313,6 +318,16 @@
     public String toString(){
         return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
                         +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded();
+    }
+
+    public void destroy() {
+        synchronized(matchedListMutex){
+            for (Iterator iter = matched.iterator(); iter.hasNext();) {
+                MessageReference node = (MessageReference) iter.next();
+                node.decrementReferenceCount();
+            }
+            matched.clear();
+        }
     }
 
 }