You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by jo...@apache.org on 2010/10/01 15:20:54 UTC

svn commit: r1003531 - in /qpid/trunk/qpid/cpp/src: qpid/broker/QueuePolicy.cpp qpid/broker/QueuePolicy.h tests/QueuePolicyTest.cpp

Author: jonathan
Date: Fri Oct  1 13:20:54 2010
New Revision: 1003531

URL: http://svn.apache.org/viewvc?rev=1003531&view=rev
Log:
Fixes two bugs for ring queue policies that involve size. 

- When messages vary in size, now correctly displaces enough smaller messages
  to make room for the new message.
- When a message is larger than maximum queue size, now correctly rejects
  the message.

Resolves JIRA QPID-2338 (https://issues.apache.org/jira/browse/QPID-2338).

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
    qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1003531&r1=1003530&r2=1003531&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Fri Oct  1 13:20:54 2010
@@ -222,30 +222,73 @@ bool RingQueuePolicy::isEnqueued(const Q
 
+/**
+ * Decide whether message m may be enqueued on this ring queue, displacing
+ * the oldest queued messages when a limit would otherwise be exceeded.
+ *
+ * A maxSize of zero is treated as "no size limit" (count-only ring queues
+ * are created that way), so size checks only apply when getMaxSize() != 0.
+ *
+ * @param m the message about to be enqueued.
+ * @return true if m may be enqueued (any displaced messages have been moved
+ *         to pendingDequeues); false if m alone exceeds the size limit, if
+ *         there is nothing left to displace, or if in strict mode the
+ *         oldest message could not be acquired.
+ */
 bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
 {
-    if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
-    
-    QueuedMessage oldest;
-    if (queue.empty()) {
+    const uint64_t maxSize = getMaxSize();
+
+    // If a size limit is set and the message alone exceeds it, give up
+    if (maxSize && m->contentSize() > maxSize) {
         QPID_LOG(debug, "Message too large for ring queue " << name 
                  << " [" << *this  << "] "
-                 << ": message size = " << m->contentSize() << " bytes");
-        return false;
-    }
-    oldest = queue.front();
-    if (oldest.queue->acquire(oldest) || !strict) {
-        queue.pop_front();
-        pendingDequeues.push_back(oldest);
-        QPID_LOG(debug, "Ring policy triggered in " << name 
-                 << ": removed message " << oldest.position << " to make way for new message");
-        return true;
-    } else {
-        QPID_LOG(debug, "Ring policy could not be triggered in " << name 
-                 << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
-        //in strict mode, if oldest message has been delivered (hence
-        //cannot be acquired) but not yet acked, it should not be
-        //removed and the attempted enqueue should fail
+                 << ": message size = " << m->contentSize() << " bytes"
+                 << ": max queue size = " << maxSize << " bytes");
         return false;
     }
+
+    // if within limits, ok to accept
+    if (QueuePolicy::checkLimit(m)) return true;
+
+    // At this point, we've exceeded maxSize, maxCount, or both.
+    //
+    // If we've exceeded maxCount, we've exceeded it by 1, so
+    // replacing the first message is sufficient. If we've exceeded
+    // maxSize, we need to pop enough messages to get the space we
+    // need.
+
+    // Free space under the size limit; stays 0 (and unused) without one.
+    // Kept 64-bit to match getCurrentQueueSize() and guarded against wrap.
+    const uint64_t used = getCurrentQueueSize();
+    uint64_t haveSpace = (maxSize > used) ? maxSize - used : 0;
+
+    do {
+        if (queue.empty()) {
+            // front() on an empty queue is undefined; nothing to displace
+            QPID_LOG(debug, "Ring policy could not be triggered in " << name
+                     << ": no queued message left to displace");
+            return false;
+        }
+
+        QueuedMessage oldest = queue.front();
+
+        if (oldest.queue->acquire(oldest) || !strict) {
+            queue.pop_front();
+            pendingDequeues.push_back(oldest);
+            QPID_LOG(debug, "Ring policy triggered in " << name 
+                     << ": removed message " << oldest.position << " to make way for new message");
+
+            haveSpace += oldest.payload->contentSize();
+        } else {
+            //in strict mode, if oldest message has been delivered (hence
+            //cannot be acquired) but not yet acked, it should not be
+            //removed and the attempted enqueue should fail
+            QPID_LOG(debug, "Ring policy could not be triggered in " << name 
+                     << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+            return false;
+        }
+    } while (maxSize && haveSpace < m->contentSize());
+
+    return true;
 }
 
 void RingQueuePolicy::getPendingDequeues(Messages& result)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=1003531&r1=1003530&r2=1003531&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Fri Oct  1 13:20:54 2010
@@ -46,6 +46,9 @@ class QueuePolicy
             
     static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
 
+  protected:
+    uint64_t getCurrentQueueSize() const { return size; } // returns the `size` member; RingQueuePolicy::checkLimit compares it with getMaxSize() (presumably bytes currently counted against the queue - confirm)
+
   public:
     typedef std::deque<QueuedMessage> Messages;
     static QPID_BROKER_EXTERN const std::string maxCountKey;

Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1003531&r1=1003530&r2=1003531&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Fri Oct  1 13:20:54 2010
@@ -1,4 +1,4 @@
- /*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include <sstream>
 #include "unit_test.h"
 #include "test_tools.h"
 
@@ -143,7 +144,7 @@ QPID_AUTO_TEST_CASE(testSettings)
     BOOST_CHECK_EQUAL(a->getMaxSize(), b->getMaxSize());
 }
 
-QPID_AUTO_TEST_CASE(testRingPolicy)
+QPID_AUTO_TEST_CASE(testRingPolicyCount)
 {
     FieldTable args;
     std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 5, 0, QueuePolicy::RING);
@@ -172,6 +173,93 @@ QPID_AUTO_TEST_CASE(testRingPolicy)
     BOOST_CHECK(!f.subs.get(msg, q));
 }
 
+/**
+ * Checks size-based ring behaviour on a queue limited to 500 bytes:
+ *  A. a sixth 100-byte message displaces the oldest one;
+ *  B. one 400-byte message displaces four 100-byte messages;
+ *  C. a message larger than the whole queue is rejected.
+ */
+QPID_AUTO_TEST_CASE(testRingPolicySize)
+{
+    std::string hundredBytes = std::string(100, 'h');
+    std::string fourHundredBytes = std::string (400, 'f');
+    std::string thousandBytes = std::string(1000, 't');
+
+    // Ring queue, 500 bytes maxSize
+
+    FieldTable args;
+    std::auto_ptr<QueuePolicy> policy = QueuePolicy::createQueuePolicy("test", 0, 500, QueuePolicy::RING);
+    policy->update(args);
+
+    ProxySessionFixture f;
+    std::string q("my-ring-queue");
+    f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
+
+    // A. Send messages 0 .. 5, each 100 bytes
+
+    client::Message m(hundredBytes, q); 
+    
+    for (int i = 0; i < 6; i++) {
+        std::stringstream id;
+        id << i;        
+        m.getMessageProperties().setCorrelationId(id.str());
+        f.session.messageTransfer(arg::content=m);
+    }
+
+    // should find 1 .. 5 on the queue, 0 is displaced by 5
+    client::Message msg;
+    for (int i = 1; i < 6; i++) {
+        std::stringstream id;
+        id << i;        
+        BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str());
+    }
+    BOOST_CHECK(!f.subs.get(msg, q));
+
+    // B. Now make sure that one 400 byte message displaces four 100 byte messages
+
+    // Send messages 0 .. 5, each 100 bytes
+    for (int i = 0; i < 6; i++) {
+        client::Message m(hundredBytes, q);
+        std::stringstream id;
+        id << i;        
+        m.getMessageProperties().setCorrelationId(id.str());
+        f.session.messageTransfer(arg::content=m);
+    }
+
+    // Now send one 400 byte message
+    client::Message m2(fourHundredBytes, q);
+    m2.getMessageProperties().setCorrelationId("6");
+    f.session.messageTransfer(arg::content=m2);
+
+    // expect to see 5, 6 on the queue
+    for (int i = 5; i < 7; i++) {
+        std::stringstream id;
+        id << i;        
+        BOOST_CHECK(f.subs.get(msg, q, qpid::sys::TIME_SEC));
+        BOOST_CHECK_EQUAL(msg.getMessageProperties().getCorrelationId(), id.str());
+    }
+    BOOST_CHECK(!f.subs.get(msg, q));
+
+
+    // C. Try sending a 1000-byte message, should fail - exceeds maxSize of queue
+
+    client::Message m3(thousandBytes, q);
+    m3.getMessageProperties().setCorrelationId("6");
+    // Record the outcome and assert outside the try block: BOOST_FAIL aborts
+    // the test case by throwing, which catch (...) would itself swallow.
+    bool rejected = false;
+    try {
+        ScopedSuppressLogging sl;
+        f.session.messageTransfer(arg::content=m3);
+    }
+    catch (...) {
+        rejected = true;
+    }
+    BOOST_CHECK_MESSAGE(rejected, "Ooops - successfully added a 1000 byte message to a 500 byte ring queue ...");
+}
+
+
 QPID_AUTO_TEST_CASE(testStrictRingPolicy)
 {
     FieldTable args;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org