You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/03 20:00:02 UTC
svn commit: r749699 - in /qpid/trunk/qpid/java/broker/src:
main/java/org/apache/qpid/server/queue/
test/java/org/apache/qpid/server/queue/
Author: ritchiem
Date: Tue Mar 3 19:00:02 2009
New Revision: 749699
URL: http://svn.apache.org/viewvc?rev=749699&view=rev
Log:
QPID-1637 : Update to test to correctly use priority queues in test. Fixed big in inhaler/purger to ensure priority data is correctly reloaded.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Tue Mar 3 19:00:02 2009
@@ -53,7 +53,7 @@
private AtomicReference<MessageInhaler> _asynchronousInhaler = new AtomicReference(null);
protected boolean _disabled;
private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
- private static final int BATCH_INHALE_COUNT = 100;
+ private static final int BATCH_PROCESS_COUNT = 100;
FlowableBaseQueueEntryList(AMQQueue queue)
{
@@ -76,7 +76,6 @@
if (_flowed.get() != flowed)
{
_log.warn("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
- showUsage();
_flowed.set(flowed);
}
}
@@ -126,20 +125,14 @@
}
// Don't attempt to start the inhaler/purger unless we have a minimum value specified.
- if (_memoryUsageMaximum > 0)
+ if (_memoryUsageMaximum >= 0)
{
setMemoryUsageMinimum(_memoryUsageMaximum / 2);
// if we have now have to much memory in use we need to purge.
if (_memoryUsageMaximum < _atomicQueueInMemory.get())
{
- startPurger();
- }
- }
- else if (_memoryUsageMaximum == 0)
- {
- if (_atomicQueueInMemory.get() > 0)
- {
+ setFlowed(true);
startPurger();
}
}
@@ -165,16 +158,15 @@
// Don't attempt to start the inhaler unless we have a minimum value specified.
if (_memoryUsageMinimum > 0)
{
- checkAndStartLoader();
+ checkAndStartInhaler();
}
}
- private void checkAndStartLoader()
+ private void checkAndStartInhaler()
{
- // If we've increased the minimum memory above what we have in memory then we need to inhale more
- long inMemory = _atomicQueueInMemory.get();
- // Can't check if inMemory == 0 or we will cause the inhaler thread to continually run.
- if (inMemory < _memoryUsageMinimum || _memoryUsageMinimum == 0)
+ // If we've increased the minimum memory above what we have in memory then
+ // we need to inhale more if there is more
+ if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
{
startInhaler();
}
@@ -210,13 +202,14 @@
*
* @param queueEntry the entry to unload
*/
- public void unloadEntry(QueueEntry queueEntry)
+ public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
{
if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
{
_log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
}
- checkAndStartLoader();
+
+ checkAndStartInhaler();
}
/**
@@ -224,11 +217,13 @@
*
* @param queueEntry the entry to load
*/
- public void loadEntry(QueueEntry queueEntry)
+ public void entryLoadedUpdateMemory(QueueEntry queueEntry)
{
if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
{
_log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
+ setFlowed(true);
+ startPurger();
}
}
@@ -311,11 +306,15 @@
}
_asynchronousInhaler.compareAndSet(messageInhaler, null);
- int inhaled = 0;
+ int inhaled = 1;
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT)
- && _asynchronousInhaler.compareAndSet(null, messageInhaler))
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+ && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
+ && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
+ && (inhaled > 0) // ensure we could inhale something
+ && _asynchronousInhaler.compareAndSet(null, messageInhaler)) // Ensure we are the running inhaler
{
+ inhaled = 0;
QueueEntryIterator iterator = iterator();
// If the inhaler is running and delivery rate picks up ensure that we just don't chase the delivery thread.
@@ -325,7 +324,13 @@
//Find first AVAILABLE node
}
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) && (inhaled < BATCH_INHALE_COUNT) && !iterator.atTail())
+ // Because the above loop checks then moves on to the next entry a check for atTail will return true but
+ // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
+ // on the return from advance() which returns true if it can advance.
+ boolean atEndofList = false;
+ while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+ && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
+ && !atEndofList) // We have reached end of list QueueEntries
{
QueueEntry entry = iterator.getNode();
@@ -334,16 +339,20 @@
if (_atomicQueueInMemory.get() + entry.getSize() > _memoryUsageMaximum)
{
// We don't have space for this message so we need to stop inhaling.
- inhaled = BATCH_INHALE_COUNT;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
+ }
+ inhaled = BATCH_PROCESS_COUNT;
}
else
{
- loadEntry(entry);
+ entry.load();
inhaled++;
}
}
- iterator.advance();
+ atEndofList = !iterator.advance();
}
if (iterator.atTail())
@@ -402,20 +411,34 @@
return;
}
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Purger Running:" + _queue.getName());
+ showUsage("Purger Running:" + _queue.getName());
+ }
+
_asynchronousPurger.compareAndSet(messagePurger, null);
+ int purged = 0;
- while ((_atomicQueueInMemory.get() >= _memoryUsageMinimum) && _asynchronousPurger.compareAndSet(null, messagePurger))
+ while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+ && purged < BATCH_PROCESS_COUNT
+ && _asynchronousPurger.compareAndSet(null, messagePurger))
{
QueueEntryIterator iterator = iterator();
+ //There are potentially AQUIRED messages that can be purged but we can't purge the last AQUIRED message
+ // as it may have just become AQUIRED and not yet delivered.
+
+ //To be safe only purge available messages. This should be fine as long as we have a small prefetch.
while (!iterator.getNode().isAvailable() && iterator.advance())
{
//Find first AVAILABLE node
}
- // Count up the memory usage
+ // Count up the memory usage to find our minimum point
long memoryUsage = 0;
- while ((memoryUsage < _memoryUsageMaximum) && !iterator.atTail())
+ boolean atTail = false;
+ while ((memoryUsage < _memoryUsageMaximum) && !atTail)
{
QueueEntry entry = iterator.getNode();
@@ -424,30 +447,37 @@
memoryUsage += entry.getSize();
}
- iterator.advance();
+ atTail = !iterator.advance();
}
//Purge remainging mesages on queue
- while (!iterator.atTail())
+ while (!atTail && (purged < BATCH_PROCESS_COUNT))
{
QueueEntry entry = iterator.getNode();
if (entry.isAvailable() && !entry.isFlowed())
{
entry.unload();
+ purged++;
}
- iterator.advance();
+ atTail = !iterator.advance();
}
- _asynchronousInhaler.set(null);
+ _asynchronousPurger.set(null);
}
- //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
- if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ if (_log.isInfoEnabled())
{
- _inhaler.execute(messagePurger);
+ _log.info("Purger Stopping:" + _queue.getName());
+ showUsage("Purger Stopping:" + _queue.getName());
+ }
+ //If we are still flowed and are over the minimum value then schedule to run again.
+ if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+ {
+ _log.info("Rescheduling Purger:" + _queue.getName());
+ _purger.execute(messagePurger);
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java Tue Mar 3 19:00:02 2009
@@ -79,13 +79,17 @@
long reclaimed = 0;
//work down the priorities looking for memory
- int scavangeIndex = _priorities - 1;
//First: Don't take all the memory. So look for a queue that has more than 50% free
-
long currentMax;
+ int scavangeIndex = 0;
+
+ if (scavangeIndex == index)
+ {
+ scavangeIndex++;
+ }
- while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+ while (scavangeIndex < _priorities && reclaimed <= requriedSize)
{
currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
long used = _priorityLists[scavangeIndex].memoryUsed();
@@ -105,15 +109,24 @@
}
else
{
- scavangeIndex--;
+ scavangeIndex++;
+ if (scavangeIndex == index)
+ {
+ scavangeIndex++;
+ }
}
- }
+ }
- //Second: Just take the memory we need
- if (scavangeIndex == -1)
+ //Second: Just take the free memory we need
+ if (scavangeIndex == _priorities)
{
- scavangeIndex = _priorities - 1;
- while (scavangeIndex >= 0 && reclaimed <= requriedSize)
+ scavangeIndex = 0;
+ if (scavangeIndex == index)
+ {
+ scavangeIndex++;
+ }
+
+ while (scavangeIndex < _priorities && reclaimed <= requriedSize)
{
currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
long used = _priorityLists[scavangeIndex].memoryUsed();
@@ -139,7 +152,51 @@
}
else
{
- scavangeIndex--;
+ scavangeIndex++;
+ if (scavangeIndex == index)
+ {
+ scavangeIndex++;
+ }
+ }
+ }
+
+ //Third: Take required memory
+ if (scavangeIndex == _priorities)
+ {
+ scavangeIndex = 0;
+ if (scavangeIndex == index)
+ {
+ scavangeIndex++;
+ }
+ while (scavangeIndex < _priorities && reclaimed <= requriedSize)
+ {
+ currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
+
+ if (currentMax > 0 )
+ {
+ long newMax = currentMax;
+ // Just take the amount of space required for this message.
+ if (newMax > requriedSize)
+ {
+ newMax = requriedSize;
+ }
+ _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
+
+ reclaimed += currentMax - newMax;
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
+ }
+ break;
+ }
+ else
+ {
+ scavangeIndex++;
+ if (scavangeIndex == index)
+ {
+ scavangeIndex++;
+ }
+ }
}
}
}
@@ -159,7 +216,10 @@
_log.debug("No space found.");
}
- showUsage();
+ if (_log.isTraceEnabled())
+ {
+ showUsage("Add");
+ }
}
return _priorityLists[index].add(message);
@@ -170,7 +230,10 @@
{
if (_log.isDebugEnabled())
{
- _log.debug(prefix);
+ if (prefix.length() != 0)
+ {
+ _log.debug(prefix);
+ }
for (int index = 0; index < _priorities; index++)
{
QueueEntryList queueEntryList = _priorityLists[index];
@@ -280,10 +343,16 @@
{
boolean flowed = false;
boolean full = true;
- showUsage();
+
+ if (_log.isTraceEnabled())
+ {
+ showUsage("isFlowed");
+ }
+
for (QueueEntryList queueEntryList : _priorityLists)
{
- full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+ //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
+ full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize();
flowed = flowed || (queueEntryList.isFlowed());
}
return flowed && full;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue Mar 3 19:00:02 2009
@@ -411,7 +411,7 @@
//Update the memoryState if this load call resulted in the message being purged from memory
if (!_flowed.getAndSet(true))
{
- _queueEntryList.unloadEntry(this);
+ _queueEntryList.entryUnloadedUpdateMemory(this);
}
} catch (UnableToFlowMessageException utfme) {
@@ -438,7 +438,7 @@
//Update the memoryState if this load call resulted in the message comming in to memory
if (_flowed.getAndSet(false))
{
- _queueEntryList.loadEntry(this);
+ _queueEntryList.entryLoadedUpdateMemory(this);
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Tue Mar 3 19:00:02 2009
@@ -54,13 +54,13 @@
* Immediately update memory usage based on the unload of this queueEntry, potentially start inhaler.
* @param queueEntry the entry that has been unloaded
*/
- void unloadEntry(QueueEntry queueEntry);
+ void entryUnloadedUpdateMemory(QueueEntry queueEntry);
/**
* Immediately update memory usage based on the load of this queueEntry
* @param queueEntry the entry that has been loaded
*/
- void loadEntry(QueueEntry queueEntry);
+ void entryLoadedUpdateMemory(QueueEntry queueEntry);
void stop();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Mar 3 19:00:02 2009
@@ -468,6 +468,10 @@
if (entry.isFlowed())
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Synchoronus load of entry:" + entry.debugIdentity());
+ }
entry.load();
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Tue Mar 3 19:00:02 2009
@@ -134,17 +134,21 @@
assertTrue("Queue is flowed.", !_queue.isFlowed());
// Send another and ensure we are flowed
- sendMessage(txnContext);
-
+ sendMessage(txnContext, 9);
+
+ //Give the Purging Thread a chance to run
+ Thread.yield();
+ Thread.sleep(500);
+
assertTrue("Queue is not flowed.", _queue.isFlowed());
- assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
- assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+ assertEquals("Queue contains more messages than expected.", MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals("Queue over memory quota.",MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
- //send another 99 so there are 200msgs in total on the queue
- for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+ //send another batch of messagse so the total in each queue is equal
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) ; msgCount++)
{
- sendMessage(txnContext);
+ sendMessage(txnContext, (msgCount % 10));
long usage = _queue.getMemoryUsageCurrent();
assertTrue("Queue has gone over quota:" + usage,
@@ -153,21 +157,22 @@
assertTrue("Queue has a negative quota:" + usage, usage > 0);
}
- assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
assertTrue("Queue is not flowed.", _queue.isFlowed());
_queue.registerSubscription(_subscription, false);
int slept = 0;
- while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT + 1 && slept < 10)
{
+ Thread.yield();
Thread.sleep(500);
slept++;
}
//Ensure the messages are retreived
- assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT + 1, _subscription.getQueueEntries().size());
//Check the queue is still within it's limits.
assertTrue("Queue has gone over quota:" + _queue.getMemoryUsageCurrent(),
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=749699&r1=749698&r2=749699&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Mar 3 19:00:02 2009
@@ -434,7 +434,9 @@
NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
MESSAGE_SIZE = 1;
- long MEMORY_MAX = 10;
+ /** Set to larger than the purge batch size. Default 100.
+ * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
+ long MEMORY_MAX = 500;
int MESSAGE_COUNT = (int) MEMORY_MAX;
//Set the Memory Usage to be very low
_queue.setMemoryUsageMaximum(MEMORY_MAX);
@@ -457,8 +459,14 @@
_queue.setMemoryUsageMaximum(0L);
- //Give the purger time to work
- Thread.sleep(200);
+ //Give the purger time to work maximum of 1s
+ int slept = 0;
+ while (_queue.getMemoryUsageCurrent() > 0 && slept < 5)
+ {
+ Thread.yield();
+ Thread.sleep(200);
+ slept++;
+ }
assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
assertEquals(0L , _queue.getMemoryUsageCurrent());
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org