You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/03 20:00:02 UTC

svn commit: r749699 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/

Author: ritchiem
Date: Tue Mar  3 19:00:02 2009
New Revision: 749699

URL: http://svn.apache.org/viewvc?rev=749699&view=rev
Log:
QPID-1637 : Update to test to correctly use priority queues in test. Fixed big in inhaler/purger to ensure priority data is correctly reloaded.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Tue Mar  3 19:00:02 2009
@@ -53,7 +53,7 @@
     private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
     protected boolean _disabled;
     private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
-    private static final int BATCH_INHALE_COUNT = 100;
+    private static final int BATCH_PROCESS_COUNT = 100;
 
     FlowableBaseQueueEntryList(AMQQueue queue)
     {
@@ -76,7 +76,6 @@
         if (_flowed.get() != flowed)
         {
             _log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
-            showUsage();
             _flowed.set(flowed);
         }
     }
@@ -126,20 +125,14 @@
         }
 
         // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
-        if (_memoryUsageMaximum > 0)
+        if (_memoryUsageMaximum >= 0)
         {
             setMemoryUsageMinimum(_memoryUsageMaximum / 2);
 
             // if we have now have to much memory in use we need to purge.
             if (_memoryUsageMaximum < _atomicQueueInMemory.get())
             {
-                startPurger();
-            }
-        }
-        else if (_memoryUsageMaximum == 0)
-        {
-            if (_atomicQueueInMemory.get() > 0)
-            {
+                setFlowed(true);
                 startPurger();
             }
         }
@@ -165,16 +158,15 @@
         // Don't attempt to start the inhaler unless we have a minimum value specified.
         if (_memoryUsageMinimum > 0)
         {
-            checkAndStartLoader();
+            checkAndStartInhaler();
         }
     }
 
-    private void checkAndStartLoader()
+    private void checkAndStartInhaler()
     {
-        // If we've increased the minimum memory above what we have in memory then we need to inhale more
-        long inMemory = _atomicQueueInMemory.get();
-        // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run.
-        if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0)
+        // If we've increased the minimum memory above what we have in memory then
+        // we need to inhale more if there is more
+        if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
         {
             startInhaler();
         }
@@ -210,13 +202,14 @@
      *
      * @param queueEntry the entry to unload
      */
-    public void unloadEntry(QueueEntry queueEntry)
+    public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
     {
         if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
         {
             _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
         }
-        checkAndStartLoader();
+
+        checkAndStartInhaler();
     }
 
     /**
@@ -224,11 +217,13 @@
      *
      * @param queueEntry the entry to load
      */
-    public void loadEntry(QueueEntry queueEntry)
+    public void entryLoadedUpdateMemory(QueueEntry queueEntry)
     {
         if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
         {
             _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
+            setFlowed(true);
+            startPurger();
         }
     }
 
@@ -311,11 +306,15 @@
         }
 
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
-        int inhaled = 0;
+        int inhaled = 1;
 
-        while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT)
-               && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+        while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+               && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
+               && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
+               && (inhaled > 0) // ensure we could inhale something
+               && _asynchronousInhaler.compareAndSet(null, messageInhaler)) // Ensure we are the running inhaler
         {
+            inhaled = 0;
             QueueEntryIterator iterator = iterator();
 
             // If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread.
@@ -325,7 +324,13 @@
                 //Find first AVAILABLE node
             }
 
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail())
+            // Because the above loop checks then moves on to the next entry a check for atTail will return true but
+            // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
+            // on the return from advance() which returns true if it can advance.
+            boolean atEndofList = false;
+            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+                   && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
+                   && !atEndofList) // We have reached end of list QueueEntries
             {
                 QueueEntry entry = iterator.getNode();
 
@@ -334,16 +339,20 @@
                     if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum)
                     {
                         // We don't have space for this message so we need to stop inhaling.
-                        inhaled = BATCH_INHALE_COUNT;
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
+                        }
+                        inhaled = BATCH_PROCESS_COUNT;
                     }
                     else
                     {
-                        loadEntry(entry);
+                        entry.load();
                         inhaled++;
                     }
                 }
 
-                iterator.advance();
+                atEndofList = !iterator.advance();
             }
 
             if (iterator.atTail())
@@ -402,20 +411,34 @@
             return;
         }
 
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Purger Running:" + _queue.getName());
+            showUsage("Purger Running:" + _queue.getName());
+        }
+
         _asynchronousPurger.compareAndSet(messagePurger, null);
+        int purged = 0;
 
