You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/03/26 17:57:55 UTC

svn commit: r758742 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/queue/

Author: ritchiem
Date: Thu Mar 26 16:57:21 2009
New Revision: 758742

URL: http://svn.apache.org/viewvc?rev=758742&view=rev
Log:
QPID-1768 : Removed all the special priority queue code. Added the ability for a FlowableBaseQueueEntryList to delegate its accounting to a parent QueueEntryList. This results in the PriorityQueueEntryList using the same FtD algorithm as SimpleQELs.
    - New messages on a flowed queue are optimistically pushed to disk; this should potentially be removed so that we just rely on the purger to flush the correct message, which in the Priority case may not be the last message in.
    - When space is available, messages are loaded in queue order, so in this case Priority order.


Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=758742&r1=758741&r2=758742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Thu Mar 26 16:57:21 2009
@@ -54,6 +54,7 @@
     protected boolean _disableFlowToDisk;
     private AtomicReference<MessagePurger> _asynchronousPurger = new AtomicReference(null);
     private static final int BATCH_PROCESS_COUNT = 100;
+    protected FlowableBaseQueueEntryList _parentQueue;
 
     FlowableBaseQueueEntryList(AMQQueue queue)
     {
@@ -89,7 +90,7 @@
     {
         if (_log.isDebugEnabled())
         {
-            _log.debug(prefix + " Queue(" + _queue + ":" + _queue.getName() + ") usage:" + memoryUsed()
+            _log.debug(prefix + " Queue(" + _queue.getName() + ") usage:" + memoryUsed()
                        + "/" + getMemoryUsageMinimum() + "<>" + getMemoryUsageMaximum()
                        + "/" + dataSize());
         }
@@ -97,7 +98,14 @@
 
     public boolean isFlowed()
     {
-        return _flowed.get();
+        if (_parentQueue != null)
+        {
+            return _parentQueue.isFlowed();
+        }
+        else
+        {
+            return _flowed.get();
+        }
     }
 
     public int size()
@@ -204,12 +212,19 @@
      */
     public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
     {
-        if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+        if (_parentQueue != null)
         {
-            _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+            _parentQueue.entryUnloadedUpdateMemory(queueEntry);
         }
+        else
+        {
+            if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+            {
+                _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
+            }
 
-        checkAndStartInhaler();
+            checkAndStartInhaler();
+        }
     }
 
     /**
@@ -219,11 +234,18 @@
      */
     public void entryLoadedUpdateMemory(QueueEntry queueEntry)
     {
-        if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+        if (_parentQueue != null)
         {
-            _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
-            setFlowed(true);
-            startPurger();
+            _parentQueue.entryLoadedUpdateMemory(queueEntry);
+        }
+        else
+        {
+            if (!_disableFlowToDisk && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+            {
+                _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
+                setFlowed(true);
+                startPurger();
+            }
         }
     }
 
@@ -241,28 +263,55 @@
         }
     }
 
-    protected void incrementCounters(final QueueEntryImpl queueEntry)
+    /**
+     * Mark this queue as part of another QueueEntryList for accounting purposes.
+     *
+     * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+     * a parent QueueEntrylist upon which the action should take place.
+     *
+     * @param queueEntryList The parent queue that is performing accounting.
+     */    
+    public void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList)
     {
-        _atomicQueueCount.incrementAndGet();
-        _atomicQueueSize.addAndGet(queueEntry.getSize());
-        long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+        _parentQueue = queueEntryList;
+    }
 
-        if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+    protected void incrementCounters(final QueueEntryImpl queueEntry)
+    {
+        if (_parentQueue != null)
+        {
+            _parentQueue.incrementCounters(queueEntry);
+        }
+        else
         {
-            setFlowed(true);
-            queueEntry.unload();
+            _atomicQueueCount.incrementAndGet();
+            _atomicQueueSize.addAndGet(queueEntry.getSize());
+            long inUseMemory = _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+
+            if (!_disableFlowToDisk && inUseMemory > _memoryUsageMaximum)
+            {
+                setFlowed(true);
+                queueEntry.unload();
+            }
         }
     }
 
     protected void dequeued(QueueEntryImpl queueEntry)
     {
-        _atomicQueueCount.decrementAndGet();
-        _atomicQueueSize.addAndGet(-queueEntry.getSize());
-        if (!queueEntry.isFlowed())
+        if (_parentQueue != null)
         {
-            if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+            _parentQueue.dequeued(queueEntry);
+        }
+        else
+        {
+            _atomicQueueCount.decrementAndGet();
+            _atomicQueueSize.addAndGet(-queueEntry.getSize());
+            if (!queueEntry.isFlowed())
             {
-                _log.error("InMemory Count just went below 0 on dequeue.");
+                if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+                {
+                    _log.error("InMemory Count just went below 0 on dequeue.");
+                }
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java?rev=758742&r1=758741&r2=758742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java Thu Mar 26 16:57:21 2009
@@ -39,6 +39,7 @@
         for (int i = 0; i < priorities; i++)
         {
             _priorityLists[i] = new SimpleQueueEntryList(queue);
+            _priorityLists[i].setParentQueueEntryList(this);
         }
 
         showUsage("Created:" + _queue.getName());
@@ -66,183 +67,9 @@
             index = 0;
         }
 
-        long requriedSize = message.getSize();
-        // Check and see if list would flow on adding message
-        if (!_disableFlowToDisk && !isFlowed() && _priorityLists[index].memoryUsed() + requriedSize > _priorityLists[index].getMemoryUsageMaximum())
-        {
-            if (_log.isDebugEnabled())
-            {
-                _log.debug("Message(" + message.debugIdentity() + ") Add of size ("
-                              + requriedSize + ") will cause flow. Searching for space");
-            }
-
-            long reclaimed = 0;
-
-            //work down the priorities looking for memory
-
-            //First: Don't take all the memory. So look for a queue that has more than 50% free
-            long currentMax;
-            int scavangeIndex = 0;
-
-            if (scavangeIndex == index)
-            {
-                scavangeIndex++;
-            }
-
-            while (scavangeIndex < _priorities  && reclaimed <= requriedSize)
-            {
-                currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-                long used = _priorityLists[scavangeIndex].memoryUsed();
-
-                if (used < currentMax / 2)
-                {
-                    long newMax = currentMax / 2;
-
-                    _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
-                    reclaimed += currentMax - newMax;
-                    if (_log.isDebugEnabled())
-                    {
-                        _log.debug("Reclaiming(1) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
-                    }
-                    break;
-                }
-                else
-                {
-                    scavangeIndex++;
-                    if (scavangeIndex == index)
-                    {
-                        scavangeIndex++;
-                    }
-                }
-            }                                   
-
-            //Second: Just take the free memory we need
-            if (scavangeIndex == _priorities)
-            {
-                scavangeIndex = 0;
-                if (scavangeIndex == index)
-                {
-                    scavangeIndex++;
-                }
-
-                while (scavangeIndex < _priorities && reclaimed <= requriedSize)
-                {
-                    currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-                    long used = _priorityLists[scavangeIndex].memoryUsed();
-
-                    if (used < currentMax)
-                    {
-                        long newMax = currentMax - used;
-
-                        // if there are no messages at this priority just take it all
-                        if (newMax == currentMax)
-                        {
-                            newMax = 0;
-                        }
-
-                        _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
-                        reclaimed += currentMax - newMax;
-                        if (_log.isDebugEnabled())
-                        {
-                            _log.debug("Reclaiming(2) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
-                        }
-                        break;
-                    }
-                    else
-                    {
-                        scavangeIndex++;
-                        if (scavangeIndex == index)
-                        {
-                            scavangeIndex++;
-                        }
-                    }
-                }
-
-                //Third: Take required memory
-                if (scavangeIndex == _priorities)
-                {
-                    scavangeIndex = 0;
-                    if (scavangeIndex == index)
-                    {
-                        scavangeIndex++;
-                    }
-                    while (scavangeIndex < _priorities && reclaimed <= requriedSize)
-                    {
-                        currentMax = _priorityLists[scavangeIndex].getMemoryUsageMaximum();
-
-                        if (currentMax > 0 )
-                        {
-                            long newMax = currentMax;
-                            // Just take the amount of space required for this message.
-                            if (newMax > requriedSize)
-                            {
-                                newMax = requriedSize;
-                            }
-                            _priorityLists[scavangeIndex].setMemoryUsageMaximum(newMax);
-
-                            reclaimed += currentMax - newMax;
-                            if (_log.isDebugEnabled())
-                            {
-                                _log.debug("Reclaiming(3) :" + (currentMax - newMax) + "(" + reclaimed + "/" + requriedSize + ") from queue:" + scavangeIndex);
-                            }
-                            break;
-                        }
-                        else
-                        {
-                            scavangeIndex++;
-                            if (scavangeIndex == index)
-                            {
-                                scavangeIndex++;
-                            }
-                        }
-                    }
-                }
-            }
-
-            //Increment Maximum
-            if (reclaimed > 0)
-            {
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("Increasing queue(" + index + ") maximum by " + reclaimed
-                               + " to " + (_priorityLists[index].getMemoryUsageMaximum() + reclaimed));
-                }
-                _priorityLists[index].setMemoryUsageMaximum(_priorityLists[index].getMemoryUsageMaximum() + reclaimed);
-            }
-            else
-            {
-                _log.debug("No space found.");
-            }
-
-            if (_log.isTraceEnabled())
-            {
-                showUsage("Add");
-            }
-        }
-
         return _priorityLists[index].add(message);
     }
 
-    @Override
-    protected void showUsage(String prefix)
-    {
-        if (_log.isDebugEnabled())
-        {
-            if (prefix.length() != 0)
-            {
-                _log.debug(prefix);
-            }
-            for (int index = 0; index < _priorities; index++)
-            {
-                QueueEntryList queueEntryList = _priorityLists[index];
-                _log.debug("Queue (" + _queue.getName() + ")[" + index + "] usage:" + queueEntryList.memoryUsed()
-                           + "/" + queueEntryList.getMemoryUsageMaximum()
-                           + "/" + queueEntryList.dataSize());
-            }
-        }
-    }
 
     public QueueEntry next(QueueEntry node)
     {
@@ -338,122 +165,7 @@
         }
     }
 
-    @Override
-    public boolean isFlowed()
-    {
-        boolean flowed = false;
-        boolean full = true;
-
-        if (_log.isTraceEnabled())
-        {
-            showUsage("isFlowed");
-        }
-
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            //full = full && queueEntryList.getMemoryUsageMaximum() == queueEntryList.memoryUsed();
-            full = full && queueEntryList.getMemoryUsageMaximum() <= queueEntryList.dataSize();
-            flowed = flowed || (queueEntryList.isFlowed());
-        }
-        return flowed && full;
-    }
-
-    @Override
-    public int size()
-    {
-        int size = 0;
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            size += queueEntryList.size();
-        }
-
-        return size;
-    }
-
-    @Override
-    public long dataSize()
-    {
-        int dataSize = 0;
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            dataSize += queueEntryList.dataSize();
-        }
-
-        return dataSize;
-    }
-
-    @Override
-    public long memoryUsed()
-    {
-        int memoryUsed = 0;
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            memoryUsed += queueEntryList.memoryUsed();
-        }
-
-        return memoryUsed;
-    }
-
-    @Override
-    public void setMemoryUsageMaximum(long maximumMemoryUsage)
-    {
-        _memoryUsageMaximum = maximumMemoryUsage;
-
-        if (maximumMemoryUsage >= 0)
-        {
-            _disableFlowToDisk = false;
-        }
-
-        long share = maximumMemoryUsage / _priorities;
-
-        //Apply a share of the maximum To each prioirty quue
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            queueEntryList.setMemoryUsageMaximum(share);
-        }
-
-        if (maximumMemoryUsage < 0)
-        {
-            if (_log.isInfoEnabled())
-            {
-                _log.info("Disabling Flow to Disk for queue:" + _queue.getName());
-            }
-            _disableFlowToDisk = true;
-            return;
-        }
-
-        //ensure we use the full allocation of memory
-        long remainder = maximumMemoryUsage - (share * _priorities);
-        if (remainder > 0)
-        {
-            _priorityLists[_priorities - 1].setMemoryUsageMaximum(share + remainder);
-        }
-    }
-
-    @Override
-    public long getMemoryUsageMaximum()
-    {
-        return _memoryUsageMaximum;
-    }
-
-    @Override
-    public void setMemoryUsageMinimum(long minimumMemoryUsage)
-    {
-        _memoryUsageMinimum = minimumMemoryUsage;
-
-        //Apply a share of the minimum To each prioirty quue
-        for (QueueEntryList queueEntryList : _priorityLists)
-        {
-            queueEntryList.setMemoryUsageMaximum(minimumMemoryUsage / _priorities);
-        }
-    }
-
-    @Override
-    public long getMemoryUsageMinimum()
-    {
-        return _memoryUsageMinimum;
-    }
-
+  
     @Override
     public void stop()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=758742&r1=758741&r2=758742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Mar 26 16:57:21 2009
