You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/02 16:13:58 UTC

svn commit: r749331 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/configuration/ main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/exchange/ test/java/org/apache/qpid/server/queue/

Author: ritchiem
Date: Mon Mar  2 15:13:57 2009
New Revision: 749331

URL: http://svn.apache.org/viewvc?rev=749331&view=rev
Log:
QPID-1637 : Added Purger thread for Priority Queues and when threasholds are adjusted.
QueueEntries are now the point of entry to load/unload rather than the List. This is because it is only the QueueEntryList that the QueueEntry that is attached to that can correctly account for the inMemory usage. In the Priority Queue case the priority queue does not know which sub list the QueueEntry is on. As the QEI knows it makes sence to request load/unload through the entry.

Set the default Maximum InMemory to -1, disabled.

Removed the FlowableQueueEntryList interface, merged with QueueEntryList

Removed:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Mon Mar  2 15:13:57 2009
@@ -114,7 +114,7 @@
 
     public long getMemoryUsageMaximum()
     {
-        return _config.getLong("maximumMemoryUsage", 0);
+        return _config.getLong("maximumMemoryUsage", -1);
     }
 
     public long getMemoryUsageMinimum()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Mon Mar  2 15:13:57 2009
@@ -24,7 +24,6 @@
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -32,25 +31,29 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /** This is an abstract base class to handle */
-public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
+public abstract class FlowableBaseQueueEntryList implements QueueEntryList
 {
-    private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
+    protected static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
 
     private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
     private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-    private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
+    protected final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
     /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
 
-    private long _memoryUsageMaximum = 0;
+    protected long _memoryUsageMaximum = -1L;
 
     /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
-    private long _memoryUsageMinimum = 0;
+    protected long _memoryUsageMinimum = 0;
     private volatile AtomicBoolean _flowed;
     private QueueBackingStore _backingStore;
     protected AMQQueue _queue;
     private Executor _inhaler;
+    private Executor _purger;
     private AtomicBoolean _stopped;
     private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
+    protected boolean _disabled;
+    private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
+    private static final int BATCH_INHALE_COUNT = 100;
 
     FlowableBaseQueueEntryList(AMQQueue queue)
     {
@@ -64,6 +67,8 @@
 
         _stopped = new AtomicBoolean(false);
         _inhaler = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+        _purger = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+        _disabled = true;
     }
 
     public void setFlowed(boolean flowed)
@@ -71,10 +76,26 @@
         if (_flowed.get() != flowed)
         {
             _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+            showUsage();
             _flowed.set(flowed);
         }
     }
 
+    protected void showUsage()
+    {
+        showUsage("");
+    }
+
+    protected void showUsage(String prefix)
+    {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+                       + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
+                       + "/" + dataSize());
+        }
+    }
+
     public boolean isFlowed()
     {
         return _flowed.get();
@@ -99,13 +120,15 @@
     {
         _memoryUsageMaximum = maximumMemoryUsage;
 
+        if (maximumMemoryUsage >= 0)
+        {
+            _disabled = false;
+        }
+
         // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
         if (_memoryUsageMaximum > 0)
         {
-            if (_memoryUsageMinimum == 0)
-            {
-                setMemoryUsageMinimum(_memoryUsageMaximum / 2);
-            }
+            setMemoryUsageMinimum(_memoryUsageMaximum / 2);
 
             // if we have now have to much memory in use we need to purge.
             if (_memoryUsageMaximum < _atomicQueueInMemory.get())
@@ -113,6 +136,21 @@
                 startPurger();
             }
         }
+        else if (_memoryUsageMaximum == 0)
+        {
+            if (_atomicQueueInMemory.get() > 0)
+            {
+                startPurger();
+            }
+        }
+        else
+        {
+            if (_log.isInfoEnabled())
+            {
+                _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
+            }
+            _disabled = true;
+        }
     }
 
     public long getMemoryUsageMaximum()
@@ -134,7 +172,9 @@
     private void checkAndStartLoader()
     {
         // If we've increased the minimum memory above what we have in memory then we need to inhale more
-        if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+        long inMemory = _atomicQueueInMemory.get();
+        // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run.
+        if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0)
         {
             startInhaler();
         }
