You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/03 16:15:08 UTC

svn commit: r761688 - /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java

Author: ritchiem
Date: Fri Apr  3 14:15:08 2009
New Revision: 761688

URL: http://svn.apache.org/viewvc?rev=761688&view=rev
Log:
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and purger threads will stop when the inMemory values are within the correct range.

Merge of r761671 and r761674 from trunk

Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=761688&r1=761687&r2=761688&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Fri Apr  3 14:15:08 2009
@@ -359,7 +359,12 @@
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
         int inhaled = 1;
 
+        //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done
+        // enough loading and this inhale process should stop
+        boolean finshedInhaling = false;
+
         while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+               && !finshedInhaling // Have we loaded all we can fit into memory
                && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
                && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
                && (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@
             // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
             // on the return from advance() which returns true if it can advance.
             boolean atEndofList = false;
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+
+            while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory
+                   && !finshedInhaling // Have we loaded all we can fit into memory
                    && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
                    && !atEndofList) // We have reached end of list QueueEntries
             {
@@ -394,7 +401,7 @@
                         {
                             _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
                         }
-                        inhaled = BATCH_PROCESS_COUNT;
+                        finshedInhaling = true;
                     }
                     else
                     {
@@ -421,7 +428,7 @@
         }
 
         //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        if (!finshedInhaling && _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {
@@ -471,7 +478,7 @@
         _asynchronousPurger.compareAndSet(messagePurger, null);
         int purged = 0;
 
-        while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+        while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
                && purged < BATCH_PROCESS_COUNT
                && _asynchronousPurger.compareAndSet(null, messagePurger))
         {
@@ -496,6 +503,12 @@
                 if (entry.isAvailable() && !entry.isFlowed())
                 {
                     memoryUsage += entry.getSize();
+                    // If this message is what puts us over the limit then break
+                    // out of this loop as we need to purge this item.
+                    if (memoryUsage > _memoryUsageMaximum)
+                    {
+                        break;
+                    }
                 }
 
                 atTail = !iterator.advance();
@@ -525,7 +538,7 @@
         }
 
         //If we are still flowed and are over the maximum value then schedule to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
         {
             _log.info("Rescheduling Purger:" + _queue.getName());
             _purger.execute(messagePurger);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org