You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/01/25 13:23:58 UTC

svn commit: r499760 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: TopicSubscription.java cursors/AbstractPendingMessageCursor.java cursors/FilePendingMessageCursor.java cursors/PendingMessageCursor.java

Author: rajdavies
Date: Thu Jan 25 04:23:57 2007
New Revision: 499760

URL: http://svn.apache.org/viewvc?view=rev&rev=499760
Log:
fix for memory leaks woth non-persistent messages - see http://www.nabble.com/OutOfMemoryErrors-again-tf3083798.html

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu Jan 25 04:23:57 2007
@@ -1,36 +1,29 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.LinkedList;
-
+import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -44,40 +37,38 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.concurrent.atomic.AtomicLong;
-
-
 public class TopicSubscription extends AbstractSubscription{
-	
+
     private static final Log log=LogFactory.getLog(TopicSubscription.class);
-    
+    private static final AtomicLong cursorNameCounter=new AtomicLong(0);
     final protected FilePendingMessageCursor matched;
-    final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
     final protected UsageManager usageManager;
     protected AtomicLong dispatched=new AtomicLong();
     protected AtomicLong delivered=new AtomicLong();
     private int maximumPendingMessages=-1;
-    private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
-    private int discarded = 0;
+    private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy();
+    private int discarded=0;
     private final Object matchedListMutex=new Object();
-    private final AtomicLong enqueueCounter = new AtomicLong(0);
-    private final AtomicLong dequeueCounter = new AtomicLong(0);
-    
+    private final AtomicLong enqueueCounter=new AtomicLong(0);
+    private final AtomicLong dequeueCounter=new AtomicLong(0);
     boolean singleDestination=true;
-    Destination destination;    
-    
+    Destination destination;
+    private int memoryUsageHighWaterMark=95;
+
     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
-                    throws InvalidSelectorException{
+            throws InvalidSelectorException{
         super(broker,context,info);
         this.usageManager=usageManager;
-        this.matched = new FilePendingMessageCursor(info.getConsumerId().toString(), broker.getTempDataStore());
+        String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
+                +"]";
+        this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
+        this.matched.setUsageManager(usageManager);
+        this.matched.start();
     }
 
     public void add(MessageReference node) throws InterruptedException,IOException{
-        
-        enqueueCounter.incrementAndGet();        
+        enqueueCounter.incrementAndGet();
         node.incrementReferenceCount();
-        
         if(!isFull()&&!isSlaveBroker()){
             optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages which
@@ -88,40 +79,37 @@
                 synchronized(matchedListMutex){
                     matched.addMessageLast(node);
                     // NOTE - be careful about the slaveBroker!
-                    if (maximumPendingMessages > 0) {
-                        
+                    if(maximumPendingMessages>0){
                         // calculate the high water mark from which point we will eagerly evict expired messages
-                        int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
-                        if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
-                            max = maximumPendingMessages;
+                        int max=messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
+                        if(maximumPendingMessages>0&&maximumPendingMessages<max){
+                            max=maximumPendingMessages;
                         }
-                        if (!matched.isEmpty() && matched.size() > max) {
+                        if(!matched.isEmpty()&&matched.size()>max){
                             removeExpiredMessages();
                         }
-
                         // lets discard old messages as we are a slow consumer
-                        while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
-                            int pageInSize = matched.size() - maximumPendingMessages;
-                            //only page in a 1000 at a time - else we could blow da memory
-                            pageInSize = Math.max(1000,pageInSize);
-                            LinkedList list = matched.pageInList(pageInSize);
-                            MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(list);
-                            int messagesToEvict = oldMessages.length;
-                            for(int i = 0; i < messagesToEvict; i++) {
-                                MessageReference oldMessage = oldMessages[i];
-                            	oldMessage.decrementReferenceCount();
+                        while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){
+                            int pageInSize=matched.size()-maximumPendingMessages;
+                            // only page in a 1000 at a time - else we could blow da memory
+                            pageInSize=Math.max(1000,pageInSize);
+                            LinkedList list=matched.pageInList(pageInSize);
+                            MessageReference[] oldMessages=messageEvictionStrategy.evictMessages(list);
+                            int messagesToEvict=oldMessages.length;
+                            for(int i=0;i<messagesToEvict;i++){
+                                MessageReference oldMessage=oldMessages[i];
+                                oldMessage.decrementReferenceCount();
                                 matched.remove(oldMessage);
-                                
                                 discarded++;
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Discarding message " + oldMessages[i]);
+                                if(log.isDebugEnabled()){
+                                    log.debug("Discarding message "+oldMessages[i]);
                                 }
-							}
-                            
+                            }
                             // lets avoid an infinite loop if we are given a bad eviction strategy
                             // for a bad strategy lets just not evict
-                            if (messagesToEvict == 0) {
-                                log.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
+                            if(messagesToEvict==0){
+                                log.warn("No messages to evict returned from eviction strategy: "
+                                        +messageEvictionStrategy);
                                 break;
                             }
                         }
@@ -133,7 +121,8 @@
 
     /**
      * Discard any expired messages from the matched list. Called from a synchronized block.
-     * @throws IOException 
+     * 
+     * @throws IOException
      */
     protected void removeExpiredMessages() throws IOException{
         try{
@@ -172,30 +161,28 @@
     }
 
     synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
-        
         // Handle the standard acknowledgment case.
         boolean wasFull=isFull();
         if(ack.isStandardAck()||ack.isPoisonAck()){
             if(context.isInTransaction()){
                 delivered.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization(){
+
                     public void afterCommit() throws Exception{
-                    	synchronized( TopicSubscription.this ) {
-	                    	if( singleDestination ) {
-	                    		destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
-	                    	}
-                    	}                    
+                        synchronized(TopicSubscription.this){
+                            if(singleDestination){
+                                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+                            }
+                        }
                         dequeueCounter.addAndGet(ack.getMessageCount());
                         dispatched.addAndGet(-ack.getMessageCount());
                         delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
                     }
                 });
             }else{
-            	
-            	if( singleDestination ) {
-            		destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
-            	}
-            	            
+                if(singleDestination){
+                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+                }
                 dequeueCounter.addAndGet(ack.getMessageCount());
                 dispatched.addAndGet(-ack.getMessageCount());
                 delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
@@ -215,7 +202,7 @@
         throw new JMSException("Invalid acknowledgment: "+ack);
     }
 
-    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+    public Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception{
         // not supported for topics
         return null;
     }
@@ -231,15 +218,15 @@
     public int getMaximumPendingMessages(){
         return maximumPendingMessages;
     }
-    
-	public long getDispatchedCounter() {
-		return dispatched.get();
-	}
-
-	public long getEnqueueCounter() {
-		return enqueueCounter.get();
-	}
-    
+
+    public long getDispatchedCounter(){
+        return dispatched.get();
+    }
+
+    public long getEnqueueCounter(){
+        return enqueueCounter.get();
+    }
+
     public long getDequeueCounter(){
         return dequeueCounter.get();
     }
@@ -247,23 +234,22 @@
     /**
      * @return the number of messages discarded due to being a slow consumer
      */
-    public int discarded() {
-        synchronized(matchedListMutex) {
+    public int discarded(){
+        synchronized(matchedListMutex){
             return discarded;
         }
     }
 
     /**
-     * @return the number of matched messages (messages targeted for the subscription but not
-     * yet able to be dispatched due to the prefetch buffer being full).
+     * @return the number of matched messages (messages targeted for the subscription but not yet able to be dispatched
+     *         due to the prefetch buffer being full).
      */
-    public int matched() {
-        synchronized(matchedListMutex) {
+    public int matched(){
+        synchronized(matchedListMutex){
             return matched.size();
         }
     }
 
-
     /**
      * Sets the maximum number of pending messages that can be matched against this consumer before old messages are
      * discarded.
@@ -272,78 +258,92 @@
         this.maximumPendingMessages=maximumPendingMessages;
     }
 
-    public MessageEvictionStrategy getMessageEvictionStrategy() {
+    public MessageEvictionStrategy getMessageEvictionStrategy(){
         return messageEvictionStrategy;
     }
 
     /**
-     * Sets the eviction strategy used to decide which message to evict when the slow consumer
-     * needs to discard messages
+     * Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
      */
-    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
-        this.messageEvictionStrategy = messageEvictionStrategy;
+    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
+        this.messageEvictionStrategy=messageEvictionStrategy;
     }
 
-    
     // Implementation methods
     // -------------------------------------------------------------------------
-
     private boolean isFull(){
         return dispatched.get()-delivered.get()>=info.getPrefetchSize();
     }
-    
+
     /**
      * @return true when 60% or more room is left for dispatching messages
      */
     public boolean isLowWaterMark(){
-        return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
+        return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4);
     }
-    
+
     /**
      * @return true when 10% or less room is left for dispatching messages
      */
     public boolean isHighWaterMark(){
-        return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
+        return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9);
+    }
+
+    /**
+     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
+     */
+    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
+        this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
+    }
+
+    /**
+     * @return the memoryUsageHighWaterMark
+     */
+    public int getMemoryUsageHighWaterMark(){
+        return this.memoryUsageHighWaterMark;
+    }
+
+    /**
+     * @return the usageManager
+     */
+    public UsageManager getUsageManager(){
+        return this.usageManager;
     }
-    
+
     /**
      * inform the MessageConsumer on the client to change it's prefetch
+     * 
      * @param newPrefetch
      */
     public void updateConsumerPrefetch(int newPrefetch){
-        if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
-            ConsumerControl cc = new ConsumerControl();
+        if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
+            ConsumerControl cc=new ConsumerControl();
             cc.setConsumerId(info.getConsumerId());
             cc.setPrefetch(newPrefetch);
             context.getConnection().dispatchAsync(cc);
         }
     }
-    
+
     /**
      * optimize message consumer prefetch if the consumer supports it
-     *
+     * 
      */
     public void optimizePrefetch(){
-    	/*
-        if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
-                        &&context.getConnection().isManageable()){
-            if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
-                info.setCurrentPrefetchSize(info.getPrefetchSize());
-                updateConsumerPrefetch(info.getPrefetchSize());
-            }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
-                // want to purge any outstanding acks held by the consumer
-                info.setCurrentPrefetchSize(1);
-                updateConsumerPrefetch(1);
-            }
-        }
-        */
+        /*
+         * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+         * &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
+         * isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
+         * updateConsumerPrefetch(info.getPrefetchSize()); }else
+         * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any
+         * outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
+         */
     }
 
     private void dispatchMatched() throws IOException{
         synchronized(matchedListMutex){
             try{
                 matched.reset();
-                while(matched.hasNext()){
+                while(matched.hasNext()&&!isFull()){
                     MessageReference message=(MessageReference)matched.next();
                     matched.remove();
                     // Message may have been sitting in the matched list a while
@@ -361,29 +361,28 @@
     }
 
     private void dispatch(final MessageReference node) throws IOException{
-        Message message=(Message) node;
+        Message message=(Message)node;
         // Make sure we can dispatch a message.
         MessageDispatch md=new MessageDispatch();
         md.setMessage(message);
         md.setConsumerId(info.getConsumerId());
         md.setDestination(node.getRegionDestination().getActiveMQDestination());
         dispatched.incrementAndGet();
-       
         // Keep track if this subscription is receiving messages from a single destination.
-        if( singleDestination ) {
-        	if( destination == null ) {
-        		destination = node.getRegionDestination();
-        	} else {
-        		if( destination != node.getRegionDestination() ) {
-        			singleDestination = false;
-        		}
-        	}
+        if(singleDestination){
+            if(destination==null){
+                destination=node.getRegionDestination();
+            }else{
+                if(destination!=node.getRegionDestination()){
+                    singleDestination=false;
+                }
+            }
         }
-                
         if(info.isDispatchAsync()){
             md.setConsumer(new Runnable(){
+
                 public void run(){
-                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();	
+                    node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
                     node.decrementReferenceCount();
                 }
             });
@@ -397,13 +396,13 @@
 
     public String toString(){
         return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
-                        +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded();
+                +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()
+                +", discarded="+discarded();
     }
 
-    public void destroy() {
+    public void destroy(){
         synchronized(matchedListMutex){
             matched.destroy();
         }
     }
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Jan 25 04:23:57 2007
@@ -25,14 +25,14 @@
  * @version $Revision$
  */
 public class AbstractPendingMessageCursor implements PendingMessageCursor{
-
+    protected int memoryUsageHighWaterMark = 90;
     protected int maxBatchSize=100;
     protected UsageManager usageManager;
 
-    public void start() throws Exception{
+    public void start() throws Exception {
     }
 
-    public void stop() throws Exception{
+    public void stop() throws Exception {
         gc();
     }
 
@@ -112,7 +112,7 @@
     }
     
     public boolean hasSpace() {
-        return usageManager != null ? !usageManager.isFull() : true;
+        return usageManager != null ? (usageManager.getPercentUsage() < memoryUsageHighWaterMark): true;
     }
     
     public boolean isFull() {
@@ -125,5 +125,29 @@
     
     public boolean hasMessagesBufferedToDeliver() {
         return false;
+    }
+
+    
+    /**
+     * @return the memoryUsageHighWaterMark
+     */
+    public int getMemoryUsageHighWaterMark(){
+        return this.memoryUsageHighWaterMark;
+    }
+
+    
+    /**
+     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
+     */
+    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
+        this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
+    }
+
+    
+    /**
+     * @return the usageManager
+     */
+    public UsageManager getUsageManager(){
+        return this.usageManager;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Jan 25 04:23:57 2007
@@ -21,6 +21,7 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.IndexTypes;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.memory.UsageListener;
@@ -46,6 +47,7 @@
     private Destination regionDestination;
     private AtomicBoolean iterating=new AtomicBoolean();
     private boolean flushRequired;
+    private AtomicBoolean started=new AtomicBoolean();
 
     /**
      * @param name
@@ -56,6 +58,23 @@
         this.store=store;
     }
 
+    public void start(){
+        if(started.compareAndSet(false,true)){
+            if(usageManager!=null){
+                usageManager.addUsageListener(this);
+            }
+        }
+    }
+
+    public void stop(){
+        if(started.compareAndSet(true,false)){
+            gc();
+            if(usageManager!=null){
+                usageManager.removeUsageListener(this);
+            }
+        }
+    }
+
     /**
      * @return true if there are no pending messages
      */
@@ -83,6 +102,7 @@
     }
 
     public synchronized void destroy(){
+        stop();
         for(Iterator i=memoryList.iterator();i.hasNext();){
             Message node=(Message)i.next();
             node.decrementReferenceCount();
@@ -213,8 +233,8 @@
         // we always have space - as we can persist to disk
         return false;
     }
-    
-    public boolean hasMessagesBufferedToDeliver() {
+
+    public boolean hasMessagesBufferedToDeliver(){
         return !isEmpty();
     }
 
@@ -224,7 +244,7 @@
     }
 
     public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
-        if(newPercentUsage>=100){
+        if(newPercentUsage>=getMemoryUsageHighWaterMark()){
             synchronized(iterating){
                 flushRequired=true;
                 if(!iterating.get()){
@@ -240,12 +260,14 @@
     }
 
     protected synchronized void flushToDisk(){
-        for(Iterator i=memoryList.iterator();i.hasNext();){
-            MessageReference node=(MessageReference)i.next();
-            node.decrementReferenceCount();
-            getDiskList().addLast(node);
+        if(!memoryList.isEmpty()){
+            while(!memoryList.isEmpty()){
+                MessageReference node=(MessageReference)memoryList.removeFirst();
+                node.decrementReferenceCount();
+                getDiskList().addLast(node);
+            }
+            memoryList.clear();
         }
-        memoryList.clear();
     }
 
     protected boolean isDiskListEmpty(){
@@ -255,10 +277,10 @@
     protected ListContainer getDiskList(){
         if(diskList==null){
             try{
-                diskList=store.getListContainer(name);
+                diskList=store.getListContainer(name,"TopicSubscription",IndexTypes.DISK_INDEX);
                 diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
-                diskList.setMaximumCacheSize(0);
             }catch(IOException e){
+                e.printStackTrace();
                 throw new RuntimeException(e);
             }
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Thu Jan 25 04:23:57 2007
@@ -163,6 +163,23 @@
     public void setUsageManager(UsageManager usageManager);
     
     /**
+     * @return the usageManager
+     */
+    public UsageManager getUsageManager();
+    
+    /**
+     * @return the memoryUsageHighWaterMark
+     */
+    public int getMemoryUsageHighWaterMark();
+
+    
+    /**
+     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
+     */
+    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
+
+    
+    /**
      * @return true if the cursor is full
      */
     public boolean isFull();
@@ -171,4 +188,6 @@
      * @return true if the cursor has buffered messages ready to deliver
      */
     public boolean hasMessagesBufferedToDeliver();
+    
+    
 }