@@ -142,22 +182,22 @@
 
     private void startInhaler()
     {
-        if (_flowed.get())
-        {
-            MessageInhaler inhaler = new MessageInhaler();
+        MessageInhaler inhaler = new MessageInhaler();
 
-            if (_asynchronousInhaler.compareAndSet(null, inhaler))
-            {
-                _inhaler.execute(inhaler);
-            }
+        if (_asynchronousInhaler.compareAndSet(null, inhaler))
+        {
+            _inhaler.execute(inhaler);
         }
     }
 
     private void startPurger()
     {
-       //TODO create purger, used when maxMemory is reduced creating over memory situation.
-       _log.warn("Requested Purger Start.. purger TBC.");
-       //_purger.execute(new MessagePurger(this));
+        MessagePurger purger = new MessagePurger();
+
+        if (_asynchronousPurger.compareAndSet(null, purger))
+        {
+            _purger.execute(purger);
+        }
     }
 
     public long getMemoryUsageMinimum()
@@ -165,26 +205,30 @@
         return _memoryUsageMinimum;
     }
 
+    /**
+     * Only to be called by the QueueEntry
+     *
+     * @param queueEntry the entry to unload
+     */
     public void unloadEntry(QueueEntry queueEntry)
     {
-        try
-        {
-            queueEntry.unload();
-            _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
-            checkAndStartLoader();
-        }
-        catch (UnableToFlowMessageException e)
+        if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
         {
-            _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+            _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
         }
+        checkAndStartLoader();
     }
 
+    /**
+     * Only to be called from the QueueEntry
+     *
+     * @param queueEntry the entry to load
+     */
     public void loadEntry(QueueEntry queueEntry)
     {
-        queueEntry.load();
-        if( _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+        if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
         {
-            _log.error("Loaded to much data!:"+_atomicQueueInMemory.get()+"/"+_memoryUsageMaximum);
+            _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
         }
     }
 
@@ -196,44 +240,21 @@
             // rather than actively shutdown our threads.
             //Shutdown thread for inhaler.
             ReferenceCountingExecutorService.getInstance().releaseExecutorService();
+            ReferenceCountingExecutorService.getInstance().releaseExecutorService();
         }
     }
 
-    protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
-    {
-        return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
-    }
-
     protected void incrementCounters(final QueueEntryImpl queueEntry)
     {
         _atomicQueueCount.incrementAndGet();
         _atomicQueueSize.addAndGet(queueEntry.getSize());
-        if (!willCauseFlowToDisk(queueEntry))
-        {
-            _atomicQueueInMemory.addAndGet(queueEntry.getSize());
-        }
-        else
-        {
-            setFlowed(true);
-            flowingToDisk(queueEntry);
-        }
-    }
+        long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
 
-    /**
-     * Called when we are now flowing to disk
-     *
-     * @param queueEntry the entry that is being flowed to disk
-     */
-    protected void flowingToDisk(QueueEntryImpl queueEntry)
-    {
-        try
+        if (!_disabled && inUseMemory > _memoryUsageMaximum)
         {
+            setFlowed(true);
             queueEntry.unload();
         }
-        catch (UnableToFlowMessageException e)
-        {
-            _atomicQueueInMemory.addAndGet(queueEntry.getSize());
-        }
     }
 
     protected void dequeued(QueueEntryImpl queueEntry)
@@ -242,7 +263,10 @@
         _atomicQueueSize.addAndGet(-queueEntry.getSize());
         if (!queueEntry.isFlowed())
         {
-            _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+            if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+            {
+                _log.error("InMemory Count just went below 0 on dequeue.");
+            }
         }
     }
 
