You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/11 21:31:30 UTC
svn commit: r943240 - in
/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:
QueueEntryImpl.java SimpleQueueEntryList.java
Author: ritchiem
Date: Tue May 11 19:31:30 2010
New Revision: 943240
URL: http://svn.apache.org/viewvc?rev=943240&view=rev
Log:
QPID-2597 : Add scavenge code. Unit test for SQEL to follow.
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943240&r1=943239&r2=943240&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 11 19:31:30 2010
@@ -386,7 +386,26 @@ public class QueueEntryImpl implements Q
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ //
+ // We can't advanceHead() before the 'if' as we need to check if we
+ // are at the head or not. If we are not at the head then we should
+ // try to scavenge the list.
+ //
+ // An alternative would be to provide a remove(this) method on
+ // the queueEntryList, this would however be less efficient as it
+ // would require that we walk the queue to locate this before we
+ // could perform the removal.
+ //
+ if (((QueueEntryImpl) _queueEntryList.getHead()).nextNode() != this)
+ {
+ _queueEntryList.advanceHead();
+ _queueEntryList.scavenge();
+ }
+ else
+ {
+ _queueEntryList.advanceHead();
+ }
+
return true;
}
else
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=943240&r1=943239&r2=943240&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue May 11 19:31:30 2010
@@ -2,6 +2,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.StoreContext;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/*
@@ -43,6 +45,9 @@ public class SimpleQueueEntryList implem
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
+ private AtomicLong _scavenges = new AtomicLong(0L); // scavenge() calls counted since the counter was last reset by a scavenging pass
+ private AtomicReference<Thread> _scavenger = new AtomicReference<Thread>(null); // thread currently running a scavenging pass, or null when none is
+ private static final long SCAVENGE_COUNT = Integer.getInteger("qpid.queue.scavenge_count", 50); // scavenge() is a no-op until the counter reaches this; system property, default 50
@@ -69,6 +74,78 @@ public class SimpleQueueEntryList implem
}
}
+ void scavenge() // Walks the list from _head and unlinks entries that report isDeleted().
+ {
+ _scavenges.incrementAndGet(); // record this scavenge request
+ // NOTE(review): the increment above and the read below are two separate atomic operations, so the value tested may include concurrent updates.
+ if (_scavenges.get() < SCAVENGE_COUNT)
+ {
+ return; // below the threshold: nothing to do (this return is outside the try, so the finally does not run)
+ }
+ // Threshold reached: attempt a scavenging pass.
+ try
+ {
+ // Only the thread that wins this CAS scavenges; any other caller falls straight through to the finally.
+ if (_scavenger.compareAndSet(null, Thread.currentThread()))
+ {
+ // Bound the work of this pass by the number of scavenge requests
+ // counted since the last pass, resetting the counter as we read it.
+ // This should keep things fair when we have multiple consumers
+ // using selectors that will be calling this.
+ // With multiple consumers this will also be called, but
+ // advanceHead should take care of it in most instances.
+ // Often the caller will be the ExpiredMessageTask, so throughput is unaffected.
+ long deletesToPerform = _scavenges.getAndSet(0);
+ // 'root' is the entry whose _next link may be rewritten;
+ // 'next' is the candidate for unlinking that currently follows it.
+ QueueEntryImpl root = _head;
+ QueueEntryImpl next = root.nextNode(); // NOTE(review): dereferenced below without a null check - presumably _head always has a successor here; confirm
+
+ do
+ {
+ // Unlink the run of deleted entries directly after root. An entry with no successor is never unlinked.
+ while (next._next != null && next.isDeleted())
+ {
+ // Swing root's _next link past the deleted entry; a failed CAS is ignored and root's link is simply re-read.
+ final QueueEntryImpl newhead = next.nextNode();
+ if (newhead != null)
+ {
+ _nextUpdater.compareAndSet(root, next, newhead);
+ }
+ next = root.nextNode(); // re-read root's successor whether or not the CAS succeeded
+ }
+ if (next._next != null)
+ {
+ if (!next.isDeleted())
+ {
+ root = next; // step over an entry that is not deleted
+ next = root.nextNode();
+ deletesToPerform--; // NOTE(review): despite its name, the budget is spent per non-deleted entry stepped over, not per unlink
+ }
+ // If 'next' became deleted since the inner loop's check, nothing is consumed and the next iteration unlinks it.
+ // Limit the amount of scavenging performed by this
+ // thread, for fairness.
+ if (deletesToPerform == 0)
+ {
+ return; // inside the try, so the finally below still clears _scavenger
+ }
+
+ }
+ else
+ {
+ break; // 'next' has no successor: end of the list reached
+ }
+
+ }
+ while (next != null && next._next != null);
+ }
+ }
+ finally
+ {
+ _scavenger.compareAndSet(Thread.currentThread(), null); // clears the marker only if this thread set it; no effect for callers that lost the CAS above
+ }
+
+ }
public AMQQueue getQueue()
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org