-        while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger))
+        while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+               && purged < BATCH_PROCESS_COUNT
+               && _asynchronousPurger.compareAndSet(null, messagePurger))
         {
             QueueEntryIterator iterator = iterator();
 
+            //There are potentially AQUIRED messages that can be purged but we can't purge the last AQUIRED message
+            // as it may have just become AQUIRED and not yet delivered.
+
+            //To be safe only purge available messages. This should be fine as long as we have a small prefetch.
             while (!iterator.getNode().isAvailable() && iterator.advance())
             {
                 //Find first AVAILABLE node
             }
 
-            // Count up the memory usage
+            // Count up the memory usage to find our minimum point
             long memoryUsage = 0;
-            while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail())
+            boolean atTail = false;
+            while ((memoryUsage < _memoryUsageMaximum) && !atTail)
             {
                 QueueEntry entry = iterator.getNode();
 
@@ -424,30 +447,37 @@
                     memoryUsage += entry.getSize();
                 }
 
-                iterator.advance();
+                atTail = !iterator.advance();
             }
 
             //Purge remainging mesages on queue
-            while (!iterator.atTail())
+            while (!atTail && (purged < BATCH_PROCESS_COUNT))
             {
                 QueueEntry entry = iterator.getNode();
 
                 if (entry.isAvailable() && !entry.isFlowed())
                 {
                     entry.unload();
+                    purged++;
                 }
 
-                iterator.advance();
+                atTail = !iterator.advance();
             }
 
-            _asynchronousInhaler.set(null);
+            _asynchronousPurger.set(null);
         }
 
-        //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        if (_log.isInfoEnabled())
         {
-            _inhaler.execute(messagePurger);
+            _log.info("Purger Stopping:" + _queue.getName());
+            showUsage("Purger Stopping:" + _queue.getName());
+        }
 
+        //If we are still flowed and are over the minimum value then schedule to run again.
+        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+        {
+            _log.info("Rescheduling Purger:" + _queue.getName());
+            _purger.execute(messagePurger);
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java Tue Mar  3 19:00:02 2009
@@ -79,13 +79,17 @@
             long reclaimed = 0;
 
             //work down the priorities looking for memory
-            int scavangeIndex = _priorities - 1;
 
             //First: Don't take all the memory. So look for a queue that has more than 50% free
-
             long currentMax;
+            int scavangeIndex = 0;
+
+            if (scavangeIndex == index)
+            {
+                scavangeIndex++;
+            }
 
-            while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+            while (scavangeIndex < _priorities  && reclaimed <= requriedSize)
             {
                 currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
                 long used = _priorityLists[scavangeIndex].memoryUsed();
@@ -105,15 +109,24 @@
                 }
                 else
                 {
-                    scavangeIndex--;
+                    scavangeIndex++;
+                    if (scavangeIndex == index)
+                    {
+                        scavangeIndex++;
+                    }
                 }
-            }
+            }                                   
 
-            //Second: Just take the memory we need
-            if (scavangeIndex == -1)
+            //Second: Just take the free memory we need
+            if (scavangeIndex == _priorities)
             {
-                scavangeIndex = _priorities - 1;
-                while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+                scavangeIndex = 0;
+                if (scavangeIndex == index)
+                {
+                    scavangeIndex++;
+                }
+
+                while (scavangeIndex < _priorities && reclaimed <= requriedSize)
                 {
                     currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
                     long used = _priorityLists[scavangeIndex].memoryUsed();
@@ -139,7 +152,51 @@
                     }
                     else
                     {
-                        scavangeIndex--;
+                        scavangeIndex++;
+                        if (scavangeIndex == index)
+                        {
+                            scavangeIndex++;
+                        }
+                    }
+                }
+
+                //Third: Take required memory
+                if (scavangeIndex == _priorities)
+                {
+                    scavangeIndex = 0;
+                    if (scavangeIndex == index)
+                    {
+                        scavangeIndex++;
+                    }
+                    while (scavangeIndex < _priorities && reclaimed <= requriedSize)
+                    {
+                        currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+
+                        if (currentMax > 0 )
+                        {
+                            long newMax = currentMax;
+                            // Just take the amount of space required for this message.
+                            if (newMax > requriedSize)
+                            {
+                                newMax = requriedSize;
+                            }
+                            _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+                            reclaimed += currentMax - newMax;
+                            if (_log.isDebugEnabled())
+                            {
+                                _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+                            }
+                            break;
+                        }
+                        else
+                        {
+                            scavangeIndex++;
+                            if (scavangeIndex == index)
+                            {
+                                scavangeIndex++;
+                            }
+                        }
                     }
                 }
             }
@@ -159,7 +216,10 @@
                 _log.debug("No space found.");
             }
 
-            showUsage();
+            if (_log.isTraceEnabled())
+            {
+                showUsage("Add");
+            }
         }
 
         return _priorityLists[index].add(message);