@@ -270,32 +294,53 @@
 
     private void inhaleList(MessageInhaler messageInhaler)
     {
-        _log.info("Inhaler Running");
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Inhaler Running:" + _queue.getName());
+            showUsage("Inhaler Running:" + _queue.getName());
+        }
         // If in memory count is at or over max then we can't inhale
         if (_atomicQueueInMemory.get() >= _memoryUsageMaximum)
         {
-            _log.debug("Unable to start inhaling as we are already over quota:" +
-                       _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Unable to start inhaling as we are already over quota:" +
+                           _atomicQueueInMemory.get() + ">=" + _memoryUsageMaximum);
+            }
             return;
         }
 
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
-        while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+        int inhaled = 0;
+
+        while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT)
+               && _asynchronousInhaler.compareAndSet(null, messageInhaler))
         {
             QueueEntryIterator iterator = iterator();
 
-            while (!iterator.getNode().isAvailable() && iterator.advance())
+            // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread.
+            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum)
+                   && !iterator.getNode().isAvailable() && iterator.advance())
             {
                 //Find first AVAILABLE node
             }
 
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && !iterator.atTail())
+            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail())
             {
                 QueueEntry entry = iterator.getNode();
 
                 if (entry.isAvailable() && entry.isFlowed())
                 {
-                    loadEntry(entry);
+                    if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum)
+                    {
+                        // We don't have space for this message so we need to stop inhaling.
+                        inhaled = BATCH_INHALE_COUNT;
+                    }
+                    else
+                    {
+                        loadEntry(entry);
+                        inhaled++;
+                    }
                 }
 
                 iterator.advance();
@@ -309,13 +354,100 @@
             _asynchronousInhaler.set(null);
         }
 
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Inhaler Stopping:" + _queue.getName());
+            showUsage("Inhaler Stopping:" + _queue.getName());
+        }
+
         //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
         if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
         {
+            if (_log.isInfoEnabled())
+            {
+                _log.info("Rescheduling Inhaler:" + _queue.getName());
+            }
             _inhaler.execute(messageInhaler);
-
         }
 
     }
 
