You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/10/25 20:13:35 UTC

svn commit: r1535803 - in /qpid/trunk/qpid/cpp/src/qpid/broker: LossyQueue.cpp Queue.cpp Queue.h

Author: aconway
Date: Fri Oct 25 18:13:35 2013
New Revision: 1535803

URL: http://svn.apache.org/r1535803
Log:
QPID-4287: Poor performance when a priority queue with a ring queue policy has a large backlog

LossyQueue::checkDepth was performing an unintended linear search of its
messages when a new message was added at capacity.  Since the messages are in
priority order, only the tail message on the queue needs to be compared with the
new message to determine which of them should be dropped.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp?rev=1535803&r1=1535802&r2=1535803&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp Fri Oct 25 18:13:35 2013
@@ -51,8 +51,19 @@ bool LossyQueue::checkDepth(const QueueD
     while (settings.maxDepth && (settings.maxDepth - current < increment)) {
         QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
         qpid::sys::Mutex::ScopedUnlock u(messageLock);
-        //TODO: arguably we should try and purge expired messages first but that is potentially expensive
-        if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) {
+        //TODO: arguably we should try and purge expired messages first but that
+        //is potentially expensive
+
+        // Note: in the case of a priority queue we are only comparing the new message
+        // with the single lowest-priority message, hence the final parameter maxTests
+        // is 1 in this case, so we only test one message for removal.
+        if (remove(1,
+                   settings.priorities ?
+                   boost::bind(&isLowerPriorityThan, message.getPriority(), _1) :
+                   MessagePredicate(), boost::bind(&reroute, alternateExchange, _1),
+                   PURGE, false,
+                   settings.priorities ? 1 : 0))
+        {
             if (mgmtObject) {
                 mgmtObject->inc_discardsRing(1);
                 if (brokerMgmtObject)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1535803&r1=1535802&r2=1535803&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 25 18:13:35 2013
@@ -706,30 +706,29 @@ namespace {
     }
 } // end namespace
 
-uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete)
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f,
+                       SubscriptionType type, bool triggerAutoDelete, uint32_t maxTests)
 {
     ScopedAutoDelete autodelete(*this);
     std::deque<Message> removed;
     {
         QueueCursor c(type);
-        uint32_t count(0);
+        uint32_t count(0), tests(0);
         Mutex::ScopedLock locker(messageLock);
         Message* m = messages->next(c);
         while (m){
+            if (maxTests && tests++ >= maxTests) break;
             if (!p || p(*m)) {
-                if (!maxCount || count++ < maxCount) {
-                    if (m->getState() == AVAILABLE) {
-                        //don't actually acquire, just act as if we did
-                        observeAcquire(*m, locker);
-                    }
-                    observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
-                    removed.push_back(*m);//takes a copy of the message
-                    if (!messages->deleted(c)) {
-                        QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
-                        assert(false);
-                    }
-                } else {
-                    break;
+                if (maxCount && count++ >= maxCount) break;
+                if (m->getState() == AVAILABLE) {
+                    //don't actually acquire, just act as if we did
+                    observeAcquire(*m, locker);
+                }
+                observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
+                removed.push_back(*m);//takes a copy of the message
+                if (!messages->deleted(c)) {
+                    QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
+                    assert(false);
                 }
             }
             m = messages->next(c);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1535803&r1=1535802&r2=1535803&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 25 18:13:35 2013
@@ -255,7 +255,22 @@ class Queue : public boost::enable_share
     void abandoned(const Message& message);
     bool checkNotDeleted(const Consumer::shared_ptr&);
     void notifyDeleted();
-    uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType, bool triggerAutoDelete);
+
+    /** Remove messages from the queue:
+     *@param maxCount Maximum number of messages to remove, 0 means unlimited.
+     *@param p Only remove messages for which p(msg) is true.
+     *@param f Call f on each message that is removed.
+     *@param st Use a cursor of this SubscriptionType to iterate messages to remove.
+     *@param triggerAutoDelete If true removing messages may trigger auto-delete.
+     *@param maxTests Max number of messages to test for removal, 0 means unlimited.
+     *@return Number of messages removed.
+     */
+    uint32_t remove(uint32_t maxCount,
+                    MessagePredicate p, MessageFunctor f,
+                    SubscriptionType st,
+                    bool triggerAutoDelete,
+                    uint32_t maxTests=0);
+
     virtual bool checkDepth(const QueueDepth& increment, const Message&);
     void tryAutoDelete();
   public:



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org