You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/25 08:19:31 UTC
svn commit: r490111 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Author: rajdavies
Date: Sun Dec 24 23:19:29 2006
New Revision: 490111
URL: http://svn.apache.org/viewvc?view=rev&rev=490111
Log:
Ensure dispatching happens in order
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490111&r1=490110&r2=490111
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sun Dec 24 23:19:29 2006
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
@@ -60,6 +61,7 @@
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
+ private AtomicBoolean dispatching = new AtomicBoolean();
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
throws InvalidSelectorException{
@@ -389,31 +391,37 @@
protected void dispatchMatched() throws IOException{
- List toDispatch=null;
- synchronized(pending){
+ if(dispatching.compareAndSet(false,true)){
try{
- pending.reset();
- while(pending.hasNext()&&!isFull()){
- MessageReference node=pending.next();
- pending.remove();
- // Message may have been sitting in the pending list a while
- // waiting for the consumer to ak the message.
- if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
- continue; // just drop it.
+ List toDispatch=null;
+ synchronized(pending){
+ try{
+ pending.reset();
+ while(pending.hasNext()&&!isFull()){
+ MessageReference node=pending.next();
+ pending.remove();
+ // Message may have been sitting in the pending list a while
+ // waiting for the consumer to ak the message.
+ if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+ continue; // just drop it.
+ }
+ if(toDispatch==null){
+ toDispatch=new ArrayList();
+ }
+ toDispatch.add(node);
+ }
+ }finally{
+ pending.release();
}
- if(toDispatch==null){
- toDispatch=new ArrayList();
+ }
+ if(toDispatch!=null){
+ for(int i=0;i<toDispatch.size();i++){
+ MessageReference node=(MessageReference)toDispatch.get(i);
+ dispatch(node);
}
- toDispatch.add(node);
}
}finally{
- pending.release();
- }
- }
- if(toDispatch!=null){
- for(int i=0;i<toDispatch.size();i++){
- MessageReference node=(MessageReference)toDispatch.get(i);
- dispatch(node);
+ dispatching.set(false);
}
}
}