+    private class MessagePurger implements Runnable
+    {
+        public void run()
+        {
+            String threadName = Thread.currentThread().getName();
+            Thread.currentThread().setName("Purger-" + _queue.getVirtualHost().getName() + "-" + _queue.getName());
+            try
+            {
+                purgeList(this);
+            }
+            finally
+            {
+                Thread.currentThread().setName(threadName);
+            }
+        }
+    }
+
+    private void purgeList(MessagePurger messagePurger)
+    {
+        // If in memory count is at or over max then we can't inhale
+        if (_atomicQueueInMemory.get() <= _memoryUsageMinimum)
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Unable to start purging as we are already below our minimum cache level:" +
+                           _atomicQueueInMemory.get() + "<=" + _memoryUsageMinimum);
+            }
+            return;
+        }
+
+        _asynchronousPurger.compareAndSet(messagePurger, null);
+
+        while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger))
+        {
+            QueueEntryIterator iterator = iterator();
+
+            while (!iterator.getNode().isAvailable() && iterator.advance())
+            {
+                //Find first AVAILABLE node
+            }
+
+            // Count up the memory usage
+            long memoryUsage = 0;
+            while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail())
+            {
+                QueueEntry entry = iterator.getNode();
+
+                if (entry.isAvailable() && !entry.isFlowed())
+                {
+                    memoryUsage += entry.getSize();
+                }
+
+                iterator.advance();
+            }
+
+            //Purge remainging mesages on queue
+            while (!iterator.atTail())
+            {
+                QueueEntry entry = iterator.getNode();
+
+                if (entry.isAvailable() && !entry.isFlowed())
+                {
+                    entry.unload();
+                }
+
+                iterator.advance();
+            }
+
+            _asynchronousInhaler.set(null);
+        }
+
+        //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
+        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        {
+            _inhaler.execute(messagePurger);
+
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java Mon Mar  2 15:13:57 2009
@@ -25,7 +25,7 @@
 public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
 {
     private final AMQQueue _queue;
-    private final FlowableQueueEntryList[] _priorityLists;
+    private final QueueEntryList[] _priorityLists;
     private final int _priorities;
     private final int _priorityOffset;
 
@@ -33,13 +33,15 @@
     {
         super(queue);
         _queue = queue;
-        _priorityLists = new FlowableQueueEntryList[priorities];
+        _priorityLists = new QueueEntryList[priorities];
         _priorities = priorities;
-        _priorityOffset = 5-((priorities + 1)/2);
-        for(int i = 0; i < priorities; i++)
+        _priorityOffset = 5 - ((priorities + 1) / 2);
+        for (int i = 0; i < priorities; i++)
         {
             _priorityLists[i] = new SimpleQueueEntryList(queue);
         }
+
+        showUsage("Created:" + _queue.getName());
     }
 
     public int getPriorities()
@@ -54,33 +56,149 @@
 
     public QueueEntry add(AMQMessage message)
     {
-        int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
-        if(index >= _priorities)
+        int index = ((CommonContentHeaderProperties) ((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+        if (index >= _priorities)
         {
-            index = _priorities-1;
+            index = _priorities - 1;
         }
-        else if(index < 0)
+        else if (index < 0)
         {
             index = 0;
         }
+
+        long requriedSize = message.getSize();
+        // Check and see if list would flow on adding message
+        if (!_disabled && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
+        {
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
+                              + requriedSize + ") will cause flow. Searching for space");
+            }
+
+            long reclaimed = 0;
+
+            //work down the priorities looking for memory
+            int scavangeIndex = _priorities - 1;
+
+            //First: Don't take all the memory. So look for a queue that has more than 50% free
+
+            long currentMax;
+
+            while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+            {
+                currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+                long used = _priorityLists[scavangeIndex].memoryUsed();
+
+                if (used < currentMax / 2)
+                {
+                    long newMax = currentMax / 2;
+
+                    _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+                    reclaimed += currentMax - newMax;
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+                    }
+                    break;
+                }
+                else
+                {
+                    scavangeIndex--;
+                }
+            }
+
+            //Second: Just take the memory we need
+            if (scavangeIndex == -1)
+            {
+                scavangeIndex = _priorities - 1;
+                while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+                {
+                    currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+                    long used = _priorityLists[scavangeIndex].memoryUsed();
+
+                    if (used < currentMax)
+                    {
+                        long newMax = currentMax - used;
+
+                        // if there are no messages at this priority just take it all
+                        if (newMax == currentMax)
+                        {
+                            newMax = 0;
+                        }
+
+                        _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+                        reclaimed += currentMax - newMax;
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+                        }
+                        break;
+                    }
+                    else
+                    {
+                        scavangeIndex--;
+                    }
+                }
+            }
+
+            //Increment Maximum
+            if (reclaimed > 0)
+            {
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
+                               + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
+                }
+                _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
+            }
+            else
+            {
+                _log.debug("No space found.");
+            }
+
+            showUsage();
+        }
+
         return _priorityLists[index].add(message);
     }
 
+    @Override
+    protected void showUsage(String prefix)
+    {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug(prefix);
+            for (int index = 0; index < _priorities; index++)
+            {
+                QueueEntryList queueEntryList = _priorityLists[index];
+                _log.debug("Queue (" + _queue + ":" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
+                           + "/" + queueEntryList.getMemoryUsageMaximum()
+                           + "/" + queueEntryList.dataSize());
+            }
+        }
+    }
+
     public QueueEntry next(QueueEntry node)
     {
-        QueueEntryImpl nodeImpl = (QueueEntryImpl)node;
+        QueueEntryImpl nodeImpl = (QueueEntryImpl) node;
         QueueEntry next = nodeImpl.getNext();
 
-        if(next == null)
+        if (next == null)
         {
             QueueEntryList nodeEntryList = nodeImpl.getQueueEntryList();
             int index;
-            for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--);
+            for (index = _priorityLists.length - 1; _priorityLists[index] != nodeEntryList; index--)
+            {
+                ;
+            }
 
-            while(next == null && index != 0)
+            while (next == null && index != 0)
             {
                 index--;
-                next = ((QueueEntryImpl)_priorityLists[index].getHead()).getNext();
+                next = ((QueueEntryImpl) _priorityLists[index].getHead()).getNext();
             }
 
         }