@@ -155,6 +155,13 @@
         return (_flags & DELIVERED_TO_CONSUMER) != 0;
     }
 
+    /**
+     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+     * And for selector efficiency.
+     *
+     * This is now also used to unload the message if this entry is on a flowed queue. As a result this method should
+     * only be called after the message has been sent.
+     */    
     public void setDeliveredToSubscription()
     {
         _flags |= DELIVERED_TO_CONSUMER;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=758742&r1=758741&r2=758742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Thu Mar 26 16:57:21 2009
@@ -63,4 +63,14 @@
     void entryLoadedUpdateMemory(QueueEntry queueEntry);
 
     void stop();
+
+    /**
+     * Mark this queue as part of another QueueEntryList for accounting purposes.
+     *
+     * All Calls from the QueueEntry to the QueueEntryList need to check if there is
+     * a parent QueueEntrylist upon which the action should take place.
+     *
+     * @param queueEntryList The parent queue that is performing accounting.
+     */
+    void setParentQueueEntryList(FlowableBaseQueueEntryList queueEntryList);
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java?rev=758742&r1=758741&r2=758742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryPriorityTest.java Thu Mar 26 16:57:21 2009
@@ -50,21 +50,4 @@
             fail(e.getMessage());
         }
     }
-
-    @Override
-    public void testQueueValuesAfterCreation()
-    {
-        try
-        {
-            AMQQueue queue = createQueue();
-
-            assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum());
-            //NOTE: Priority queue will show 0 as minimum as the minimum value is actually spread between its sub QELs
-            assertEquals("MemoryMinimumSize not 0 as expected for a priority queue:", 0, queue.getMemoryUsageMinimum());
-        }
-        catch (AMQException e)
-        {
-            fail(e.getMessage());
-        }
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=758742&r1=758741&r2=758742&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Thu Mar 26 16:57:21 2009
@@ -48,11 +48,4 @@
         _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID);
         _contentBodies = new ArrayList<ContentChunk>();
     }
-
-
-    @Override
-    public long getSize()
-    {
-        return 0l;
-    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org