@@ -170,7 +230,10 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug(prefix);
+            if (prefix.length() != 0)
+            {
+                _log.debug(prefix);
+            }
             for (int index = 0; index < _priorities; index++)
             {
                 QueueEntryList queueEntryList = _priorityLists[index];
@@ -280,10 +343,16 @@
     {
         boolean flowed = false;
         boolean full = true;
-        showUsage();
+
+        if (_log.isTraceEnabled())
+        {
+            showUsage("isFlowed");
+        }
+
         for (QueueEntryList queueEntryList : _priorityLists)
         {
-            full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+            //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+            full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize();
             flowed = flowed || (queueEntryList.isFlowed());
         }
         return flowed && full;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar  3 19:00:02 2009
@@ -411,7 +411,7 @@
                 //Update the memoryState if this load call resulted in the message being purged from memory                
                 if (!_flowed.getAndSet(true))
                 {
-                    _queueEntryList.unloadEntry(this);
+                    _queueEntryList.entryUnloadedUpdateMemory(this);
                 }
 
             } catch (UnableToFlowMessageException utfme) {
@@ -438,7 +438,7 @@
             //Update the memoryState if this load call resulted in the message comming in to memory
             if (_flowed.getAndSet(false))
             {
-                _queueEntryList.loadEntry(this);
+                _queueEntryList.entryLoadedUpdateMemory(this);
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Mar  3 19:00:02 2009
@@ -54,13 +54,13 @@
      * Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler.
      * @param queueEntry the entry that has been unloaded
      */
-    void unloadEntry(QueueEntry queueEntry);
+    void entryUnloadedUpdateMemory(QueueEntry queueEntry);
 
     /**
      * Immediately update memory usage based on the load of this queueEntry
      * @param queueEntry the entry that has been loaded
      */
-    void loadEntry(QueueEntry queueEntry);
+    void entryLoadedUpdateMemory(QueueEntry queueEntry);
 
     void stop();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Mar  3 19:00:02 2009
@@ -468,6 +468,10 @@
 
         if (entry.isFlowed())
         {
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Synchoronus load of entry:" + entry.debugIdentity());
+            }
             entry.load();
         }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Tue Mar  3 19:00:02 2009
@@ -134,17 +134,21 @@
         assertTrue("Queue is flowed.", !_queue.isFlowed());
 
         // Send another and ensure we are flowed
-        sendMessage(txnContext);
-        
+        sendMessage(txnContext, 9);
+
+        //Give the Purging Thread a chance to run
+        Thread.yield();
+        Thread.sleep(500);
+
         assertTrue("Queue is not flowed.", _queue.isFlowed());
-        assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
-        assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+        assertEquals("Queue contains more messages than expected.", MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+        assertEquals("Queue over memory quota.",MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
 
 
-        //send another 99 so there are 200msgs in total on the queue
-        for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+        //send another batch of messagse so the total in each queue is equal
+        for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) ; msgCount++)
         {
-            sendMessage(txnContext);
+            sendMessage(txnContext, (msgCount % 10));
 
             long usage = _queue.getMemoryUsageCurrent();
             assertTrue("Queue has gone over quota:" + usage,
@@ -153,21 +157,22 @@
             assertTrue("Queue has a negative quota:" + usage, usage > 0);
 
         }
-        assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+        assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
         assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
         assertTrue("Queue is not flowed.", _queue.isFlowed());
 
         _queue.registerSubscription(_subscription, false);
 
         int slept = 0;
-        while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+        while (_subscription.getQueueEntries().size() != MESSAGE_COUNT + 1 && slept < 10)
         {
+            Thread.yield();
             Thread.sleep(500);
             slept++;
         }
 
         //Ensure the messages are retreived
-        assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+        assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT + 1, _subscription.getQueueEntries().size());
 
         //Check the queue is still within it's limits.
         assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Mar  3 19:00:02 2009
@@ -434,7 +434,9 @@
         NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
 
         MESSAGE_SIZE = 1;
-        long MEMORY_MAX = 10;
+        /** Set to larger than the purge batch size. Default 100.
+         * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */ 
+        long MEMORY_MAX = 500;
         int MESSAGE_COUNT = (int) MEMORY_MAX;
         //Set the Memory Usage to be very low
         _queue.setMemoryUsageMaximum(MEMORY_MAX);
@@ -457,8 +459,14 @@
 
         _queue.setMemoryUsageMaximum(0L);
 
-        //Give the purger time to work
-        Thread.sleep(200);
+        //Give the purger time to work maximum of 1s
+        int slept = 0;
+        while (_queue.getMemoryUsageCurrent() > 0 && slept < 5)
+        {
+            Thread.yield();
+            Thread.sleep(200);
+            slept++;
+        }
 
         assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
         assertEquals(0L , _queue.getMemoryUsageCurrent());



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org