@@ -89,24 +207,23 @@
 
     private final class PriorityQueueEntryListIterator implements QueueEntryIterator
     {
-        private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+        private final QueueEntryIterator[] _iterators = new QueueEntryIterator[_priorityLists.length];
         private QueueEntry _lastNode;
 
         PriorityQueueEntryListIterator()
         {
-            for(int i = 0; i < _priorityLists.length; i++)
+            for (int i = 0; i < _priorityLists.length; i++)
             {
                 _iterators[i] = _priorityLists[i].iterator();
             }
             _lastNode = _iterators[_iterators.length - 1].getNode();
         }
 
-
         public boolean atTail()
         {
-            for(int i = 0; i < _iterators.length; i++)
+            for (int i = 0; i < _iterators.length; i++)
             {
-                if(!_iterators[i].atTail())
+                if (!_iterators[i].atTail())
                 {
                     return false;
                 }
@@ -121,9 +238,9 @@
 
         public boolean advance()
         {
-            for(int i = _iterators.length-1; i >= 0; i--)
+            for (int i = _iterators.length - 1; i >= 0; i--)
             {
-                if(_iterators[i].advance())
+                if (_iterators[i].advance())
                 {
                     _lastNode = _iterators[i].getNode();
                     return true;
@@ -140,7 +257,7 @@
 
     public QueueEntry getHead()
     {
-        return _priorityLists[_priorities-1].getHead();
+        return _priorityLists[_priorities - 1].getHead();
     }
 
     static class Factory implements QueueEntryListFactory
@@ -152,17 +269,31 @@
             _priorities = priorities;
         }
 
-        public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
         {
             return new PriorityQueueEntryList(queue, _priorities);
         }
     }
 
     @Override
+    public boolean isFlowed()
+    {
+        boolean flowed = false;
+        boolean full = true;
+        showUsage();
+        for (QueueEntryList queueEntryList : _priorityLists)
+        {
+            full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+            flowed = flowed || (queueEntryList.isFlowed());
+        }
+        return flowed && full;
+    }
+
+    @Override
     public int size()
     {
-        int size=0;
-        for (FlowableQueueEntryList queueEntryList : _priorityLists)
+        int size = 0;
+        for (QueueEntryList queueEntryList : _priorityLists)
         {
             size += queueEntryList.size();
         }
@@ -170,10 +301,96 @@
         return size;
     }
 
+    @Override
+    public long dataSize()
+    {
+        int dataSize = 0;
+        for (QueueEntryList queueEntryList : _priorityLists)
+        {
+            dataSize += queueEntryList.dataSize();
+        }
+
+        return dataSize;
+    }
+
+    @Override
+    public long memoryUsed()
+    {
+        int memoryUsed = 0;
+        for (QueueEntryList queueEntryList : _priorityLists)
+        {
+            memoryUsed += queueEntryList.memoryUsed();
+        }
+
+        return memoryUsed;
+    }
+
+    @Override
+    public void setMemoryUsageMaximum(long maximumMemoryUsage)
+    {
+        _memoryUsageMaximum = maximumMemoryUsage;
+
+        if (maximumMemoryUsage >= 0)
+        {
+            _disabled = false;
+        }
+
+        long share = maximumMemoryUsage / _priorities;
+
+        //Apply a share of the maximum To each prioirty quue
+        for (QueueEntryList queueEntryList : _priorityLists)
+        {
+            queueEntryList.setMemoryUsageMaximum(share);
+        }
+
+        if (maximumMemoryUsage < 0)
+        {
+            if (_log.isInfoEnabled())
+            {
+                _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
+            }
+            _disabled = true;
+            return;
+        }
+
+        //ensure we use the full allocation of memory
+        long remainder = maximumMemoryUsage - (share * _priorities);
+        if (remainder > 0)
+        {
+            _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
+        }
+    }
+
+    @Override
+    public long getMemoryUsageMaximum()
+    {
+        return _memoryUsageMaximum;
+    }
 
     @Override
-    protected void flowingToDisk(QueueEntryImpl queueEntry)
+    public void setMemoryUsageMinimum(long minimumMemoryUsage)
     {
-        //This class doesn't maintain it's own sizes it delegates to the sub FlowableQueueEntryLists
+        _memoryUsageMinimum = minimumMemoryUsage;
+
+        //Apply a share of the minimum To each prioirty quue
+        for (QueueEntryList queueEntryList : _priorityLists)
+        {
+            queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
+        }
+    }
+
+    @Override
+    public long getMemoryUsageMinimum()
+    {
+        return _memoryUsageMinimum;
+    }
+
+    @Override
+    public void stop()
+    {
+        for (QueueEntryList queueEntryList : _priorityLists)
+        {
+            queueEntryList.stop();
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Mar  2 15:13:57 2009
@@ -221,7 +221,7 @@
 
     boolean removeStateChangeListener(StateChangeListener listener);
 
-    void unload() throws UnableToFlowMessageException;
+    void unload();
 
     void load();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Mar  2 15:13:57 2009
@@ -393,17 +393,34 @@
         return false;
     }
 
-    public void unload() throws UnableToFlowMessageException
+    public void unload()
     {
         if (_message != null && _backingStore != null)
         {
-            if(_log.isDebugEnabled())
+
+            try
             {
-                _log.debug("Unloading:" + debugIdentity());
+                _backingStore.unload(_message);
+
+                if(_log.isDebugEnabled())
+                {
+                    _log.debug("Unloaded:" + debugIdentity());
+                }
+
+                _message = null;
+                //Update the memoryState if this load call resulted in the message being purged from memory                
+                if (!_flowed.getAndSet(true))
+                {
+                    _queueEntryList.unloadEntry(this);
+                }
+
+            } catch (UnableToFlowMessageException utfme) {
+                // There is no recovery needed as the memory states remain unchanged.
+                if(_log.isDebugEnabled())
+                {
+                    _log.debug("Unable to Flow message:" + debugIdentity() + ", due to:" + utfme.getMessage());
+                }
             }
-            _backingStore.unload(_message);
-            _message = null;
-            _flowed.getAndSet(true);
         }
     }
 
@@ -412,11 +429,17 @@
         if (_messageId != null && _backingStore != null)
         {
             _message = _backingStore.load(_messageId);
+
             if(_log.isDebugEnabled())
             {
-                _log.debug("Loading:" + debugIdentity());
+                _log.debug("Loaded:" + debugIdentity());
+            }
+
+            //Update the memoryState if this load call resulted in the message comming in to memory
+            if (_flowed.getAndSet(false))
+            {
+                _queueEntryList.loadEntry(this);
             }
-            _flowed.getAndSet(false);
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Mon Mar  2 15:13:57 2009
@@ -1,23 +1,23 @@
 /*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.queue;
 
 public interface QueueEntryList
@@ -31,4 +31,36 @@
     QueueEntryIterator iterator();
 
     QueueEntry getHead();
+
+    void setFlowed(boolean flowed);
+
+    boolean isFlowed();
+
+    int size();
+
+    long dataSize();
+
+    long memoryUsed();
+
+    void setMemoryUsageMaximum(long maximumMemoryUsage);
+
+    long getMemoryUsageMaximum();
+
+    void setMemoryUsageMinimum(long minimumMemoryUsage);
+
+    long getMemoryUsageMinimum();
+
+    /**
+     * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler.
+     * @param queueEntry the entry that has been unloaded
+     */
+    void unloadEntry(QueueEntry queueEntry);
+
+    /**
+     * Immediately update memory usage based on the load of this queueEntry
+     * @param queueEntry the entry that has been loaded
+     */
+    void loadEntry(QueueEntry queueEntry);
+
+    void stop();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Mon Mar  2 15:13:57 2009
@@ -22,5 +22,5 @@
 
 interface QueueEntryListFactory
 {
-    public FlowableQueueEntryList createQueueEntryList(AMQQueue queue);
+    public QueueEntryList createQueueEntryList(AMQQueue queue);
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Mar  2 15:13:57 2009
@@ -79,7 +79,7 @@
 
     private volatile Subscription _exclusiveSubscriber;
 
-    protected final FlowableQueueEntryList _entries;
+    protected final QueueEntryList _entries;
 
     private final AMQQueueMBean _managedObject;
     private final Executor _asyncDelivery;
@@ -468,8 +468,7 @@
 
         if (entry.isFlowed())
         {
-            _logger.debug("Synchronously loading flowed entry:" + entry.debugIdentity());
-            _entries.loadEntry(entry);
+            entry.load();
         }
 
         sub.send(entry);
@@ -477,7 +476,7 @@
         // We have delivered this message so we can unload it.
         if (_entries.isFlowed() && entry.isAcquired() && entry.getDeliveredToConsumer())
         {
-            _entries.unloadEntry(entry);
+            entry.unload();
         }
 
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Mon Mar  2 15:13:57 2009
@@ -1,9 +1,6 @@
 package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 /*
 *
@@ -172,7 +169,7 @@
     static class Factory implements QueueEntryListFactory
     {
 
-        public FlowableQueueEntryList createQueueEntryList(AMQQueue queue)
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
         {
             return new SimpleQueueEntryList(queue);
         }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Mar  2 15:13:57 2009
@@ -351,7 +351,7 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void unload() throws UnableToFlowMessageException
+                public void unload()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Mon Mar  2 15:13:57 2009
@@ -24,18 +24,20 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.txn.NonTransactionalContext;
 
 import java.util.ArrayList;
 
 public class AMQPriorityQueueTest extends SimpleAMQQueueTest
 {
-    private static final long MESSAGE_SIZE = 100L;
+    private static final int PRIORITIES = 3;
 
     @Override
     protected void setUp() throws Exception
-    {
+    {        
         _arguments = new FieldTable();
-        _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, 3);
+        _arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
         super.setUp();
     }
 
@@ -84,7 +86,6 @@
             int index = 1;
             for (QueueEntry qe : msgs)
             {
-                System.err.println(index + ":" + qe.getMessage().getMessageId());
                 index++;
             }
 
@@ -96,16 +97,91 @@
     protected AMQMessage createMessage(byte i) throws AMQException
     {
         AMQMessage message = super.createMessage();
-                
-        ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).setPriority(i);
+
+        ((BasicContentHeaderProperties) message.getContentHeaderBody().properties).setPriority(i);
 
         return message;
     }
 
-    @Override
-    public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+
+    public void testMessagesFlowToDiskWithPriority() throws AMQException, InterruptedException
     {
-        //Disable this test pending completion of QPID-1637
+        int PRIORITIES = 1;
+        FieldTable arguments = new FieldTable();
+        arguments.put(AMQQueueFactory.X_QPID_PRIORITIES, PRIORITIES);
+
+        // Create IncomingMessage and nondurable queue
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+        //Create a priorityQueue
+        _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testMessagesFlowToDiskWithPriority"), false, _owner, false, _virtualHost, arguments);
+
+        MESSAGE_SIZE = 1;
+        long MEMORY_MAX = PRIORITIES * 2;
+        int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+        //Set the Memory Usage to be very low
+        _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+        for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+        {
+            sendMessage(txnContext, (msgCount % 10));
+        }
+
+        //Check that we can hold 10 messages without flowing
+        assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageMaximum());        
+        assertEquals(_queue.getMemoryUsageMaximum(), _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+        // Send another and ensure we are flowed
+        sendMessage(txnContext);
+        
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+        assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+        assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+
+
+        //send another 99 so there are 200msgs in total on the queue
+        for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+        {
+            sendMessage(txnContext);
+
+            long usage = _queue.getMemoryUsageCurrent();
+            assertTrue("Queue has gone over quota:" + usage,
+                       usage <= _queue.getMemoryUsageMaximum());
+
+            assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+        }
+        assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+        _queue.registerSubscription(_subscription, false);
+
+        int slept = 0;
+        while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+        {
+            Thread.sleep(500);
+            slept++;
+        }
+
+        //Ensure the messages are retreived
+        assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+        //Check the queue is still within it's limits.
+        assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
+                   _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+
+        assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+        for (int index = 0; index < MESSAGE_COUNT; index++)
+        {
+            // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+            AMQMessage message = _subscription.getMessages().get(index);
+            assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+        }
+
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Mar  2 15:13:57 2009
@@ -20,7 +20,6 @@
  * 
  */
 
-import junit.framework.AssertionFailedError;
 import junit.framework.TestCase;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.AMQException;
@@ -60,7 +59,7 @@
     protected FieldTable _arguments = null;
 
     MessagePublishInfo info = new MessagePublishInfoImpl();
-    private static long MESSAGE_SIZE = 100;
+    protected static long MESSAGE_SIZE = 100;
 
     @Override
     protected void setUp() throws Exception
@@ -368,7 +367,7 @@
         long MEMORY_MAX = 500;
         int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
         //Set the Memory Usage to be very low
-        _queue.setMemoryUsageMaximum(MEMORY_MAX);        
+        _queue.setMemoryUsageMaximum(MEMORY_MAX);
 
         for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
         {
@@ -395,7 +394,7 @@
             assertTrue("Queue has gone over quota:" + usage,
                        usage <= _queue.getMemoryUsageMaximum());
 
-            assertTrue("Queue has a negative quota:" + usage,usage  > 0);
+            assertTrue("Queue has a negative quota:" + usage, usage > 0);
 
         }
         assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
@@ -412,13 +411,14 @@
         }
 
         //Ensure the messages are retreived
-        assertEquals("Not all messages were received, slept:"+slept/2+"s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+        assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
 
         //Check the queue is still within it's limits.
-        assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
-                   _queue.getMemoryUsageCurrent() <= _queue.getMemoryUsageMaximum());
+        long current = _queue.getMemoryUsageCurrent();
+        assertTrue("Queue has gone over quota:" + current+"/"+_queue.getMemoryUsageMaximum() ,
+                   current <= _queue.getMemoryUsageMaximum());
 
-        assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() > 0);
+        assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
 
         for (int index = 0; index < MESSAGE_COUNT; index++)
         {
@@ -426,10 +426,52 @@
             AMQMessage message = _subscription.getMessages().get(index);
             assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
         }
+    }
+
+      public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+    {
+        // Create IncomingMessage and nondurable queue
+        NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+        MESSAGE_SIZE = 1;
+        long MEMORY_MAX = 10;
+        int MESSAGE_COUNT = (int) MEMORY_MAX;
+        //Set the Memory Usage to be very low
+        _queue.setMemoryUsageMaximum(MEMORY_MAX);
 
+        for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+        {
+            sendMessage(txnContext);
+        }
+
+        //Check that we can hold all messages without flowing
+        assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+        assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+        // Send anothe and ensure we are flowed
+        sendMessage(txnContext);
+        assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+        assertEquals(MESSAGE_COUNT , _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+        _queue.setMemoryUsageMaximum(0L);
+
+        //Give the purger time to work
+        Thread.sleep(200);
+
+        assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+        assertEquals(0L , _queue.getMemoryUsageCurrent());
+        assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+    }
+
+    protected void sendMessage(TransactionalContext txnContext) throws AMQException
+    {
+        sendMessage(txnContext, 5);
     }
 
-    private void sendMessage(TransactionalContext txnContext) throws AMQException
+    protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
     {
         IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
 
@@ -438,6 +480,7 @@
         contentHeaderBody.bodySize = MESSAGE_SIZE;
         contentHeaderBody.properties = new BasicContentHeaderProperties();
         ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+        ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
         msg.setContentHeaderBody(contentHeaderBody);
 
         long messageId = msg.getMessageId();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=749331&r1=749330&r2=749331&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Mon Mar  2 15:13:57 2009
@@ -48,8 +48,9 @@
 
             //This is +2 because:
             // 1 - asyncDelivery Thread
-            // 2 - queueInhalerThread
-            assertEquals("References not increased", initialCount + 2, ReferenceCountingExecutorService.getInstance().getReferenceCount());
+            // 2 - queue InhalerThread
+            // 3 - queue PurgerThread
+            assertEquals("References not increased", initialCount + 3, ReferenceCountingExecutorService.getInstance().getReferenceCount());
             
             queue.stop();
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org