You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/13 13:17:46 UTC

svn commit: r463646 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/po...

Author: rajdavies
Date: Fri Oct 13 04:17:41 2006
New Revision: 463646

URL: http://svn.apache.org/viewvc?view=rev&rev=463646
Log:
Fixed failing test cases: - a few problems had been there for a while

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Fri Oct 13 04:17:41 2006
@@ -21,6 +21,7 @@
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -250,10 +251,19 @@
 
     /**
      * Sets the default administration connection context used when configuring the broker on startup or via JMX
+     * @param adminConnectionContext 
      */
     public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
     
-    
+    /**
+     * @return the pendingDurableSubscriberPolicy
+     */
+    public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy();
+  
+    /**
+     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
+     */
+    public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy);
     /**
      * @return the broker's temp data store
      * @throws Exception

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Fri Oct 13 04:17:41 2006
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -232,6 +233,15 @@
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         next.setAdminConnectionContext(adminConnectionContext);
     }
+    
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+        return next.getPendingDurableSubscriberPolicy();
+    }
+  
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
+        next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
+    }
+   
 
     public Store getTempDataStore() {
         return next.getTempDataStore();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Oct 13 04:17:41 2006
@@ -45,7 +45,9 @@
 import org.apache.activemq.broker.region.DestinationFactoryImpl;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
@@ -137,6 +139,7 @@
     private ActiveMQDestination[] destinations;
     private Store tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
+    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
    
 
     /**
@@ -388,7 +391,13 @@
             }
 
             getBroker().start();
-            
+            /*
+            if(isUseJmx()){
+                // yes - this is orer dependent!
+                // register all destination in persistence store including inactive destinations as mbeans
+                this.startDestinationsInPersistenceStore(broker);
+            }
+            */
             startAllConnectors();
             
             if (isUseJmx() && masterConnector != null) {
@@ -987,6 +996,23 @@
     public void setPersistenceThreadPriority(int persistenceThreadPriority){
         this.persistenceThreadPriority=persistenceThreadPriority;
     }
+    
+    /**
+     * @return the pendingDurableSubscriberPolicy
+     */
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
+        return this.pendingDurableSubscriberPolicy;
+    }
+  
+    /**
+     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
+     */
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
+        this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
+        if (broker != null) {
+            broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
+        }
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -1199,8 +1225,6 @@
                 mbeanServer.registerMBean(adminView, objectName);
                 registeredMBeanNames.add(objectName);
             }
-            //register all destination in persistence store including inactive destinations as mbeans 
-            this.startDestinationsInPersistenceStore(broker);
         }
         
 
@@ -1243,6 +1267,7 @@
         
         regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
 		regionBroker.setBrokerName(getBrokerName());
+        regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
 		return regionBroker;
 	}
 
@@ -1515,8 +1540,5 @@
                 broker.addDestination(adminConnectionContext, destination);
             }
         }
-    }
-    
-
-    
+    }    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Fri Oct 13 04:17:41 2006
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -229,6 +230,13 @@
     
     public Response messagePull(ConnectionContext context, MessagePull pull) {
         return null;
+    }
+    
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+        return null;
+    }
+  
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
     }
     
     public Store getTempDataStore() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Fri Oct 13 04:17:41 2006
@@ -23,6 +23,7 @@
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -228,6 +229,14 @@
     }
 
     public Response messagePull(ConnectionContext context, MessagePull pull) {
+        throw new BrokerStoppedException(this.message);
+    }
+    
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+        throw new BrokerStoppedException(this.message);
+    }
+  
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
         throw new BrokerStoppedException(this.message);
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Fri Oct 13 04:17:41 2006
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -243,6 +244,14 @@
 
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
         return getNext().messagePull(context, pull);
+    }
+    
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+        return getNext().getPendingDurableSubscriberPolicy();
+    }
+  
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
+        getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
     }
     
     public Store getTempDataStore() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Oct 13 04:17:41 2006
