You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/25 08:19:31 UTC

svn commit: r490111 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Author: rajdavies
Date: Sun Dec 24 23:19:29 2006
New Revision: 490111

URL: http://svn.apache.org/viewvc?view=rev&rev=490111
Log:
it's ensure dispatching happens in order

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490111&r1=490110&r2=490111
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sun Dec 24 23:19:29 2006
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
@@ -60,6 +61,7 @@
     protected long enqueueCounter;
     protected long dispatchCounter;
     protected long dequeueCounter;
+    private AtomicBoolean dispatching = new AtomicBoolean();
     
     public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
                     throws  InvalidSelectorException{
@@ -389,31 +391,37 @@
 
 
     protected void dispatchMatched() throws IOException{
-        List toDispatch=null;
-        synchronized(pending){
+        if(dispatching.compareAndSet(false,true)){
             try{
-                pending.reset();
-                while(pending.hasNext()&&!isFull()){
-                    MessageReference node=pending.next();
-                    pending.remove();
-                    // Message may have been sitting in the pending list a while
-                    // waiting for the consumer to ak the message.
-                    if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
-                        continue; // just drop it.
+                List toDispatch=null;
+                synchronized(pending){
+                    try{
+                        pending.reset();
+                        while(pending.hasNext()&&!isFull()){
+                            MessageReference node=pending.next();
+                            pending.remove();
+                            // Message may have been sitting in the pending list a while
+                            // waiting for the consumer to ak the message.
+                            if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+                                continue; // just drop it.
+                            }
+                            if(toDispatch==null){
+                                toDispatch=new ArrayList();
+                            }
+                            toDispatch.add(node);
+                        }
+                    }finally{
+                        pending.release();
                     }
-                    if(toDispatch==null){
-                        toDispatch=new ArrayList();
+                }
+                if(toDispatch!=null){
+                    for(int i=0;i<toDispatch.size();i++){
+                        MessageReference node=(MessageReference)toDispatch.get(i);
+                        dispatch(node);
                     }
-                    toDispatch.add(node);
                 }
             }finally{
-                pending.release();
-            }
-        }
-        if(toDispatch!=null){
-            for(int i=0;i<toDispatch.size();i++){
-                MessageReference node=(MessageReference)toDispatch.get(i);
-                dispatch(node);
+                dispatching.set(false);
             }
         }
     }