You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/01/25 13:23:58 UTC
svn commit: r499760 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
TopicSubscription.java cursors/AbstractPendingMessageCursor.java
cursors/FilePendingMessageCursor.java cursors/PendingMessageCursor.java
Author: rajdavies
Date: Thu Jan 25 04:23:57 2007
New Revision: 499760
URL: http://svn.apache.org/viewvc?view=rev&rev=499760
Log:
fix for memory leaks woth non-persistent messages - see http://www.nabble.com/OutOfMemoryErrors-again-tf3083798.html
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu Jan 25 04:23:57 2007
@@ -1,36 +1,29 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region;
import java.io.IOException;
-import java.util.Iterator;
import java.util.LinkedList;
-
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@@ -44,40 +37,38 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.atomic.AtomicLong;
-
-
public class TopicSubscription extends AbstractSubscription{
-
+
private static final Log log=LogFactory.getLog(TopicSubscription.class);
-
+ private static final AtomicLong cursorNameCounter=new AtomicLong(0);
final protected FilePendingMessageCursor matched;
- final protected ActiveMQDestination dlqDestination=new ActiveMQQueue("ActiveMQ.DLQ");
final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong();
protected AtomicLong delivered=new AtomicLong();
private int maximumPendingMessages=-1;
- private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
- private int discarded = 0;
+ private MessageEvictionStrategy messageEvictionStrategy=new OldestMessageEvictionStrategy();
+ private int discarded=0;
private final Object matchedListMutex=new Object();
- private final AtomicLong enqueueCounter = new AtomicLong(0);
- private final AtomicLong dequeueCounter = new AtomicLong(0);
-
+ private final AtomicLong enqueueCounter=new AtomicLong(0);
+ private final AtomicLong dequeueCounter=new AtomicLong(0);
boolean singleDestination=true;
- Destination destination;
-
+ Destination destination;
+ private int memoryUsageHighWaterMark=95;
+
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
- throws InvalidSelectorException{
+ throws InvalidSelectorException{
super(broker,context,info);
this.usageManager=usageManager;
- this.matched = new FilePendingMessageCursor(info.getConsumerId().toString(), broker.getTempDataStore());
+ String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
+ +"]";
+ this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
+ this.matched.setUsageManager(usageManager);
+ this.matched.start();
}
public void add(MessageReference node) throws InterruptedException,IOException{
-
- enqueueCounter.incrementAndGet();
+ enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
-
if(!isFull()&&!isSlaveBroker()){
optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages which
@@ -88,40 +79,37 @@
synchronized(matchedListMutex){
matched.addMessageLast(node);
// NOTE - be careful about the slaveBroker!
- if (maximumPendingMessages > 0) {
-
+ if(maximumPendingMessages>0){
// calculate the high water mark from which point we will eagerly evict expired messages
- int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
- if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
- max = maximumPendingMessages;
+ int max=messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
+ if(maximumPendingMessages>0&&maximumPendingMessages<max){
+ max=maximumPendingMessages;
}
- if (!matched.isEmpty() && matched.size() > max) {
+ if(!matched.isEmpty()&&matched.size()>max){
removeExpiredMessages();
}
-
// lets discard old messages as we are a slow consumer
- while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
- int pageInSize = matched.size() - maximumPendingMessages;
- //only page in a 1000 at a time - else we could blow da memory
- pageInSize = Math.max(1000,pageInSize);
- LinkedList list = matched.pageInList(pageInSize);
- MessageReference[] oldMessages = messageEvictionStrategy.evictMessages(list);
- int messagesToEvict = oldMessages.length;
- for(int i = 0; i < messagesToEvict; i++) {
- MessageReference oldMessage = oldMessages[i];
- oldMessage.decrementReferenceCount();
+ while(!matched.isEmpty()&&matched.size()>maximumPendingMessages){
+ int pageInSize=matched.size()-maximumPendingMessages;
+ // only page in a 1000 at a time - else we could blow da memory
+ pageInSize=Math.max(1000,pageInSize);
+ LinkedList list=matched.pageInList(pageInSize);
+ MessageReference[] oldMessages=messageEvictionStrategy.evictMessages(list);
+ int messagesToEvict=oldMessages.length;
+ for(int i=0;i<messagesToEvict;i++){
+ MessageReference oldMessage=oldMessages[i];
+ oldMessage.decrementReferenceCount();
matched.remove(oldMessage);
-
discarded++;
- if (log.isDebugEnabled()) {
- log.debug("Discarding message " + oldMessages[i]);
+ if(log.isDebugEnabled()){
+ log.debug("Discarding message "+oldMessages[i]);
}
- }
-
+ }
// lets avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy lets just not evict
- if (messagesToEvict == 0) {
- log.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
+ if(messagesToEvict==0){
+ log.warn("No messages to evict returned from eviction strategy: "
+ +messageEvictionStrategy);
break;
}
}
@@ -133,7 +121,8 @@
/**
* Discard any expired messages from the matched list. Called from a synchronized block.
- * @throws IOException
+ *
+ * @throws IOException
*/
protected void removeExpiredMessages() throws IOException{
try{
@@ -172,30 +161,28 @@
}
synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
-
// Handle the standard acknowledgment case.
boolean wasFull=isFull();
if(ack.isStandardAck()||ack.isPoisonAck()){
if(context.isInTransaction()){
delivered.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new Synchronization(){
+
public void afterCommit() throws Exception{
- synchronized( TopicSubscription.this ) {
- if( singleDestination ) {
- destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
- }
- }
+ synchronized(TopicSubscription.this){
+ if(singleDestination){
+ destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+ }
+ }
dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
}
});
}else{
-
- if( singleDestination ) {
- destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
- }
-
+ if(singleDestination){
+ destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+ }
dequeueCounter.addAndGet(ack.getMessageCount());
dispatched.addAndGet(-ack.getMessageCount());
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
@@ -215,7 +202,7 @@
throw new JMSException("Invalid acknowledgment: "+ack);
}
- public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+ public Response pullMessage(ConnectionContext context,MessagePull pull) throws Exception{
// not supported for topics
return null;
}
@@ -231,15 +218,15 @@
public int getMaximumPendingMessages(){
return maximumPendingMessages;
}
-
- public long getDispatchedCounter() {
- return dispatched.get();
- }
-
- public long getEnqueueCounter() {
- return enqueueCounter.get();
- }
-
+
+ public long getDispatchedCounter(){
+ return dispatched.get();
+ }
+
+ public long getEnqueueCounter(){
+ return enqueueCounter.get();
+ }
+
public long getDequeueCounter(){
return dequeueCounter.get();
}
@@ -247,23 +234,22 @@
/**
* @return the number of messages discarded due to being a slow consumer
*/
- public int discarded() {
- synchronized(matchedListMutex) {
+ public int discarded(){
+ synchronized(matchedListMutex){
return discarded;
}
}
/**
- * @return the number of matched messages (messages targeted for the subscription but not
- * yet able to be dispatched due to the prefetch buffer being full).
+ * @return the number of matched messages (messages targeted for the subscription but not yet able to be dispatched
+ * due to the prefetch buffer being full).
*/
- public int matched() {
- synchronized(matchedListMutex) {
+ public int matched(){
+ synchronized(matchedListMutex){
return matched.size();
}
}
-
/**
* Sets the maximum number of pending messages that can be matched against this consumer before old messages are
* discarded.
@@ -272,78 +258,92 @@
this.maximumPendingMessages=maximumPendingMessages;
}
- public MessageEvictionStrategy getMessageEvictionStrategy() {
+ public MessageEvictionStrategy getMessageEvictionStrategy(){
return messageEvictionStrategy;
}
/**
- * Sets the eviction strategy used to decide which message to evict when the slow consumer
- * needs to discard messages
+ * Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
*/
- public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
- this.messageEvictionStrategy = messageEvictionStrategy;
+ public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
+ this.messageEvictionStrategy=messageEvictionStrategy;
}
-
// Implementation methods
// -------------------------------------------------------------------------
-
private boolean isFull(){
return dispatched.get()-delivered.get()>=info.getPrefetchSize();
}
-
+
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark(){
- return (dispatched.get()-delivered.get()) <= (info.getPrefetchSize() *.4);
+ return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4);
}
-
+
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark(){
- return (dispatched.get()-delivered.get()) >= (info.getPrefetchSize() *.9);
+ return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9);
+ }
+
+ /**
+ * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
+ */
+ public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
+ this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
+ }
+
+ /**
+ * @return the memoryUsageHighWaterMark
+ */
+ public int getMemoryUsageHighWaterMark(){
+ return this.memoryUsageHighWaterMark;
+ }
+
+ /**
+ * @return the usageManager
+ */
+ public UsageManager getUsageManager(){
+ return this.usageManager;
}
-
+
/**
* inform the MessageConsumer on the client to change it's prefetch
+ *
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch){
- if (context != null && context.getConnection() != null && context.getConnection().isManageable()){
- ConsumerControl cc = new ConsumerControl();
+ if(context!=null&&context.getConnection()!=null&&context.getConnection().isManageable()){
+ ConsumerControl cc=new ConsumerControl();
cc.setConsumerId(info.getConsumerId());
cc.setPrefetch(newPrefetch);
context.getConnection().dispatchAsync(cc);
}
}
-
+
/**
* optimize message consumer prefetch if the consumer supports it
- *
+ *
*/
public void optimizePrefetch(){
- /*
- if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
- &&context.getConnection().isManageable()){
- if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() && isLowWaterMark()){
- info.setCurrentPrefetchSize(info.getPrefetchSize());
- updateConsumerPrefetch(info.getPrefetchSize());
- }else if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){
- // want to purge any outstanding acks held by the consumer
- info.setCurrentPrefetchSize(1);
- updateConsumerPrefetch(1);
- }
- }
- */
+ /*
+ * if(info!=null&&info.isOptimizedAcknowledge()&&context!=null&&context.getConnection()!=null
+ * &&context.getConnection().isManageable()){ if(info.getCurrentPrefetchSize()!=info.getPrefetchSize() &&
+ * isLowWaterMark()){ info.setCurrentPrefetchSize(info.getPrefetchSize());
+ * updateConsumerPrefetch(info.getPrefetchSize()); }else
+ * if(info.getCurrentPrefetchSize()==info.getPrefetchSize() && isHighWaterMark()){ // want to purge any
+ * outstanding acks held by the consumer info.setCurrentPrefetchSize(1); updateConsumerPrefetch(1); } }
+ */
}
private void dispatchMatched() throws IOException{
synchronized(matchedListMutex){
try{
matched.reset();
- while(matched.hasNext()){
+ while(matched.hasNext()&&!isFull()){
MessageReference message=(MessageReference)matched.next();
matched.remove();
// Message may have been sitting in the matched list a while
@@ -361,29 +361,28 @@
}
private void dispatch(final MessageReference node) throws IOException{
- Message message=(Message) node;
+ Message message=(Message)node;
// Make sure we can dispatch a message.
MessageDispatch md=new MessageDispatch();
md.setMessage(message);
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
dispatched.incrementAndGet();
-
// Keep track if this subscription is receiving messages from a single destination.
- if( singleDestination ) {
- if( destination == null ) {
- destination = node.getRegionDestination();
- } else {
- if( destination != node.getRegionDestination() ) {
- singleDestination = false;
- }
- }
+ if(singleDestination){
+ if(destination==null){
+ destination=node.getRegionDestination();
+ }else{
+ if(destination!=node.getRegionDestination()){
+ singleDestination=false;
+ }
+ }
}
-
if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){
+
public void run(){
- node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
+ node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.decrementReferenceCount();
}
});
@@ -397,13 +396,13 @@
public String toString(){
return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
- +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()+", discarded="+discarded();
+ +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()
+ +", discarded="+discarded();
}
- public void destroy() {
+ public void destroy(){
synchronized(matchedListMutex){
matched.destroy();
}
}
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Jan 25 04:23:57 2007
@@ -25,14 +25,14 @@
* @version $Revision$
*/
public class AbstractPendingMessageCursor implements PendingMessageCursor{
-
+ protected int memoryUsageHighWaterMark = 90;
protected int maxBatchSize=100;
protected UsageManager usageManager;
- public void start() throws Exception{
+ public void start() throws Exception {
}
- public void stop() throws Exception{
+ public void stop() throws Exception {
gc();
}
@@ -112,7 +112,7 @@
}
public boolean hasSpace() {
- return usageManager != null ? !usageManager.isFull() : true;
+ return usageManager != null ? (usageManager.getPercentUsage() < memoryUsageHighWaterMark): true;
}
public boolean isFull() {
@@ -125,5 +125,29 @@
public boolean hasMessagesBufferedToDeliver() {
return false;
+ }
+
+
+ /**
+ * @return the memoryUsageHighWaterMark
+ */
+ public int getMemoryUsageHighWaterMark(){
+ return this.memoryUsageHighWaterMark;
+ }
+
+
+ /**
+ * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
+ */
+ public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark){
+ this.memoryUsageHighWaterMark=memoryUsageHighWaterMark;
+ }
+
+
+ /**
+ * @return the usageManager
+ */
+ public UsageManager getUsageManager(){
+ return this.usageManager;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Jan 25 04:23:57 2007
@@ -21,6 +21,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageListener;
@@ -46,6 +47,7 @@
private Destination regionDestination;
private AtomicBoolean iterating=new AtomicBoolean();
private boolean flushRequired;
+ private AtomicBoolean started=new AtomicBoolean();
/**
* @param name
@@ -56,6 +58,23 @@
this.store=store;
}
+ public void start(){
+ if(started.compareAndSet(false,true)){
+ if(usageManager!=null){
+ usageManager.addUsageListener(this);
+ }
+ }
+ }
+
+ public void stop(){
+ if(started.compareAndSet(true,false)){
+ gc();
+ if(usageManager!=null){
+ usageManager.removeUsageListener(this);
+ }
+ }
+ }
+
/**
* @return true if there are no pending messages
*/
@@ -83,6 +102,7 @@
}
public synchronized void destroy(){
+ stop();
for(Iterator i=memoryList.iterator();i.hasNext();){
Message node=(Message)i.next();
node.decrementReferenceCount();
@@ -213,8 +233,8 @@
// we always have space - as we can persist to disk
return false;
}
-
- public boolean hasMessagesBufferedToDeliver() {
+
+ public boolean hasMessagesBufferedToDeliver(){
return !isEmpty();
}
@@ -224,7 +244,7 @@
}
public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
- if(newPercentUsage>=100){
+ if(newPercentUsage>=getMemoryUsageHighWaterMark()){
synchronized(iterating){
flushRequired=true;
if(!iterating.get()){
@@ -240,12 +260,14 @@
}
protected synchronized void flushToDisk(){
- for(Iterator i=memoryList.iterator();i.hasNext();){
- MessageReference node=(MessageReference)i.next();
- node.decrementReferenceCount();
- getDiskList().addLast(node);
+ if(!memoryList.isEmpty()){
+ while(!memoryList.isEmpty()){
+ MessageReference node=(MessageReference)memoryList.removeFirst();
+ node.decrementReferenceCount();
+ getDiskList().addLast(node);
+ }
+ memoryList.clear();
}
- memoryList.clear();
}
protected boolean isDiskListEmpty(){
@@ -255,10 +277,10 @@
protected ListContainer getDiskList(){
if(diskList==null){
try{
- diskList=store.getListContainer(name);
+ diskList=store.getListContainer(name,"TopicSubscription",IndexTypes.DISK_INDEX);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
- diskList.setMaximumCacheSize(0);
}catch(IOException e){
+ e.printStackTrace();
throw new RuntimeException(e);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=499760&r1=499759&r2=499760
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Thu Jan 25 04:23:57 2007
@@ -163,6 +163,23 @@
public void setUsageManager(UsageManager usageManager);
/**
+ * @return the usageManager
+ */
+ public UsageManager getUsageManager();
+
+ /**
+ * @return the memoryUsageHighWaterMark
+ */
+ public int getMemoryUsageHighWaterMark();
+
+
+ /**
+ * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
+ */
+ public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
+
+
+ /**
* @return true if the cursor is full
*/
public boolean isFull();
@@ -171,4 +188,6 @@
* @return true if the cursor has buffered messages ready to deliver
*/
public boolean hasMessagesBufferedToDeliver();
+
+
}