You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/28 21:13:57 UTC

svn commit: r397985 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: DurableTopicSubscription.java PrefetchSubscription.java QueueBrowserSubscription.java QueueSubscription.java

Author: chirino
Date: Fri Apr 28 12:13:54 2006
New Revision: 397985

URL: http://svn.apache.org/viewcvs?rev=397985&view=rev
Log:
Guard access to the pending list better.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Apr 28 12:13:54 2006
@@ -63,9 +63,7 @@
             Topic topic = (Topic) destination;            
             topic.activate(context, this);
         }
-        if( !isFull() ) {
-            dispatchMatched();
-        }
+        dispatchMatched();
     }
    
     synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -79,9 +77,7 @@
                     topic.activate(context, this);
                 }
             }
-            if( !isFull() ) {
-                dispatchMatched();
-            }
+            dispatchMatched();
         }
     }
 
@@ -104,7 +100,9 @@
                 redeliveredMessages.put(node.getMessageId(), new Integer(1));
             }
             if( keepDurableSubsActive ) {
-                pending.addFirst(node);
+            	synchronized(pending) {
+            		pending.addFirst(node);
+            	}
             } else {
                 node.decrementReferenceCount();
             }
@@ -112,11 +110,13 @@
         }
         
         if( !keepDurableSubsActive ) {
-            for (Iterator iter = pending.iterator(); iter.hasNext();) {
-                MessageReference node = (MessageReference) iter.next();
-                node.decrementReferenceCount();
-                iter.remove();
-            }
+        	synchronized(pending) {
+	            for (Iterator iter = pending.iterator(); iter.hasNext();) {
+	                MessageReference node = (MessageReference) iter.next();
+	                node.decrementReferenceCount();
+	                iter.remove();
+	            }
+        	}
         }
         prefetchExtension=0;
     }
@@ -171,7 +171,7 @@
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
             ", delivered="+this.prefetchExtension+
-            ", pending="+this.pending.size();
+            ", pending="+getPendingQueueSize();
     }
 
     public String getClientId() {
@@ -186,13 +186,15 @@
      * Release any references that we are holding.
      */
     synchronized public void destroy() {
-        
-        for (Iterator iter = pending.iterator(); iter.hasNext();) {
-            MessageReference node = (MessageReference) iter.next();
-            node.decrementReferenceCount();
-        }
-        pending.clear();
-        
+    	
+    	synchronized(pending) {
+	        for (Iterator iter = pending.iterator(); iter.hasNext();) {
+	            MessageReference node = (MessageReference) iter.next();
+	            node.decrementReferenceCount();
+	        }
+	        pending.clear();
+    	}
+    	
         for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
             MessageReference node = (MessageReference) iter.next();
             node.decrementReferenceCount();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 28 12:13:54 2006
@@ -94,7 +94,6 @@
 
     synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
         // Handle the standard acknowledgment case.
-        boolean wasFull=isFull();
         if(ack.isStandardAck()){
             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
             int index=0;
@@ -129,9 +128,7 @@
                             prefetchExtension=Math.max(prefetchExtension,index+1);
                         else
                             prefetchExtension=Math.max(0,prefetchExtension-(index+1));
-                        if(wasFull&&!isFull()){
-                            dispatchMatched();
-                        }
+                        dispatchMatched();
                         return;
                     }else{
                         // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
@@ -147,9 +144,7 @@
                 final MessageReference node=(MessageReference) iter.next();
                 if(ack.getLastMessageId().equals(node.getMessageId())){
                     prefetchExtension=Math.max(prefetchExtension,index+1);
-                    if(wasFull&&!isFull()){
-                        dispatchMatched();
-                    }
+                    dispatchMatched();
                     return;
                 }
             }
@@ -176,9 +171,7 @@
                     acknowledge(context,ack,node);
                     if(ack.getLastMessageId().equals(messageId)){
                         prefetchExtension=Math.max(0,prefetchExtension-(index+1));
-                        if(wasFull&&!isFull()){
-                            dispatchMatched();
-                        }
+                        dispatchMatched();
                         return;
                     }
                 }
@@ -226,8 +219,10 @@
         return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
     }
     
-    synchronized public int getPendingQueueSize(){
-        return pending.size();
+    public int getPendingQueueSize(){
+    	synchronized(pending) {
+    		return pending.size();
+    	}
     }
     
     synchronized public int getDispatchedQueueSize(){
@@ -312,16 +307,13 @@
     }
 
     synchronized protected void onDispatch(final MessageReference node,final Message message){
-        boolean wasFull=isFull();
         if(node.getRegionDestination()!=null){
             node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
             context.getConnection().getStatistics().onMessageDequeue(message);
-            if(wasFull&&!isFull()){
-                try{
-                    dispatchMatched();
-                }catch(IOException e){
-                    context.getConnection().serviceException(e);
-                }
+            try{
+                dispatchMatched();
+            }catch(IOException e){
+                context.getConnection().serviceException(e);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Fri Apr 28 12:13:54 2006
@@ -47,7 +47,7 @@
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
             ", delivered="+this.prefetchExtension+
-            ", pending="+this.pending.size();
+            ", pending="+getPendingQueueSize();
     }
 
     public void browseDone() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Apr 28 12:13:54 2006
@@ -125,7 +125,7 @@
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
             ", delivered="+this.prefetchExtension+
-            ", pending="+this.pending.size();
+            ", pending="+getPendingQueueSize();
     }
 
     public int getLockPriority() {