@@ -177,7 +177,6 @@
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         log.debug("Adding consumer: "+info.getConsumerId());
-
         ActiveMQDestination destination = info.getDestination();
         if (destination != null && ! destination.isPattern() && ! destination.isComposite()) {
             // lets auto-create the destination
@@ -260,7 +259,6 @@
     }
 
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-
         log.debug("Removing consumer: "+info.getConsumerId());
 
         Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId());

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Oct 13 04:17:41 2006
@@ -78,7 +78,7 @@
             if (destination.isTemporary()) {
                 final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
                 return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
-                    
+                   
                     public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
                         // Only consumers on the same connection can consume from 
                         // the temporary destination
@@ -92,6 +92,7 @@
                 MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
                 Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
                 configureQueue(queue, destination);
+                queue.initialize();
                 return queue;
             }
         } else if (destination.isTemporary()){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Oct 13 04:17:41 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -40,10 +41,8 @@
     private final boolean keepDurableSubsActive;
     private boolean active=false;
     
-    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
-       //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
-       //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
-       super(broker,context,info);
+    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException {
+        super(broker,context,info,cursor);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
     }
@@ -192,7 +191,6 @@
      * Release any references that we are holding.
      */
     synchronized public void destroy() {
-    	
     	synchronized(pending) {
             pending.reset();
 	        while(pending.hasNext()) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Oct 13 04:17:41 2006
@@ -124,8 +124,8 @@
         
     synchronized public void add(MessageReference node) throws Exception{
         enqueueCounter++;
-        //if(!isFull()){
-        if(!isFull() && pending.isEmpty() && canDispatch(node)){
+      
+        if(!isFull() && pending.isEmpty() ){
             dispatch(node);
         }else{
             optimizePrefetch();
@@ -376,7 +376,6 @@
         if(canDispatch(node)&&!isSlaveBroker()){
         	
             MessageDispatch md=createMessageDispatch(node,message);
-
             // NULL messages don't count... they don't get Acked.
             if( node != QueueMessageReference.NULL_MESSAGE ) {
             dispatchCounter++;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Oct 13 04:17:41 2006
@@ -20,12 +20,15 @@
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.broker.region.group.MessageGroupSet;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -67,10 +70,10 @@
 
     protected final ActiveMQDestination destination;
     protected final List consumers = new CopyOnWriteArrayList();
-    private final LinkedList messages = new LinkedList();
     protected final Valve dispatchValve = new Valve(true);
     protected final UsageManager usageManager;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+    protected  PendingMessageCursor messages = new VMPendingMessageCursor();
 
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
@@ -100,6 +103,10 @@
         destinationStatistics.setParent(parentStats);
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
 
+        
+    }
+    
+    public void initialize() throws Exception {
         if (store != null) {
             // Restore the persistent messages.
             store.recover(new MessageRecoveryListener() {
@@ -107,7 +114,11 @@
                     message.setRegionDestination(Queue.this);
                     MessageReference reference = createMessageReference(message);
                     synchronized (messages) {
-                        messages.add(reference);
+                        try{
+                            messages.addMessageLast(reference);
+                        }catch(Exception e){
+                           log.fatal("Failed to add message to cursor",e);
+                        }
                     }
                     reference.decrementReferenceCount();
                     destinationStatistics.getMessages().increment();
@@ -158,9 +169,10 @@
             synchronized (messages) {
                 // Add all the matching messages in the queue to the
                 // subscription.
-                for (Iterator iter = messages.iterator(); iter.hasNext();) {
+                messages.reset();
+                while(messages.hasNext()) {
 
-                    QueueMessageReference node = (QueueMessageReference) iter.next();
+                    QueueMessageReference node = (QueueMessageReference) messages.next();
                     if (node.isDropped()) {
                         continue;
                     }
@@ -219,8 +231,9 @@
                     // lets copy the messages to dispatch to avoid deadlock
                     List messagesToDispatch = new ArrayList();
                     synchronized (messages) {
-                        for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                            QueueMessageReference node = (QueueMessageReference) iter.next();
+                        messages.reset();
+                        while(messages.hasNext()) {
+                            QueueMessageReference node = (QueueMessageReference) messages.next();
                             if (node.isDropped()) {
                                 continue;
                             }
@@ -314,12 +327,13 @@
 
     public void gc() {
         synchronized (messages) {
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+            messages.resetForGC();
+            while(messages.hasNext()) {
                 // Remove dropped messages from the queue.
-                QueueMessageReference node = (QueueMessageReference) iter.next();
+                QueueMessageReference node = (QueueMessageReference) messages.next();
                 if (node.isDropped()) {
                     garbageSize--;
-                    iter.remove();
+                    messages.remove();
                     continue;
                 }
             }
@@ -456,6 +470,12 @@
     public void setMemoryLimit(long limit) {
         getUsageManager().setLimit(limit);
     }
+    public PendingMessageCursor getMessages(){
+        return this.messages;
+    }
+    public void setMessages(PendingMessageCursor messages){
+        this.messages=messages;
+    }
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -470,7 +490,7 @@
         try {
             destinationStatistics.onMessageEnqueue(message);
             synchronized (messages) {
-                messages.add(node);
+                messages.addMessageLast(node);
             }
 
             synchronized (consumers) {
@@ -509,12 +529,12 @@
     }
 
     public Message[] browse() {
-
         ArrayList l = new ArrayList();
         synchronized (messages) {
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+            messages.reset();
+            while(messages.hasNext()) {
                 try {
-                    MessageReference r = (MessageReference) iter.next();
+                    MessageReference r = (MessageReference) messages.next();
                     r.incrementReferenceCount();
                     try {
                         Message m = r.getMessage();
@@ -536,9 +556,10 @@
 
     public Message getMessage(String messageId) {
         synchronized (messages) {
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+            messages.reset();
+            while(messages.hasNext()) {
                 try {
-                    MessageReference r = (MessageReference) iter.next();
+                    MessageReference r = (MessageReference) messages.next();
                     if (messageId.equals(r.getMessageId().toString())) {
                         r.incrementReferenceCount();
                         try {
@@ -563,9 +584,10 @@
     public void purge() {
         synchronized (messages) {
             ConnectionContext c = createConnectionContext();
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
+            messages.reset();
+            while(messages.hasNext()) {
                 try {
-                    QueueMessageReference r = (QueueMessageReference) iter.next();
+                    QueueMessageReference r = (QueueMessageReference) messages.next();
 
                     // We should only delete messages that can be locked.
                     if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@@ -623,8 +645,9 @@
         int counter = 0;
         synchronized (messages) {
             ConnectionContext c = createConnectionContext();
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                IndirectMessageReference r = (IndirectMessageReference) iter.next();
+            messages.reset();
+            while(messages.hasNext()) {
+                IndirectMessageReference r = (IndirectMessageReference) messages.next();
                 if (filter.evaluate(c, r)) {
                     // We should only delete messages that can be locked.
                     if (lockMessage(r)) {
@@ -672,8 +695,9 @@
     public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
         int counter = 0;
         synchronized (messages) {
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                MessageReference r = (MessageReference) iter.next();
+            messages.reset();
+            while(messages.hasNext()) {
+                MessageReference r = (MessageReference) messages.next();
                 if (filter.evaluate(context, r)) {
                     r.incrementReferenceCount();
                     try {
@@ -721,8 +745,9 @@
     public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
         int counter = 0;
         synchronized (messages) {
-            for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                IndirectMessageReference r = (IndirectMessageReference) iter.next();
+            messages.reset();
+            while(messages.hasNext()) {
+                IndirectMessageReference r = (IndirectMessageReference) messages.next();
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
                     if (lockMessage(r)) {
@@ -789,5 +814,4 @@
         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
         return answer;
     }
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Oct 13 04:17:41 2006
@@ -79,7 +79,6 @@
         String groupId = node.getGroupID();
         int sequence = node.getGroupSequence();
         if( groupId!=null ) {
-            
             MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();            
             
             // If we can own the first, then no-one else should own the rest.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Oct 13 04:17:41 2006
@@ -31,7 +31,9 @@
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -93,6 +95,7 @@
     private ConnectionContext adminConnectionContext;
     protected DestinationFactory destinationFactory;
     protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
         
     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
         this.brokerService = brokerService;
@@ -583,5 +586,19 @@
 
     public Store getTempDataStore() {
         return brokerService.getTempDataStore();
+    }
+    
+    /**
+     * @return the pendingDurableSubscriberPolicy
+     */
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
+        return this.pendingDurableSubscriberPolicy;
+    }
+  
+    /**
+     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
+     */
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){
+        this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Oct 13 04:17:41 2006
@@ -26,6 +26,7 @@
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionId;
@@ -61,60 +62,52 @@
         
     }
 
-    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-        if (info.isDurable()) {
-
-            ActiveMQDestination destination = info.getDestination();
-            if( !destination.isPattern() ) {
+    public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
+        if(info.isDurable()){
+            ActiveMQDestination destination=info.getDestination();
+            if(!destination.isPattern()){
                 // Make sure the destination is created.
-                lookup(context, destination);
+                lookup(context,destination);
             }
-
-            String clientId = context.getClientId();
-            String subcriptionName = info.getSubcriptionName();
-            SubscriptionKey key = new SubscriptionKey(clientId, subcriptionName);
-            DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-            if (sub != null) {
-
-                if (sub.isActive()) {
-                    throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subcriptionName);
+            String clientId=context.getClientId();
+            String subcriptionName=info.getSubcriptionName();
+            SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName);
+            DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
+            if(sub!=null){
+                if(sub.isActive()){
+                    throw new JMSException("Durable consumer is in use for client: "+clientId+" and subscriptionName: "
+                            +subcriptionName);
                 }
-
                 // Has the selector changed??
-                if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
-
+                if(hasDurableSubChanged(info,sub.getConsumerInfo())){
                     // Remove the consumer first then add it.
                     durableSubscriptions.remove(key);
-                    for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-                        Topic topic = (Topic) iter.next();
-                        topic.deleteSubscription(context, key);
+                    for(Iterator iter=destinations.values().iterator();iter.hasNext();){
+                        Topic topic=(Topic)iter.next();
+                        topic.deleteSubscription(context,key);
                     }
-                    super.removeConsumer(context, sub.getConsumerInfo());
-
-                    super.addConsumer(context, info);
-                    sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-                }
-                else {
+                    super.removeConsumer(context,sub.getConsumerInfo());
+                    super.addConsumer(context,info);
+                    sub=(DurableTopicSubscription)durableSubscriptions.get(key);
+                }else{
                     // Change the consumer id key of the durable sub.
-                    if( sub.getConsumerInfo().getConsumerId()!=null )
+                    if(sub.getConsumerInfo().getConsumerId()!=null)
                         subscriptions.remove(sub.getConsumerInfo().getConsumerId());
-                    subscriptions.put(info.getConsumerId(), sub);
+                    subscriptions.put(info.getConsumerId(),sub);
                 }
-            }
-            else {
-                super.addConsumer(context, info);
-                sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-                if (sub == null) {
-                    throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: "
-                            + key.getClientId() + " subscriberName: " + key.getSubscriptionName());
+            }else{
+                super.addConsumer(context,info);
+                sub=(DurableTopicSubscription)durableSubscriptions.get(key);
+                if(sub==null){
+                    throw new JMSException("Cannot use the same consumerId: "+info.getConsumerId()
+                            +" for two different durable subscriptions clientID: "+key.getClientId()
+                            +" subscriberName: "+key.getSubscriptionName());
                 }
             }
-            
-            sub.activate(context, info);
+            sub.activate(context,info);
             return sub;
-        }
-        else {
-            return super.addConsumer(context, info);
+        }else{
+            return super.addConsumer(context,info);
         }
     }
 
@@ -222,9 +215,12 @@
             }
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
             DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
-            if (sub == null) {
-                sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive);
-                durableSubscriptions.put(key, sub);
+            if(sub==null){
+                PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
+                        context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),
+                        info.getPrefetchSize());
+                sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
+                durableSubscriptions.put(key,sub);
             }
             else {
                 throw new JMSException("That durable subscription is already active.");

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Oct 13 04:17:41 2006
@@ -1,19 +1,15 @@
 /**
  * 
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
  * 
  * http://www.apache.org/licenses/LICENSE-2.0
  * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.activemq.broker.region.cursors;
@@ -24,13 +20,13 @@
 import org.apache.activemq.broker.region.MessageReference;
 
 /**
- * Abstract method holder for pending message (messages awaiting disptach to a
- * consumer) cursor
+ * Abstract method holder for pending message (messages awaiting disptach to a consumer) cursor
  * 
  * @version $Revision$
  */
-public class AbstractPendingMessageCursor implements PendingMessageCursor {
-    protected int maxBatchSize = 100;
+public class AbstractPendingMessageCursor implements PendingMessageCursor{
+
+    protected int maxBatchSize=100;
 
     public void start() throws Exception{
     }
@@ -38,12 +34,10 @@
     public void stop() throws Exception{
     }
 
-    public void add(ConnectionContext context,Destination destination)
-            throws Exception{
+    public void add(ConnectionContext context,Destination destination) throws Exception{
     }
 
-    public void remove(ConnectionContext context,Destination destination)
-            throws Exception{
+    public void remove(ConnectionContext context,Destination destination) throws Exception{
     }
 
     public boolean isRecoveryRequired(){
@@ -80,7 +74,7 @@
     public int size(){
         return 0;
     }
-    
+
     public int getMaxBatchSize(){
         return maxBatchSize;
     }
@@ -91,6 +85,11 @@
 
     protected void fillBatch() throws Exception{
     }
-    
-    
+
+    /**
+     * Give the cursor a hint that we are about to remove messages from memory only
+     */
+    public void resetForGC(){
+        reset();
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Oct 13 04:17:41 2006
@@ -24,7 +24,7 @@
 import org.apache.activemq.store.kahadaptor.CommandMarshaller;
 /**
  * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
- * 
+ *
  * @version $Revision$
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor{

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Fri Oct 13 04:17:41 2006
@@ -113,4 +113,10 @@
      * @param maxBatchSize
      */
     public void setMaxBatchSize(int maxBatchSize);
+
+    /**
+     * Give the cursor a hint that we are about to remove
+     * messages from memory only
+     */
+    public void resetForGC();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Oct 13 04:17:41 2006
@@ -127,12 +127,7 @@
         return false;
     }
 
-    public synchronized void addMessageFirst(MessageReference node) throws IOException{
-        if(started){
-            throw new RuntimeException("This shouldn't be called!");
-        }
-    }
-
+    
     public synchronized void addMessageLast(MessageReference node) throws Exception{
         if(node!=null){
             Message msg=node.getMessage();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Oct 13 04:17:41 2006
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
@@ -46,6 +47,7 @@
     private MessageEvictionStrategy messageEvictionStrategy;
     private long memoryLimit;
     private MessageGroupMapFactory messageGroupMapFactory;
+    private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
     
     public void configure(Queue queue) {
         if (dispatchPolicy != null) {
@@ -58,6 +60,10 @@
         if( memoryLimit>0 ) {
             queue.getUsageManager().setLimit(memoryLimit);
         }
+        if (pendingQueueMessageStoragePolicy != null) {
+            PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor();
+            queue.setMessages(messages);
+        }
     }
 
     public void configure(Topic topic) {
@@ -74,6 +80,7 @@
         if( memoryLimit>0 ) {
             topic.getUsageManager().setLimit(memoryLimit);
         }
+        
     }
 
     public void configure(TopicSubscription subscription) {
@@ -193,6 +200,22 @@
      */
     public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
         this.messageGroupMapFactory = messageGroupMapFactory;
+    }
+
+    
+    /**
+     * @return the pendingQueueMessageStoragePolicy
+     */
+    public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
+        return this.pendingQueueMessageStoragePolicy;
+    }
+
+    
+    /**
+     * @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set
+     */
+    public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){
+        this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
     }
 
     

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Fri Oct 13 04:17:41 2006
@@ -20,6 +20,7 @@
 
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.BrokerInfo;
@@ -218,6 +219,13 @@
     }
 
     public void stop() throws Exception {
+    }
+    
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+        return null;
+    }
+  
+    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
     }
 
 	public Store getTempDataStore() {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Fri Oct 13 04:17:41 2006
@@ -39,6 +39,7 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,7 +52,7 @@
     
     protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
 
-    protected static final int MESSAGE_COUNT=50;
+    protected static final int MESSAGE_COUNT=100;
     protected static final int PREFETCH_SIZE = 5;
     protected BrokerService broker;
     protected String bindAddress="tcp://localhost:60706";
@@ -138,7 +139,10 @@
         for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
             TextMessage msg=session.createTextMessage("test"+i);
             senderList.add(msg);
+           
             producer.send(msg);
+            
+           
         }   
         
         
@@ -204,11 +208,13 @@
         BrokerService answer=new BrokerService();
         configureBroker(answer);
         answer.setDeleteAllMessagesOnStartup(true);
+        answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
         answer.start();
         return answer;
     }
 
     protected void configureBroker(BrokerService answer) throws Exception{
+        
         answer.addConnector(bindAddress);
         answer.setDeleteAllMessagesOnStartup(true);
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java Fri Oct 13 04:17:41 2006
@@ -35,6 +35,6 @@
         KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
         answer.setPersistenceAdapter(adaptor);
         answer.addConnector(bindAddress);
-        answer.setDeleteAllMessagesOnStartup(true);
+        //answer.setDeleteAllMessagesOnStartup(true);
     }
 }