You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/11 21:31:30 UTC

svn commit: r943240 - in /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue: QueueEntryImpl.java SimpleQueueEntryList.java

Author: ritchiem
Date: Tue May 11 19:31:30 2010
New Revision: 943240

URL: http://svn.apache.org/viewvc?rev=943240&view=rev
Log:
QPID-2597 : Add scavenge code. Unit test for SQEL to follow. 


Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943240&r1=943239&r2=943240&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 11 19:31:30 2010
@@ -386,7 +386,26 @@ public class QueueEntryImpl implements Q
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.advanceHead();            
+            //
+            // We can't advanceHead() before the 'if' as we need to check if we
+            // are at the head or not. If we are not at the head then we should
+            // try scavenge the list.
+            //
+            // An alternative would be to provide a remove(this) method on
+            // the queueEntryList, this would however be less efficient as it
+            // would require that we walk the queue to locate this before we
+            // could perform the removal.            
+            //
+            if (((QueueEntryImpl) _queueEntryList.getHead()).nextNode() != this)
+            {
+                _queueEntryList.advanceHead();
+                _queueEntryList.scavenge();
+            }
+            else
+            {
+                _queueEntryList.advanceHead();
+            }
+
             return true;
         }
         else

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=943240&r1=943239&r2=943240&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue May 11 19:31:30 2010
@@ -2,6 +2,8 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.store.StoreContext;
 
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /*
@@ -43,6 +45,9 @@ public class SimpleQueueEntryList implem
             AtomicReferenceFieldUpdater.newUpdater
             (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
 
+    private AtomicLong _scavenges = new AtomicLong(0L);
+    private AtomicReference<Thread> _scavenger = new AtomicReference<Thread>(null);
+    private static final long SCAVENGE_COUNT = Integer.getInteger("qpid.queue.scavenge_count", 50);
 
 
 
@@ -69,6 +74,78 @@ public class SimpleQueueEntryList implem
         }
     }
 
+    void scavenge()
+    {
+        _scavenges.incrementAndGet();
+        
+        if (_scavenges.get() < SCAVENGE_COUNT)
+        {
+            return;
+        }
+
+        try
+        {
+
+            if (_scavenger.compareAndSet(null, Thread.currentThread()))
+            {
+                // only delete the number of scavenges requested.
+                // This should keep things fair when we have multiple consumers
+                // using selectors that will be calling this.
+                // With multiple consumers this will also be called but
+                // advanceHead should take care of it in most instances.
+                // Often it will be the ExpiredMessageTask so will not
+                // affect throughput.
+                long deletesToPerform = _scavenges.getAndSet(0);
+
+
+                QueueEntryImpl root = _head;
+                QueueEntryImpl next = root.nextNode();
+
+                do
+                {
+
+                    while (next._next != null && next.isDeleted())
+                    {
+
+                        final QueueEntryImpl newhead = next.nextNode();
+                        if (newhead != null)
+                        {
+                            _nextUpdater.compareAndSet(root, next, newhead);
+                        }
+                        next = root.nextNode();
+                    }
+                    if (next._next != null)
+                    {
+                        if (!next.isDeleted())
+                        {
+                            root = next;
+                            next = root.nextNode();
+                            deletesToPerform--;
+                        }
+
+                        // Limit the number of scavenges performed by this
+                        // thread. For fairness.
+                        if (deletesToPerform == 0)
+                        {
+                            return;
+                        }
+
+                    }
+                    else
+                    {
+                        break;
+                    }
+
+                }
+                while (next != null && next._next != null);
+            }
+        }
+        finally
+        {
+            _scavenger.compareAndSet(Thread.currentThread(), null);
+        }
+
+    }
 
     public AMQQueue getQueue()
     {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org