You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/25 12:28:19 UTC

svn commit: r957881 [1/2] - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/jmx/ broker/region/ broker/region/cursors/ broker/region/policy/ broker/util/ command/ memory/list/ plugin/

Author: rajdavies
Date: Fri Jun 25 10:28:17 2010
New Revision: 957881

URL: http://svn.apache.org/viewvc?rev=957881&view=rev
Log:
changes for https://issues.apache.org/activemq/browse/AMQ-2791

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/LoggingBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/DiscardingDLQBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java Fri Jun 25 10:28:17 2010
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.activemq.command.MessageDispatch;
 
 public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
-    private static Integer MAX_PRIORITY = 10;
+    private static final Integer MAX_PRIORITY = 10;
     private final Object mutex = new Object();
     private final LinkedList<MessageDispatch>[] lists;
     private boolean closed;
@@ -234,7 +234,7 @@ public class SimplePriorityMessageDispat
     }
 
     protected int getPriority(MessageDispatch message) {
-        int priority = Message.DEFAULT_PRIORITY;
+        int priority = javax.jms.Message.DEFAULT_PRIORITY;
         if (message.getMessage() != null) {
         Math.max(message.getMessage().getPriority(), 0);
         priority = Math.min(priority, 9);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Fri Jun 25 10:28:17 2010
@@ -16,19 +16,6 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,6 +35,19 @@ import javax.management.openmbean.OpenDa
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class DestinationView implements DestinationViewMBean {
     private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class);
@@ -126,6 +126,10 @@ public class DestinationView implements 
     public long getMinEnqueueTime() {
         return destination.getDestinationStatistics().getProcessTime().getMinTime();
     }
+    
+    public boolean isPrioritizedMessages() {
+        return destination.isPrioritizedMessages();
+    }
 
     public CompositeData[] browse() throws OpenDataException {
         try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Fri Jun 25 10:28:17 2010
@@ -16,16 +16,15 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.io.IOException;
-
 import javax.jms.InvalidSelectorException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
-import javax.management.ObjectName;
-import javax.management.MalformedObjectNameException;
 
 public interface DestinationViewMBean {
 
@@ -314,6 +313,12 @@ public interface DestinationViewMBean {
     public boolean isUseCache();
     
     /**
+     * @return true if prioritized messages are enabled for the destination
+     */
+    @MBeanInfo("Prioritized messages is enabled")
+    public boolean isPrioritizedMessages();
+    
+    /**
      * @param value
      * enable/disable caching on the destination
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Jun 25 10:28:17 2010
@@ -81,6 +81,7 @@ public abstract class BaseDestination im
     protected int cursorMemoryHighWaterMark = 70;
     protected int storeUsageHighWaterMark = 100;
     private SlowConsumerStrategy slowConsumerStrategy;
+    private boolean prioritizedMessages;
 
     /**
      * @param broker
@@ -580,5 +581,14 @@ public abstract class BaseDestination im
     public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
         this.slowConsumerStrategy = slowConsumerStrategy;
     }
+
+   
+    public boolean isPrioritizedMessages() {
+        return this.prioritizedMessages;
+    }
+
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        this.prioritizedMessages = prioritizedMessages;
+    }
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Jun 25 10:28:17 2010
@@ -18,14 +18,12 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
@@ -215,4 +213,6 @@ public interface Destination extends Ser
      * @throws Exception
      */
     void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception;
+    
+    boolean isPrioritizedMessages();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Jun 25 10:28:17 2010
@@ -19,7 +19,6 @@ package org.apache.activemq.broker.regio
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -39,7 +38,7 @@ import org.apache.activemq.usage.Usage;
  */
 public class DestinationFilter implements Destination {
 
-    private Destination next;
+    private final Destination next;
 
     public DestinationFilter(Destination next) {
         this.next = next;
@@ -270,4 +269,8 @@ public class DestinationFilter implement
     public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
         next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
     }
+
+    public boolean isPrioritizedMessages() {
+        return next.isPrioritizedMessages();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Fri Jun 25 10:28:17 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
@@ -33,7 +31,7 @@ public interface MessageReference {
     
     MessageId getMessageId();
     Message getMessageHardRef();
-    Message getMessage() throws IOException;
+    Message getMessage();
     boolean isPersistent();
     
     Destination getRegionDestination();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java Fri Jun 25 10:28:17 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.Message;
@@ -28,7 +26,7 @@ import org.apache.activemq.command.Messa
  */
 final class NullMessageReference implements QueueMessageReference {
 
-    private ActiveMQMessage message = new ActiveMQMessage();
+    private final ActiveMQMessage message = new ActiveMQMessage();
     private volatile int references;
 
     public void drop() {
@@ -75,7 +73,7 @@ final class NullMessageReference impleme
         return 0;
     }
 
-    public Message getMessage() throws IOException {
+    public Message getMessage()  {
         return message;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jun 25 10:28:17 2010
@@ -78,7 +78,7 @@ public abstract class PrefetchSubscripti
     }
 
     public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
-        this(broker,usageManager,context, info, new VMPendingMessageCursor());
+        this(broker,usageManager,context, info, new VMPendingMessageCursor(false));
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jun 25 10:28:17 2010
@@ -236,7 +236,7 @@ public class Queue extends BaseDestinati
     public void initialize() throws Exception {
         if (this.messages == null) {
             if (destination.isTemporary() || broker == null || store == null) {
-                this.messages = new VMPendingMessageCursor();
+                this.messages = new VMPendingMessageCursor(isPrioritizedMessages());
             } else {
                 this.messages = new StoreQueueCursor(broker, this);
             }
@@ -951,38 +951,30 @@ public class Queue extends BaseDestinati
 
     public Message getMessage(String id) {
         MessageId msgId = new MessageId(id);
-        try {
-            synchronized (pagedInMessages) {
-                QueueMessageReference r = this.pagedInMessages.get(msgId);
-                if (r != null) {
-                    return r.getMessage();
-                }
+        synchronized (pagedInMessages) {
+            QueueMessageReference r = this.pagedInMessages.get(msgId);
+            if (r != null) {
+                return r.getMessage();
             }
-            synchronized (messages) {
-                try {
-                    messages.reset();
-                    while (messages.hasNext()) {
-                        try {
-                            MessageReference r = messages.next();
-                            r.decrementReferenceCount();
-                            messages.rollback(r.getMessageId());
-                            if (msgId.equals(r.getMessageId())) {
-                                Message m = r.getMessage();
-                                if (m != null) {
-                                    return m;
-                                }
-                                break;
-                            }
-                        } catch (IOException e) {
-                            LOG.error("got an exception retrieving message " + id);
+        }
+        synchronized (messages) {
+            try {
+                messages.reset();
+                while (messages.hasNext()) {
+                    MessageReference r = messages.next();
+                    r.decrementReferenceCount();
+                    messages.rollback(r.getMessageId());
+                    if (msgId.equals(r.getMessageId())) {
+                        Message m = r.getMessage();
+                        if (m != null) {
+                            return m;
                         }
+                        break;
                     }
-                } finally {
-                    messages.release();
                 }
+            } finally {
+                messages.release();
             }
-        } catch (IOException e) {
-            LOG.error("got an exception retrieving message " + id);
         }
         return null;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Fri Jun 25 10:28:17 2010
@@ -56,7 +56,7 @@ public class TempQueue extends Queue{
     
     @Override
     public void initialize() throws Exception {
-        this.messages=new VMPendingMessageCursor();
+        this.messages=new VMPendingMessageCursor(false);
         this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.systemUsage = brokerService.getSystemUsage();
         memoryUsage.setParent(systemUsage.getMemoryUsage());           

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jun 25 10:28:17 2010
@@ -72,9 +72,9 @@ public class TopicSubscription extends A
         this.usageManager = usageManager;
         String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
         if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
-            this.matched = new VMPendingMessageCursor();
+            this.matched = new VMPendingMessageCursor(false);
         } else {
-            this.matched = new FilePendingMessageCursor(broker,matchedName);
+            this.matched = new FilePendingMessageCursor(broker,matchedName,false);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Jun 25 10:28:17 2010
@@ -19,11 +19,14 @@ package org.apache.activemq.broker.regio
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 
@@ -44,6 +47,11 @@ public abstract class AbstractPendingMes
     protected boolean useCache=true;
     private boolean started=false;
     protected MessageReference last = null;
+    protected final boolean prioritizedMessages;
+    
+    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
+        this.prioritizedMessages=prioritizedMessages;
+    }
   
 
     public synchronized void start() throws Exception  {
@@ -304,4 +312,19 @@ public abstract class AbstractPendingMes
     protected synchronized boolean isStarted() {
         return started;
     }
+    
+    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
+        boolean result = false;
+        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
+        if (destinations != null) {
+            for (Destination dest:destinations) {
+                if (dest.isPrioritizedMessages()) {
+                    result = true;
+                    break;
+                }
+            }
+        }
+        return result;
+
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Fri Jun 25 10:28:17 2010
@@ -17,8 +17,6 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map.Entry;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
@@ -34,8 +32,8 @@ import org.apache.commons.logging.LogFac
 public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
     private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
     protected final Destination regionDestination;
-    private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
-    private Iterator<Entry<MessageId, Message>> iterator = null;
+    private final PendingList batchList;
+    private Iterator<MessageReference> iterator = null;
     private boolean cacheEnabled=false;
     protected boolean batchResetNeeded = true;
     protected boolean storeHasMessages = false;
@@ -43,10 +41,16 @@ public abstract class AbstractStoreCurso
     private MessageId lastCachedId;
     
     protected AbstractStoreCursor(Destination destination) {
+        super((destination != null ? destination.isPrioritizedMessages():false));
         this.regionDestination=destination;
+        if (this.prioritizedMessages) {
+            this.batchList= new PrioritizedPendingList();
+        }else {
+            this.batchList = new OrderedPendingList();
+        }
     }
     
-    @Override
+    
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
@@ -60,7 +64,7 @@ public abstract class AbstractStoreCurso
         } 
     }
     
-    @Override
+    
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -82,7 +86,7 @@ public abstract class AbstractStoreCurso
                 }
             }
             message.incrementReferenceCount();
-            batchList.put(message.getMessageId(), message);
+            batchList.addMessageLast(message);
             clearIterator(true);
             recovered = true;
         } else {
@@ -99,7 +103,7 @@ public abstract class AbstractStoreCurso
         return recovered;
     }
     
-    @Override
+    
     public final void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -113,7 +117,7 @@ public abstract class AbstractStoreCurso
         size();
     }
     
-    @Override
+    
     public synchronized void release() {
         clearIterator(false);
     }
@@ -129,7 +133,7 @@ public abstract class AbstractStoreCurso
     
     private synchronized void ensureIterator() {
         if(this.iterator==null) {
-            this.iterator=this.batchList.entrySet().iterator();
+            this.iterator=this.batchList.iterator();
         }
     }
 
@@ -137,7 +141,7 @@ public abstract class AbstractStoreCurso
     public final void finished() {
     }
         
-    @Override
+    
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -151,11 +155,11 @@ public abstract class AbstractStoreCurso
         return this.iterator.hasNext();
     }
     
-    @Override
+    
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
-            result = this.iterator.next().getValue();
+            result = this.iterator.next();
         }
         last = result;
         if (result != null) {
@@ -164,7 +168,7 @@ public abstract class AbstractStoreCurso
         return result;
     }
     
-    @Override
+    
     public final synchronized void addMessageLast(MessageReference node) throws Exception {
         if (cacheEnabled && hasSpace()) {
             recoverMessage(node.getMessage(),true);
@@ -189,13 +193,13 @@ public abstract class AbstractStoreCurso
     protected void setBatch(MessageId messageId) throws Exception {
     }
 
-    @Override
+    
     public final synchronized void addMessageFirst(MessageReference node) throws Exception {
         cacheEnabled=false;
         size++;
     }
 
-    @Override
+    
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -212,21 +216,22 @@ public abstract class AbstractStoreCurso
         }
     }
 
-    @Override
+    
     public final synchronized void remove(MessageReference node) {
         size--;
         cacheEnabled=false;
-        batchList.remove(node.getMessageId());
+        batchList.remove(node);
     }
     
-    @Override
+    
     public final synchronized void clear() {
         gc();
     }
     
-    @Override
+    
     public final synchronized void gc() {
-        for (Message msg : batchList.values()) {
+        for (Iterator<MessageReference>i = batchList.iterator();i.hasNext();) {
+            MessageReference msg = i.next();
             rollback(msg.getMessageId());
             msg.decrementReferenceCount();
         }
@@ -241,7 +246,7 @@ public abstract class AbstractStoreCurso
         }
     }
     
-    @Override
+    
     protected final synchronized void fillBatch() {
         if (batchResetNeeded) {
             resetBatch();
@@ -261,18 +266,18 @@ public abstract class AbstractStoreCurso
         }
     }
     
-    @Override
+    
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send since last reset
         return size == 0;
     }
 
-    @Override
+    
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
 
-    @Override
+    
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Jun 25 10:28:17 2010
@@ -63,9 +63,11 @@ public class FilePendingMessageCursor ex
     /**
      * @param broker
      * @param name
+     * @param prioritizedMessages 
      * @param store
      */
-    public FilePendingMessageCursor(Broker broker, String name) {
+    public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) {
+        super(prioritizedMessages);
         this.useCache = false;
         this.broker = broker;
         // the store can be null if the BrokerService has persistence
@@ -190,6 +192,7 @@ public class FilePendingMessageCursor ex
         tryAddMessageLast(node, 0);
     }
     
+    @Override
     public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
         if (!node.isExpired()) {
             try {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+
+public class OrderedPendingList implements PendingList {
+    PendingNode root = null;
+    PendingNode tail = null;
+    final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+
+    public PendingNode addMessageFirst(MessageReference message) {
+        PendingNode node = new PendingNode(this, message);
+        if (root == null) {
+            root = node;
+            tail = node;
+        } else {
+            root.linkBefore(node);
+        }
+        this.map.put(message.getMessageId(), node);
+        return node;
+    }
+
+    public PendingNode addMessageLast(MessageReference message) {
+        PendingNode node = new PendingNode(this, message);
+        if (root == null) {
+            root = node;
+        } else {
+            tail.linkAfter(node);
+        }
+        tail = node;
+        this.map.put(message.getMessageId(), node);
+        return node;
+    }
+
+    public void clear() {
+        this.root = null;
+        this.tail = null;
+        this.map.clear();
+    }
+
+    public boolean isEmpty() {
+        return this.map.isEmpty();
+    }
+
+    public Iterator<MessageReference> iterator() {
+        return new Iterator<MessageReference>() {
+            private PendingNode current = null;
+            private PendingNode next = root;
+
+            public boolean hasNext() {
+                return next != null;
+            }
+
+            public MessageReference next() {
+                MessageReference result = null;
+                this.current = this.next;
+                result = this.current.getMessage();
+                this.next = (PendingNode) this.next.getNext();
+                return result;
+            }
+
+            public void remove() {
+                if (this.current != null && this.current.getMessage() != null) {
+                    map.remove(this.current.getMessage().getMessageId());
+                }
+                removeNode(this.current);
+            }
+        };
+    }
+
+    public void remove(MessageReference message) {
+        if (message != null) {
+            PendingNode node = this.map.remove(message.getMessageId());
+            removeNode(node);
+        }
+    }
+
+    public int size() {
+        return this.map.size();
+    }
+
+    void removeNode(PendingNode node) {
+        if (node != null) {
+            map.remove(node.getMessage().getMessageId());
+            if (root == node) {
+                root = (PendingNode) node.getNext();
+            }
+            if (tail == node) {
+                tail = (PendingNode) node.getPrevious();
+            }
+            node.unlink();
+        }
+    }
+
+    List<PendingNode> getAsList() {
+        List<PendingNode> result = new ArrayList<PendingNode>(size());
+        PendingNode node = root;
+        while (node != null) {
+            result.add(node);
+            node = (PendingNode) node.getNext();
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "OrderedPendingList(" + System.identityHashCode(this) + ")";
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Iterator;
+import org.apache.activemq.broker.region.MessageReference;
+
+public interface PendingList {
+    
+    public boolean isEmpty();
+    public void clear();
+    public PendingNode addMessageFirst(MessageReference message);
+    public PendingNode addMessageLast(MessageReference message);
+    public void remove(MessageReference message);
+    public int size();
+    public Iterator<MessageReference> iterator();
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.util.LinkedNode;
+
+public class PendingNode extends LinkedNode {
+    private final MessageReference message;
+    private final OrderedPendingList list;
+    public PendingNode(OrderedPendingList list,MessageReference message) {
+        this.list = list;
+        this.message = message;
+    }
+
+    MessageReference getMessage() {
+        return this.message;
+    }
+    
+    OrderedPendingList getList() {
+        return this.list;
+    }
+    
+    @Override
+    public String toString() {
+        PendingNode n = (PendingNode) getNext();
+        String str = "PendingNode(";
+        str += System.identityHashCode(this) + "),root="+isHeadNode()+",next="+(n != null ?System.identityHashCode(n):"NULL");
+        return str;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingNode.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=957881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java Fri Jun 25 10:28:17 2010
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+
+public class PrioritizedPendingList implements PendingList {
+    static final Integer MAX_PRIORITY = 10;
+    private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY];
+    final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>();
+
+    public PrioritizedPendingList() {
+        for (int i = 0; i < MAX_PRIORITY; i++) {
+            this.lists[i] = new OrderedPendingList();
+        }
+    }
+    public PendingNode addMessageFirst(MessageReference message) {
+        PendingNode node = getList(message).addMessageFirst(message);
+        this.map.put(message.getMessageId(), node);
+        return node;
+    }
+
+    public PendingNode addMessageLast(MessageReference message) {
+        PendingNode node = getList(message).addMessageLast(message);
+        this.map.put(message.getMessageId(), node);
+        return node;
+    }
+
+    public void clear() {
+        for (int i = 0; i < MAX_PRIORITY; i++) {
+            this.lists[i].clear();
+        }
+        this.map.clear();
+    }
+
+    public boolean isEmpty() {
+        return this.map.isEmpty();
+    }
+
+    public Iterator<MessageReference> iterator() {
+        return new PrioritizedPendingListIterator();
+    }
+
+    public void remove(MessageReference message) {
+        if (message != null) {
+            PendingNode node = this.map.remove(message.getMessageId());
+            if (node != null) {
+                node.getList().removeNode(node);
+            }
+        }
+    }
+
+    public int size() {
+        return this.map.size();
+    }
+
+    @Override
+    public String toString() {
+        return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
+    }
+
+    protected int getPriority(MessageReference message) {
+        int priority = javax.jms.Message.DEFAULT_PRIORITY;
+        if (message.getMessageId() != null) {
+            Math.max(message.getMessage().getPriority(), 0);
+            priority = Math.min(priority, 9);
+        }
+        return priority;
+    }
+
+    protected OrderedPendingList getList(MessageReference msg) {
+        return lists[getPriority(msg)];
+    }
+
+    private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
+        private int index = 0;
+        private int currentIndex = 0;
+        List<PendingNode> list = new ArrayList<PendingNode>(size());
+
+        PrioritizedPendingListIterator() {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                OrderedPendingList orderedPendingList = lists[i];
+                if (!orderedPendingList.isEmpty()) {
+                    list.addAll(orderedPendingList.getAsList());
+                }
+            }
+        }
+        public boolean hasNext() {
+            return list.size() > index;
+        }
+
+        public MessageReference next() {
+            PendingNode node = list.get(this.index);
+            this.currentIndex = this.index;
+            this.index++;
+            return node.getMessage();
+        }
+
+        public void remove() {
+            PendingNode node = list.get(this.currentIndex);
+            if (node != null) {
+                map.remove(node.getMessage().getMessageId());
+                node.getList().removeNode(node);
+            }
+
+        }
+
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Jun 25 10:28:17 2010
@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -58,13 +57,14 @@ public class StoreDurableSubscriberCurso
      * @param subscription  subscription for this cursor
      */
     public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
+        super(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker,subscription));
         this.subscription=subscription;
         this.clientId = clientId;
         this.subscriberName = subscriberName;
         if (broker.getBrokerService().isPersistent()) {
-            this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
+            this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName,this.prioritizedMessages);
         }else {
-            this.nonPersistent = new VMPendingMessageCursor();
+            this.nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
         }
         
         this.nonPersistent.setMaxBatchSize(maxBatchSize);
@@ -72,6 +72,7 @@ public class StoreDurableSubscriberCurso
         this.storePrefetches.add(this.nonPersistent);
     }
 
+    @Override
     public synchronized void start() throws Exception {
         if (!isStarted()) {
             super.start();
@@ -82,6 +83,7 @@ public class StoreDurableSubscriberCurso
         }
     }
 
+    @Override
     public synchronized void stop() throws Exception {
         if (isStarted()) {
             super.stop();
@@ -98,6 +100,7 @@ public class StoreDurableSubscriberCurso
      * @param destination
      * @throws Exception
      */
+    @Override
     public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
             TopicStorePrefetch tsp = new TopicStorePrefetch(this.subscription,(Topic)destination, clientId, subscriberName);
@@ -122,6 +125,7 @@ public class StoreDurableSubscriberCurso
      * @param destination
      * @throws Exception
      */
+    @Override
     public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
         PendingMessageCursor tsp = topics.remove(destination);
         if (tsp != null) {
@@ -133,6 +137,7 @@ public class StoreDurableSubscriberCurso
     /**
      * @return true if there are no pending messages
      */
+    @Override
     public synchronized boolean isEmpty() {
         for (PendingMessageCursor tsp : storePrefetches) {
             if( !tsp.isEmpty() )
@@ -141,6 +146,7 @@ public class StoreDurableSubscriberCurso
         return true;
     }
 
+    @Override
     public synchronized boolean isEmpty(Destination destination) {
         boolean result = true;
         TopicStorePrefetch tsp = topics.get(destination);
@@ -157,10 +163,12 @@ public class StoreDurableSubscriberCurso
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
      * @return true if recovery required
      */
+    @Override
     public boolean isRecoveryRequired() {
         return false;
     }
 
+    @Override
     public synchronized void addMessageLast(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
@@ -179,16 +187,19 @@ public class StoreDurableSubscriberCurso
         }
     }
 
+    @Override
     public synchronized void addRecoveredMessage(MessageReference node) throws Exception {
         nonPersistent.addMessageLast(node);
     }
 
+    @Override
     public synchronized void clear() {
         for (PendingMessageCursor tsp : storePrefetches) {
             tsp.clear();
         }
     }
 
+    @Override
     public synchronized boolean hasNext() {
         boolean result = true;
         if (result) {
@@ -203,35 +214,41 @@ public class StoreDurableSubscriberCurso
         return result;
     }
 
+    @Override
     public synchronized MessageReference next() {
         MessageReference result = currentCursor != null ? currentCursor.next() : null;
         return result;
     }
 
+    @Override
     public synchronized void remove() {
         if (currentCursor != null) {
             currentCursor.remove();
         }
     }
 
+    @Override
     public synchronized void remove(MessageReference node) {
         if (currentCursor != null) {
             currentCursor.remove(node);
         }
     }
 
+    @Override
     public synchronized void reset() {
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.reset();
         }
     }
 
+    @Override
     public synchronized void release() {
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.release();
         }
     }
 
+    @Override
     public synchronized int size() {
         int pendingCount=0;
         for (PendingMessageCursor tsp : storePrefetches) {
@@ -240,6 +257,7 @@ public class StoreDurableSubscriberCurso
         return pendingCount;
     }
 
+    @Override
     public void setMaxBatchSize(int maxBatchSize) {
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.setMaxBatchSize(maxBatchSize);
@@ -247,12 +265,14 @@ public class StoreDurableSubscriberCurso
         super.setMaxBatchSize(maxBatchSize);
     }
 
+    @Override
     public synchronized void gc() {
         for (PendingMessageCursor tsp : storePrefetches) {
             tsp.gc();
         }
     }
 
+    @Override
     public void setSystemUsage(SystemUsage usageManager) {
         super.setSystemUsage(usageManager);
         for (PendingMessageCursor tsp : storePrefetches) {
@@ -260,6 +280,7 @@ public class StoreDurableSubscriberCurso
         }
     }
     
+    @Override
     public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
         super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
         for (PendingMessageCursor cursor : storePrefetches) {
@@ -267,6 +288,7 @@ public class StoreDurableSubscriberCurso
         }
     }
     
+    @Override
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         super.setMaxProducersToAudit(maxProducersToAudit);
         for (PendingMessageCursor cursor : storePrefetches) {
@@ -274,6 +296,7 @@ public class StoreDurableSubscriberCurso
         }
     }
 
+    @Override
     public void setMaxAuditDepth(int maxAuditDepth) {
         super.setMaxAuditDepth(maxAuditDepth);
         for (PendingMessageCursor cursor : storePrefetches) {
@@ -281,6 +304,7 @@ public class StoreDurableSubscriberCurso
         }
     }
     
+    @Override
     public void setEnableAudit(boolean enableAudit) {
         super.setEnableAudit(enableAudit);
         for (PendingMessageCursor cursor : storePrefetches) {
@@ -288,6 +312,7 @@ public class StoreDurableSubscriberCurso
         }
     }
     
+    @Override
     public  void setUseCache(boolean useCache) {
         super.setUseCache(useCache);
         for (PendingMessageCursor cursor : storePrefetches) {
@@ -313,6 +338,7 @@ public class StoreDurableSubscriberCurso
         return currentCursor;
     }
     
+    @Override
     public String toString() {
         return "StoreDurableSubscriber(" + clientId + ":" + subscriberName + ")";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Jun 25 10:28:17 2010
@@ -32,21 +32,21 @@ import org.apache.commons.logging.LogFac
 public class StoreQueueCursor extends AbstractPendingMessageCursor {
 
     private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
-    private Broker broker;
+    private final Broker broker;
     private int pendingCount;
-    private Queue queue;
+    private final Queue queue;
     private PendingMessageCursor nonPersistent;
-    private QueueStorePrefetch persistent;
+    private final QueueStorePrefetch persistent;
     private boolean started;
     private PendingMessageCursor currentCursor;
 
     /**
      * Construct
-     * 
+     * @param broker 
      * @param queue
-     * @param tmpStore
      */
     public StoreQueueCursor(Broker broker,Queue queue) {
+        super((queue != null ? queue.isPrioritizedMessages():false));
         this.broker=broker;
         this.queue = queue;
         this.persistent = new QueueStorePrefetch(queue);
@@ -58,9 +58,9 @@ public class StoreQueueCursor extends Ab
         super.start();
         if (nonPersistent == null) {
             if (broker.getBrokerService().isPersistent()) {
-                nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
+                nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
             }else {
-                nonPersistent = new VMPendingMessageCursor();
+                nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
             }
             nonPersistent.setMaxBatchSize(getMaxBatchSize());
             nonPersistent.setSystemUsage(systemUsage);
@@ -101,7 +101,7 @@ public class StoreQueueCursor extends Ab
             }
         }
     }
-
+    
     public synchronized void addMessageFirst(MessageReference node) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
@@ -240,6 +240,7 @@ public class StoreQueueCursor extends Ab
         }
     }
     
+    @Override
     public void setUseCache(boolean useCache) {
         super.setUseCache(useCache);
         if (persistent != null) {
@@ -250,6 +251,7 @@ public class StoreQueueCursor extends Ab
         }
     }
     
+    @Override
     public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
         super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
         if (persistent != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Fri Jun 25 10:28:17 2010
@@ -32,13 +32,20 @@ import org.apache.activemq.broker.region
  * @version $Revision$
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
-    private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
+    private final PendingList list;
     private Iterator<MessageReference> iter;
-    public VMPendingMessageCursor() {
+    
+    public VMPendingMessageCursor(boolean prioritizedMessages) {
+        super(prioritizedMessages);
         this.useCache = false;
+        if (this.prioritizedMessages) {
+            this.list= new PrioritizedPendingList();
+        }else {
+            this.list = new OrderedPendingList();
+        }
     }
 
-    @Override
+    
     public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
             throws Exception {
         List<MessageReference> rc = new ArrayList<MessageReference>();
@@ -56,7 +63,7 @@ public class VMPendingMessageCursor exte
     /**
      * @return true if there are no pending messages
      */
-    @Override
+    
     public synchronized boolean isEmpty() {
         if (list.isEmpty()) {
             return true;
@@ -79,9 +86,9 @@ public class VMPendingMessageCursor exte
     /**
      * reset the cursor
      */
-    @Override
+    
     public synchronized void reset() {
-        iter = list.listIterator();
+        iter = list.iterator();
         last = null;
     }
 
@@ -90,10 +97,10 @@ public class VMPendingMessageCursor exte
      * 
      * @param node
      */
-    @Override
+    
     public synchronized void addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
-        list.addLast(node);
+        list.addMessageLast(node);
     }
 
     /**
@@ -102,16 +109,16 @@ public class VMPendingMessageCursor exte
      * @param position
      * @param node
      */
-    @Override
+    
     public synchronized void addMessageFirst(MessageReference node) {
         node.incrementReferenceCount();
-        list.addFirst(node);
+        list.addMessageFirst(node);
     }
 
     /**
      * @return true if there pending messages to dispatch
      */
-    @Override
+    
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -119,7 +126,7 @@ public class VMPendingMessageCursor exte
     /**
      * @return the next pending message
      */
-    @Override
+    
     public synchronized MessageReference next() {
         last = iter.next();
         if (last != null) {
@@ -131,7 +138,7 @@ public class VMPendingMessageCursor exte
     /**
      * remove the message at the cursor position
      */
-    @Override
+    
     public synchronized void remove() {
         if (last != null) {
             last.decrementReferenceCount();
@@ -142,7 +149,7 @@ public class VMPendingMessageCursor exte
     /**
      * @return the number of pending messages
      */
-    @Override
+    
     public synchronized int size() {
         return list.size();
     }
@@ -150,7 +157,7 @@ public class VMPendingMessageCursor exte
     /**
      * clear all pending messages
      */
-    @Override
+    
     public synchronized void clear() {
         for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
             MessageReference ref = i.next();
@@ -159,16 +166,10 @@ public class VMPendingMessageCursor exte
         list.clear();
     }
 
-    @Override
+    
     public synchronized void remove(MessageReference node) {
-        for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
-            MessageReference ref = i.next();
-            if (node.getMessageId().equals(ref.getMessageId())) {
-                ref.decrementReferenceCount();
-                i.remove();
-                break;
-            }
-        }
+        list.remove(node);
+        node.decrementReferenceCount();
     }
 
     /**
@@ -177,10 +178,11 @@ public class VMPendingMessageCursor exte
      * @param maxItems
      * @return a list of paged in messages
      */
-    @Override
+    
     public LinkedList<MessageReference> pageInList(int maxItems) {
         LinkedList<MessageReference> result = new LinkedList<MessageReference>();
-        for (MessageReference ref: list) {
+        for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
+            MessageReference ref = i.next();
             ref.incrementReferenceCount();
             result.add(ref);
             if (result.size() >= maxItems) {
@@ -190,12 +192,12 @@ public class VMPendingMessageCursor exte
         return result;
     }
 
-    @Override
+    
     public boolean isTransient() {
         return true;
     }
 
-    @Override
+    
     public void destroy() throws Exception {
         super.destroy();
         clear();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 
@@ -42,6 +43,6 @@ public class FilePendingDurableSubscribe
      * @return the Pending Message cursor
      */
     public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
-        return new FilePendingMessageCursor(broker,name);
+        return new FilePendingMessageCursor(broker,name,AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -39,7 +39,7 @@ public class FilePendingQueueMessageStor
      *      org.apache.activemq.kaha.Store)
      */
     public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
-        return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName());
+        return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName(),queue.isPrioritizedMessages());
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -17,6 +17,8 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 
@@ -31,14 +33,16 @@ import org.apache.activemq.broker.region
 public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
 
     /**
-     * @param broker 
+     * @param broker
      * @param name
      * @param maxBatchSize
      * @return a Cursor
      * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
      *      org.apache.activemq.kaha.Store, int)
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
-        return new FilePendingMessageCursor(broker,"PendingCursor:" + name);
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker, String name, int maxBatchSize,
+            Subscription subs) {
+        return new FilePendingMessageCursor(broker, "PendingCursor:" + name, AbstractPendingMessageCursor
+                .isPrioritizedMessageSubscriber(broker, subs));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 
 /**
@@ -35,5 +36,5 @@ public interface PendingSubscriberMessag
      * @param maxBatchSize
      * @return the Pending Message cursor
      */
-    PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize);
+    PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jun 25 10:28:17 2010
@@ -87,6 +87,7 @@ public class PolicyEntry extends Destina
     private int cursorMemoryHighWaterMark = 70;
     private int storeUsageHighWaterMark = 100;
     private SlowConsumerStrategy slowConsumerStrategy;
+    private boolean prioritizedMessages;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -155,6 +156,7 @@ public class PolicyEntry extends Destina
             scs.setScheduler(broker.getScheduler());
         }
         destination.setSlowConsumerStrategy(scs);
+        destination.setPrioritizedMessages(isPrioritizedMessages());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -184,7 +186,7 @@ public class PolicyEntry extends Destina
         if (pendingSubscriberPolicy != null) {
             String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
             int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
-            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
+            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
         }
         if (enableAudit) {
             subscription.setEnableAudit(enableAudit);
@@ -739,5 +741,14 @@ public class PolicyEntry extends Destina
     public SlowConsumerStrategy getSlowConsumerStrategy() {
         return this.slowConsumerStrategy;
     }
+    
+    
+    public boolean isPrioritizedMessages() {
+        return this.prioritizedMessages;
+    }
+
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        this.prioritizedMessages = prioritizedMessages;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 
@@ -40,6 +41,6 @@ public class VMPendingDurableSubscriberM
      * @return the Pending Message cursor
      */
     public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
-        return new VMPendingMessageCursor();
+        return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, sub));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -37,6 +37,6 @@ public class VMPendingQueueMessageStorag
      * @return the cursor
      */
     public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
-        return new VMPendingMessageCursor();
+        return new VMPendingMessageCursor(queue.isPrioritizedMessages());
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?rev=957881&r1=957880&r2=957881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Fri Jun 25 10:28:17 2010
@@ -17,6 +17,8 @@
 package org.apache.activemq.broker.region.policy;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 
@@ -38,7 +40,7 @@ public class VMPendingSubscriberMessageS
      * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
      *      org.apache.activemq.kaha.Store, int)
      */
-    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
-        return new VMPendingMessageCursor();
+    public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize,Subscription subs) {
+        return new VMPendingMessageCursor(AbstractPendingMessageCursor.isPrioritizedMessageSubscriber(broker, subs));
     }
 }