You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/01/28 20:39:04 UTC

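svn commit: r500862 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/region/cursors/ test/resources/org/apache/activemq/broker/policy/ test/resources/org/apache/activemq/perf/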

Author: rajdavies
Date: Sun Jan 28 11:39:02 2007
New Revision: 500862

URL: http://svn.apache.org/viewvc?view=rev&rev=500862
Log:
Updated support for configurable Cursor types from the Destination Policy Map

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Sun Jan 28 11:39:02 2007
@@ -254,20 +254,6 @@
      * @param adminConnectionContext 
      */
     public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
-    
-    /**
-     * @return the pendingDurableSubscriberPolicy
-     */
-    public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy();
-  
-    /**
-     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
-     */
-    public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy);
-    /**
-     * @return the broker's temp data store
-     * @throws Exception
-     */
-    
+        
     public Store getTempDataStore();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Sun Jan 28 11:39:02 2007
@@ -234,15 +234,6 @@
         next.setAdminConnectionContext(adminConnectionContext);
     }
     
-    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
-        return next.getPendingDurableSubscriberPolicy();
-    }
-  
-    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
-        next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
-    }
-   
-
     public Store getTempDataStore() {
         return next.getTempDataStore();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sun Jan 28 11:39:02 2007
@@ -55,7 +55,6 @@
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
 import org.apache.activemq.broker.region.virtual.VirtualTopic;
@@ -149,8 +148,6 @@
     private Store tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
     private boolean useLocalHostBrokerName = false;
-    //private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
-    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new StorePendingDurableSubscriberMessageStoragePolicy();
     
 
    
@@ -1008,24 +1005,7 @@
     public void setPersistenceThreadPriority(int persistenceThreadPriority){
         this.persistenceThreadPriority=persistenceThreadPriority;
     }
-    
-    /**
-     * @return the pendingDurableSubscriberPolicy
-     */
-    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
-        return this.pendingDurableSubscriberPolicy;
-    }
-  
-    /**
-     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
-     */
-    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
-        this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
-        if (broker != null) {
-            broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
-        }
-    }
-    
+        
     /**
      * @return the useLocalHostBrokerName
      */
@@ -1296,7 +1276,6 @@
         
         regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
 		regionBroker.setBrokerName(getBrokerName());
-        regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
 		return regionBroker;
 	}
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Sun Jan 28 11:39:02 2007
@@ -232,12 +232,6 @@
         return null;
     }
     
-    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
-        return null;
-    }
-  
-    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
-    }
     
     public Store getTempDataStore() {
         return null;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Sun Jan 28 11:39:02 2007
@@ -231,15 +231,7 @@
     public Response messagePull(ConnectionContext context, MessagePull pull) {
         throw new BrokerStoppedException(this.message);
     }
-    
-    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
-        throw new BrokerStoppedException(this.message);
-    }
-  
-    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
-        throw new BrokerStoppedException(this.message);
-    }
-    
+        
     public Store getTempDataStore() {
         throw new BrokerStoppedException(this.message);
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Sun Jan 28 11:39:02 2007
@@ -246,14 +246,6 @@
         return getNext().messagePull(context, pull);
     }
     
-    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
-        return getNext().getPendingDurableSubscriberPolicy();
-    }
-  
-    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
-        getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
-    }
-    
     public Store getTempDataStore() {
         return getNext().getTempDataStore();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Sun Jan 28 11:39:02 2007
@@ -24,10 +24,12 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,8 +42,8 @@
     private final boolean keepDurableSubsActive;
     private boolean active=false;
     
-    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException {
-        super(broker,context,info,cursor);
+    public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
+        super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
     }
@@ -70,7 +72,7 @@
         dispatchMatched();
     }
    
-    public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
+    public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo info) throws Exception {
         log.debug("Deactivating " + this);
         if( !active ) {
             this.active = true;
@@ -83,6 +85,7 @@
                 }
             }
             synchronized(pending) {
+                pending.setUsageManager(memoryManager);
                 pending.start();
             }
             //If nothing was in the persistent store, then try to use the recovery policy.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sun Jan 28 11:39:02 2007
@@ -49,7 +49,7 @@
 abstract public class PrefetchSubscription extends AbstractSubscription{
 
     static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
-    final protected PendingMessageCursor pending;
+    protected PendingMessageCursor pending;
     final protected LinkedList dispatched=new LinkedList();
     protected int prefetchExtension=0;
     protected long enqueueCounter;
@@ -342,6 +342,17 @@
     public boolean isRecoveryRequired(){
         return pending.isRecoveryRequired();
     }
+    
+   
+    public PendingMessageCursor getPending(){
+        return this.pending;
+    }
+
+    public void setPending(PendingMessageCursor pending){
+        this.pending=pending;
+    }
+    
+   
 
     /**
      * optimize message consumer prefetch if the consumer supports it
@@ -506,4 +517,7 @@
     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
             throws IOException{
     }
+
+    
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sun Jan 28 11:39:02 2007
@@ -22,10 +22,9 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-
+import java.util.concurrent.CopyOnWriteArrayList;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
@@ -58,8 +57,6 @@
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sun Jan 28 11:39:02 2007
@@ -95,7 +95,7 @@
     private ConnectionContext adminConnectionContext;
     protected DestinationFactory destinationFactory;
     protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
-    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
+    
         
     public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
         this.brokerService = brokerService;
@@ -587,16 +587,5 @@
 
     public Store getTempDataStore() {
         return brokerService.getTempDataStore();
-    }
-    
-    /**
-     * @return the pendingDurableSubscriberPolicy
-     */
-    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
-        return this.pendingDurableSubscriberPolicy;
-    }
-  
-    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){
-        this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Sun Jan 28 11:39:02 2007
@@ -1,51 +1,70 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region;
 
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * 
  * @version $Revision: 1.7 $
  */
-public class TempTopicRegion extends AbstractRegion {
+public class TempTopicRegion extends AbstractRegion{
 
-    public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+    private static final Log log=LogFactory.getLog(TempTopicRegion.class);
+
+    public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager,
+            TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){
+        super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
         setAutoCreateDestinations(false);
     }
 
-    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
-        if( info.isDurable() ) {
+    protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
+        if(info.isDurable()){
             throw new JMSException("A durable subscription cannot be created for a temporary topic.");
-        } else {
-            return new TopicSubscription(broker,context, info, this.memoryManager);
         }
-    }
-        
-    public String toString() {
-        return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
+        try{
+            TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
+            // lets configure the subscription depending on the destination
+            ActiveMQDestination destination=info.getDestination();
+            if(destination!=null&&broker.getDestinationPolicy()!=null){
+                PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+                if(entry!=null){
+                    entry.configure(broker,memoryManager,answer);
+                }
+            }
+            answer.init();
+            return answer;
+        }catch(Exception e){
+            log.error("Failed to create TopicSubscription ",e);
+            JMSException jmsEx=new JMSException("Couldn't create TopicSubscription");
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        }
     }
 
-
+    public String toString(){
+        return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="
+                +memoryManager.getPercentUsage()+"%";
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Sun Jan 28 11:39:02 2007
@@ -104,7 +104,7 @@
                             +" subscriberName: "+key.getSubscriptionName());
                 }
             }
-            sub.activate(context,info);
+            sub.activate(memoryManager,context,info);
             return sub;
         }else{
             return super.addConsumer(context,info);
@@ -208,38 +208,45 @@
         }
     }
 
-    protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
-        if (info.isDurable()) {
-            if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
+    protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
+        if(info.isDurable()){
+            if(AdvisorySupport.isAdvisoryTopic(info.getDestination())){
                 throw new JMSException("Cannot create a durable subscription for an advisory Topic");
             }
-            SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-            DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+            SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
+            DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
             if(sub==null){
-                PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
-                        context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),
-                        info.getPrefetchSize());
-                cursor.setUsageManager(memoryManager);
-                sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
+                sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive);
+                ActiveMQDestination destination=info.getDestination();
+                if(destination!=null&&broker.getDestinationPolicy()!=null){
+                    PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+                    if(entry!=null){
+                        entry.configure(broker,memoryManager,sub);
+                    }
+                }
                 durableSubscriptions.put(key,sub);
-            }
-            else {
+            }else{
                 throw new JMSException("That durable subscription is already active.");
             }
             return sub;
         }
-        else {
-            TopicSubscription answer = new TopicSubscription(broker,context, info, memoryManager);
-            
+        try{
+            TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
             // lets configure the subscription depending on the destination
-            ActiveMQDestination destination = info.getDestination();
-            if (destination != null && broker.getDestinationPolicy() != null) {
-                PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
-                if (entry != null) {
-                    entry.configure(answer);
+            ActiveMQDestination destination=info.getDestination();
+            if(destination!=null&&broker.getDestinationPolicy()!=null){
+                PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+                if(entry!=null){
+                    entry.configure(broker,memoryManager,answer);
                 }
             }
+            answer.init();
             return answer;
+        }catch(Exception e){
+            log.error("Failed to create TopicSubscription ",e);
+            JMSException jmsEx=new JMSException("Couldn't create TopicSubscription");
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sun Jan 28 11:39:02 2007
@@ -22,6 +22,7 @@
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
 import org.apache.activemq.command.ConsumerControl;
@@ -41,7 +42,7 @@
 
     private static final Log log=LogFactory.getLog(TopicSubscription.class);
     private static final AtomicLong cursorNameCounter=new AtomicLong(0);
-    final protected FilePendingMessageCursor matched;
+    protected PendingMessageCursor matched;
     final protected UsageManager usageManager;
     protected AtomicLong dispatched=new AtomicLong();
     protected AtomicLong delivered=new AtomicLong();
@@ -56,17 +57,21 @@
     private int memoryUsageHighWaterMark=95;
 
     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
-            throws InvalidSelectorException{
+            throws Exception{
         super(broker,context,info);
         this.usageManager=usageManager;
         String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
                 +"]";
         this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
+       
+    }
+    
+    public void init() throws Exception {
         this.matched.setUsageManager(usageManager);
         this.matched.start();
     }
-
-    public void add(MessageReference node) throws InterruptedException,IOException{
+    
+    public void add(MessageReference node) throws Exception{
         enqueueCounter.incrementAndGet();
         node.incrementReferenceCount();
         if(!isFull()&&!isSlaveBroker()){
@@ -309,6 +314,20 @@
     public UsageManager getUsageManager(){
         return this.usageManager;
     }
+    
+    /**
+     * @return the pending message cursor used by this subscription
+     */
+    public PendingMessageCursor getMatched(){
+        return this.matched;
+    }
+
+    /**
+     * @param matched the pending message cursor to set
+     */
+    public void setMatched(PendingMessageCursor matched){
+        this.matched=matched;
+    }
 
     /**
      * inform the MessageConsumer on the client to change it's prefetch
@@ -402,7 +421,14 @@
 
     public void destroy(){
         synchronized(matchedListMutex){
-            matched.destroy();
+            try{
+                matched.destroy();
+            }catch(Exception e){
+               log.warn("Failed to destroy cursor",e);
+            }
         }
     }
+
+    
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -14,6 +14,7 @@
 
 package org.apache.activemq.broker.region.cursors;
 
+import java.util.LinkedList;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -149,5 +150,22 @@
      */
     public UsageManager getUsageManager(){
         return this.usageManager;
+    }
+    
+    /**
+     * destroy the cursor
+     * @throws Exception 
+     */
+    public void destroy() throws Exception {
+        stop();
+    }
+    
+    /**
+     * Page in a restricted number of messages
+     * @param maxItems
+     * @return a list of paged in messages
+     */
+    public LinkedList pageInList(int maxItems) {
+        throw new RuntimeException("Not supported");
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -18,6 +18,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
@@ -39,6 +40,7 @@
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
 
     static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
+    static private final AtomicLong nameCount = new AtomicLong();
     private Store store;
     private String name;
     private LinkedList memoryList=new LinkedList();
@@ -54,7 +56,7 @@
      * @param store
      */
     public FilePendingMessageCursor(String name,Store store){
-        this.name=name;
+        this.name=nameCount.incrementAndGet() + "_"+name;
         this.store=store;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -14,6 +14,7 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.io.IOException;
+import java.util.LinkedList;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
@@ -188,6 +189,19 @@
      * @return true if the cursor has buffered messages ready to deliver
      */
     public boolean hasMessagesBufferedToDeliver();
+    
+    /**
+     * destroy the cursor
+     * @throws Exception 
+     */
+    public void destroy() throws Exception;
+    
+    /**
+     * Page in a restricted number of messages
+     * @param maxItems
+     * @return a list of paged in messages
+     */
+    public LinkedList pageInList(int maxItems);
     
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -103,4 +103,13 @@
             }
         }
     }
+    
+    /**
+     * Page in a restricted number of messages
+     * @param maxItems
+     * @return a list of paged in messages
+     */
+    public LinkedList pageInList(int maxItems) {
+        return list;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -23,7 +23,7 @@
 /**
  * Creates a FilePendingMessageCursor
  *  *
- * @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged in from file"
+ * @org.apache.xbean.XBean element="fileQueueCursor" description="Pending messages paged in from file"
  * 
  * @version $Revision$
  */

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -0,0 +1,41 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a PendingMessageCursor for Durable subscribers
+ *  *
+ * @org.apache.xbean.XBean element="fileCursor" description="Pending messages for durable subscribers
+ *                         held in temporary files"
+ * 
+ * @version $Revision$
+ */
+public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
+
+    /**
+     * @param name
+     * @param tmpStorage
+     * @param maxBatchSize
+     * @return a Cursor
+     * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
+     */
+    public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
+        return new FilePendingMessageCursor("PendingCursor:" + name,tmpStorage);
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -0,0 +1,38 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+* Abstraction to allow different policies for holding messages awaiting dispatch to active clients
+* 
+* @version $Revision$
+*/
+public interface PendingSubscriberMessageStoragePolicy{
+    
+    /**
+     * Retrieve the configured pending message storage cursor.
+     * 
+     * @param name
+     * @param tmpStorage
+     * @param maxBatchSize
+     * @return the Pending Message cursor
+     */
+    public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,
+            int maxBatchSize);
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Sun Jan 28 11:39:02 2007
@@ -1,22 +1,21 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.TopicSubscription;
@@ -25,21 +24,21 @@
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Represents an entry in a {@link PolicyMap} for assigning policies to a
- * specific destination or a hierarchical wildcard area of destinations.
+ * Represents an entry in a {@link PolicyMap} for assigning policies to a specific destination or a hierarchical
+ * wildcard area of destinations.
  * 
  * @org.apache.xbean.XBean
  * 
  * @version $Revision: 1.1 $
  */
-public class PolicyEntry extends DestinationMapEntry {
+public class PolicyEntry extends DestinationMapEntry{
 
-    private static final Log log = LogFactory.getLog(PolicyEntry.class);
-    
+    private static final Log log=LogFactory.getLog(PolicyEntry.class);
     private DispatchPolicy dispatchPolicy;
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;
@@ -48,135 +47,149 @@
     private MessageEvictionStrategy messageEvictionStrategy;
     private long memoryLimit;
     private MessageGroupMapFactory messageGroupMapFactory;
-    private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
-    
-    public void configure(Queue queue, Store tmpStore) {
-        if (dispatchPolicy != null) {
+    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
+    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
+    private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
+    public void configure(Queue queue,Store tmpStore){
+        if(dispatchPolicy!=null){
             queue.setDispatchPolicy(dispatchPolicy);
         }
-        if (deadLetterStrategy != null) {
+        if(deadLetterStrategy!=null){
             queue.setDeadLetterStrategy(deadLetterStrategy);
         }
         queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
-        if( memoryLimit>0 ) {
+        if(memoryLimit>0){
             queue.getUsageManager().setLimit(memoryLimit);
         }
-        if (pendingQueueMessageStoragePolicy != null) {
-            PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore);
+        if(pendingQueuePolicy!=null){
+            PendingMessageCursor messages=pendingQueuePolicy.getQueuePendingMessageCursor(queue,tmpStore);
             queue.setMessages(messages);
         }
     }
 
-    public void configure(Topic topic) {
-        if (dispatchPolicy != null) {
+    public void configure(Topic topic){
+        if(dispatchPolicy!=null){
             topic.setDispatchPolicy(dispatchPolicy);
         }
-        if (deadLetterStrategy != null) {
+        if(deadLetterStrategy!=null){
             topic.setDeadLetterStrategy(deadLetterStrategy);
         }
-        if (subscriptionRecoveryPolicy != null) {
+        if(subscriptionRecoveryPolicy!=null){
             topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
         }
         topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
-        if( memoryLimit>0 ) {
+        if(memoryLimit>0){
             topic.getUsageManager().setLimit(memoryLimit);
         }
-        
     }
 
-    public void configure(TopicSubscription subscription) {
-        if (pendingMessageLimitStrategy != null) {
-            int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
-            int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
-            if (consumerLimit > 0) {
-                if (value < 0 || consumerLimit < value) {
-                    value = consumerLimit;
+    public void configure(Broker broker,UsageManager memoryManager,TopicSubscription subscription){
+        if(pendingMessageLimitStrategy!=null){
+            int value=pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
+            int consumerLimit=subscription.getInfo().getMaximumPendingMessageLimit();
+            if(consumerLimit>0){
+                if(value<0||consumerLimit<value){
+                    value=consumerLimit;
                 }
             }
-            if (value >= 0) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting the maximumPendingMessages size to: " + value + " for consumer: " + subscription.getInfo().getConsumerId());
+            if(value>=0){
+                if(log.isDebugEnabled()){
+                    log.debug("Setting the maximumPendingMessages size to: "+value+" for consumer: "
+                            +subscription.getInfo().getConsumerId());
                 }
                 subscription.setMaximumPendingMessages(value);
             }
         }
-        if (messageEvictionStrategy != null) {
+        if(messageEvictionStrategy!=null){
             subscription.setMessageEvictionStrategy(messageEvictionStrategy);
         }
+        if (pendingSubscriberPolicy!=null) {
+            String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
+            int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
+            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name,broker.getTempDataStore(),maxBatchSize));
+        }
+    }
+
+    public void configure(Broker broker,UsageManager memoryManager,DurableTopicSubscription sub){
+        String clientId=sub.getClientId();
+        String subName=sub.getSubscriptionName();
+        int prefetch=sub.getPrefetchSize();
+        if(pendingDurableSubscriberPolicy!=null){
+            PendingMessageCursor cursor=pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId,
+                    subName,broker.getTempDataStore(),prefetch);
+            cursor.setUsageManager(memoryManager);
+            sub.setPending(cursor);
+        }
     }
 
     // Properties
     // -------------------------------------------------------------------------
-    public DispatchPolicy getDispatchPolicy() {
+    public DispatchPolicy getDispatchPolicy(){
         return dispatchPolicy;
     }
 
-    public void setDispatchPolicy(DispatchPolicy policy) {
-        this.dispatchPolicy = policy;
+    public void setDispatchPolicy(DispatchPolicy policy){
+        this.dispatchPolicy=policy;
     }
 
-    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
+    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy(){
         return subscriptionRecoveryPolicy;
     }
 
-    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
-        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy){
+        this.subscriptionRecoveryPolicy=subscriptionRecoveryPolicy;
     }
 
-    public boolean isSendAdvisoryIfNoConsumers() {
+    public boolean isSendAdvisoryIfNoConsumers(){
         return sendAdvisoryIfNoConsumers;
     }
 
     /**
-     * Sends an advisory message if a non-persistent message is sent and there
-     * are no active consumers
+     * Sends an advisory message if a non-persistent message is sent and there are no active consumers
      */
-    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
-        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers){
+        this.sendAdvisoryIfNoConsumers=sendAdvisoryIfNoConsumers;
     }
 
-    public DeadLetterStrategy getDeadLetterStrategy() {
+    public DeadLetterStrategy getDeadLetterStrategy(){
         return deadLetterStrategy;
     }
 
     /**
-     * Sets the policy used to determine which dead letter queue destination
-     * should be used
+     * Sets the policy used to determine which dead letter queue destination should be used
      */
-    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
-        this.deadLetterStrategy = deadLetterStrategy;
+    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy){
+        this.deadLetterStrategy=deadLetterStrategy;
     }
 
-    public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
+    public PendingMessageLimitStrategy getPendingMessageLimitStrategy(){
         return pendingMessageLimitStrategy;
     }
 
     /**
-     * Sets the strategy to calculate the maximum number of messages that are
-     * allowed to be pending on consumers (in addition to their prefetch sizes).
+     * Sets the strategy to calculate the maximum number of messages that are allowed to be pending on consumers (in
+     * addition to their prefetch sizes).
      * 
-     * Once the limit is reached, non-durable topics can then start discarding
-     * old messages. This allows us to keep dispatching messages to slow
-     * consumers while not blocking fast consumers and discarding the messages
-     * oldest first.
+     * Once the limit is reached, non-durable topics can then start discarding old messages. This allows us to keep
+     * dispatching messages to slow consumers while not blocking fast consumers and discarding the messages oldest
+     * first.
      */
-    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
-        this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
+    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy){
+        this.pendingMessageLimitStrategy=pendingMessageLimitStrategy;
     }
 
-    public MessageEvictionStrategy getMessageEvictionStrategy() {
+    public MessageEvictionStrategy getMessageEvictionStrategy(){
         return messageEvictionStrategy;
     }
 
     /**
-     * Sets the eviction strategy used to decide which message to evict when the
-     * slow consumer needs to discard messages
+     * Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
      */
-    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
-        this.messageEvictionStrategy = messageEvictionStrategy;
+    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
+        this.messageEvictionStrategy=messageEvictionStrategy;
     }
 
-    public long getMemoryLimit() {
+    public long getMemoryLimit(){
         return memoryLimit;
     }
 
@@ -184,40 +197,72 @@
      * 
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
-    public void setMemoryLimit(long memoryLimit) {
-        this.memoryLimit = memoryLimit;
+    public void setMemoryLimit(long memoryLimit){
+        this.memoryLimit=memoryLimit;
     }
 
-    public MessageGroupMapFactory getMessageGroupMapFactory() {
-        if (messageGroupMapFactory == null) {
-            messageGroupMapFactory = new MessageGroupHashBucketFactory(); 
+    public MessageGroupMapFactory getMessageGroupMapFactory(){
+        if(messageGroupMapFactory==null){
+            messageGroupMapFactory=new MessageGroupHashBucketFactory();
         }
         return messageGroupMapFactory;
     }
 
     /**
-     * Sets the factory used to create new instances of {MessageGroupMap} used to implement the 
-     * <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
+     * Sets the factory used to create new instances of {MessageGroupMap} used to implement the <a
+     * href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
+     */
+    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory){
+        this.messageGroupMapFactory=messageGroupMapFactory;
+    }
+
+    
+    /**
+     * @return the pendingDurableSubscriberPolicy
      */
-    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
-        this.messageGroupMapFactory = messageGroupMapFactory;
+    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
+        return this.pendingDurableSubscriberPolicy;
     }
 
     
     /**
-     * @return the pendingQueueMessageStoragePolicy
+     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
      */
-    public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
-        return this.pendingQueueMessageStoragePolicy;
+    public void setPendingDurableSubscriberPolicy(
+            PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
+        this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
     }
 
     
     /**
-     * @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set
+     * @return the pendingQueuePolicy
      */
-    public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){
-        this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
+    public PendingQueueMessageStoragePolicy getPendingQueuePolicy(){
+        return this.pendingQueuePolicy;
     }
 
     
+    /**
+     * @param pendingQueuePolicy the pendingQueuePolicy to set
+     */
+    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy){
+        this.pendingQueuePolicy=pendingQueuePolicy;
+    }
+
+    
+    /**
+     * @return the pendingSubscriberPolicy
+     */
+    public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy(){
+        return this.pendingSubscriberPolicy;
+    }
+
+    
+    /**
+     * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
+     */
+    public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy){
+        this.pendingSubscriberPolicy=pendingSubscriberPolicy;
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -21,7 +21,7 @@
 /**
  * Creates a VMPendingMessageCursor
  *  *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ * @org.apache.xbean.XBean element="vmDurableCursor" description="Pending messages held in the JVM"
  * 
  * @version $Revision$
  */

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -22,7 +22,7 @@
 /**
  * Creates a VMPendingMessageCursor
  *  *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ * @org.apache.xbean.XBean element="vmQueueCursor" description="Pending messages held in the JVM"
  * 
  * @version $Revision$
  */

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -0,0 +1,40 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Pending subscriber message storage policy that answers every request with a new VMPendingMessageCursor.
+ *
+ * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ * @version $Revision$
+ */
+public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
+
+    /**
+     * Builds the cursor for a subscriber; a fresh instance is created on each call.
+     * @param name ignored by this implementation
+     * @param tmpStorage ignored by this implementation
+     * @param maxBatchSize ignored by this implementation
+     * @return a newly constructed VMPendingMessageCursor
+     * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
+     */
+    public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
+        return new VMPendingMessageCursor();
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Sun Jan 28 11:39:02 2007
@@ -22,7 +22,6 @@
 import javax.jms.Session;
 import javax.jms.Topic;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
 
 /**
  * @version $Revision: 1.3 $
@@ -50,7 +49,6 @@
     
     protected void configureBroker(BrokerService answer) throws Exception{
         answer.setDeleteAllMessagesOnStartup(true);
-        answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
         answer.addConnector(bindAddress);
         answer.setDeleteAllMessagesOnStartup(true);
     }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Sun Jan 28 11:39:02 2007
@@ -53,7 +53,7 @@
     
     protected void configureBroker(BrokerService answer) throws Exception{
         PolicyEntry policy = new PolicyEntry();
-        policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         answer.setDestinationPolicy(pMap);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java Sun Jan 28 11:39:02 2007
@@ -38,7 +38,7 @@
         KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
         answer.setPersistenceAdapter(adaptor);
         PolicyEntry policy = new PolicyEntry();
-        policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         answer.setDestinationPolicy(pMap);

Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml Sun Jan 28 11:39:02 2007
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2005-2006 The Apache Software Foundation
+  
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker persistent="false" xmlns="http://activemq.org/config/1.0">
+
+    <!-- let's define the per-destination policies: dispatch, dead letter strategy and pending message cursor -->
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry topic="org.apache.>">
+            <dispatchPolicy>
+              <strictOrderDispatchPolicy />
+            </dispatchPolicy>
+            <deadLetterStrategy>
+              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
+            </deadLetterStrategy>
+            <pendingSubscriberPolicy>
+            	<vmCursor />
+            </pendingSubscriberPolicy>
+          </policyEntry>
+
+          <policyEntry queue="org.apache.>">
+            <dispatchPolicy>
+              <strictOrderDispatchPolicy />
+            </dispatchPolicy>
+            <deadLetterStrategy>
+              <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
+            </deadLetterStrategy>
+            <pendingQueuePolicy>
+            	<vmQueueCursor />
+            </pendingQueuePolicy>
+          </policyEntry>
+
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml Sun Jan 28 11:39:02 2007
@@ -18,14 +18,14 @@
 <beans>
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <broker brokerName="slowConsumerBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
+  <broker brokerName="slowConsumerBroker" useJmx="false" persistent="false" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
     <transportConnectors>
       <transportConnector uri="tcp://localhost:61616"/>
     </transportConnectors>
      <destinationPolicy>
       <policyMap>
         <policyEntries>
-          <policyEntry topic=">">            
+          <policyEntry topic="blob">            
             <!-- lets force old messages to be discarded for slow consumers -->
             <pendingMessageLimitStrategy>
               <constantPendingMessageLimitStrategy limit="10"/>