You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/02/14 16:05:36 UTC

svn commit: r1070515 - in /qpid/branches/qpid-2935/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Broker.h qpid/broker/QueueFlowLimit.cpp qpid/broker/QueueFlowLimit.h tests/QueueFlowLimitTest.cpp tests/QueuePolicyTest.cpp tests/QueueTest.cpp

Author: kgiusti
Date: Mon Feb 14 15:05:35 2011
New Revision: 1070515

URL: http://svn.apache.org/viewvc?rev=1070515&view=rev
Log:
QPID-2935: make flow control enabled by default

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Mon Feb 14 15:05:35 2011
@@ -31,6 +31,7 @@
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
 
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
@@ -102,7 +103,9 @@ Broker::Options::Options(const std::stri
     requireEncrypted(false),
     maxSessionRate(0),
     asyncQueueEvents(false),     // Must be false in a cluster.
-    qmf2Support(false)
+    qmf2Support(false),
+    queueFlowStopRatio(80),
+    queueFlowResumeRatio(70)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -134,7 +137,9 @@ Broker::Options::Options(const std::stri
         ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
         ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location")
         ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)")
-        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication");
+        ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication")
+        ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.")
+        ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated.");
 }
 
 const std::string empty;
@@ -219,6 +224,7 @@ Broker::Broker(const Broker::Options& co
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);
+    QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
     queues.setQueueEvents(&queueEvents);
 
     // Early-Initialize plugins

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h Mon Feb 14 15:05:35 2011
@@ -115,6 +115,8 @@ public:
         uint32_t maxSessionRate;
         bool asyncQueueEvents;
         bool qmf2Support;
+        uint queueFlowStopRatio;    // producer flow control: on
+        uint queueFlowResumeRatio;  // producer flow control: off
 
       private:
         std::string getHome();

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Feb 14 15:05:35 2011
@@ -219,7 +219,7 @@ void QueueFlowLimit::encode(Buffer& buff
 }
 
 
-void QueueFlowLimit::decode ( Buffer& buffer ) 
+void QueueFlowLimit::decode ( Buffer& buffer )
 {
   flowStopCount   = buffer.getLong();
   flowResumeCount = buffer.getLong();
@@ -244,21 +244,58 @@ const std::string QueueFlowLimit::flowSt
 const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count");
 const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size");
 const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size");
+uint64_t QueueFlowLimit::defaultMaxSize;
+uint QueueFlowLimit::defaultFlowStopRatio;
+uint QueueFlowLimit::defaultFlowResumeRatio;
+
+
+void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint flowResumeRatio)
+{
+    defaultMaxSize = maxQueueSize;
+    defaultFlowStopRatio = flowStopRatio;
+    defaultFlowResumeRatio = flowResumeRatio;
+
+    /** @todo Verify valid range on Broker::Options instead of here */
+    if (flowStopRatio > 100 || flowResumeRatio > 100)
+        throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:"
+                                                << " flowStopRatio=" << flowStopRatio
+                                                << " flowResumeRatio=" << flowResumeRatio));
+    if (flowResumeRatio > flowStopRatio)
+        throw InvalidArgumentException(QPID_MSG("Default queue flow stop ratio must be >= flow resume ratio:"
+                                                << " flowStopRatio=" << flowStopRatio
+                                                << " flowResumeRatio=" << flowResumeRatio));
+}
 
 
 std::auto_ptr<QueueFlowLimit> QueueFlowLimit::createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings)
 {
-    uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
-    uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
-    uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
-    uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+    std::string type(QueuePolicy::getType(settings));
 
-    if (flowStopCount || flowResumeCount || flowStopSize || flowResumeSize) {
+    if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) {
+        // The size of a RING queue is limited by design - no need for flow control.
+        return std::auto_ptr<QueueFlowLimit>();
+    }
+
+    if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) {
+        uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0);
+        uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0);
+        uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0);
+        uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0);
+        if (flowStopCount == 0 && flowStopSize == 0) {   // disable flow control
+            return std::auto_ptr<QueueFlowLimit>();
+        }
         return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
                                                                 flowStopSize, flowResumeSize));
-    } else {
-        return std::auto_ptr<QueueFlowLimit>();
     }
+
+    if (defaultFlowStopRatio) {
+        uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize);
+        uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
+        uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+
+        return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize));
+    }
+    return std::auto_ptr<QueueFlowLimit>();
 }
 
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Mon Feb 14 15:05:35 2011
@@ -52,6 +52,10 @@ namespace broker {
  */
 class QueueFlowLimit
 {
+    static uint64_t defaultMaxSize;
+    static uint defaultFlowStopRatio;
+    static uint defaultFlowResumeRatio;
+
     Queue *queue;
     std::string queueName;
 
@@ -92,6 +96,8 @@ class QueueFlowLimit
     uint32_t encodedSize() const;
 
     static QPID_BROKER_EXTERN std::auto_ptr<QueueFlowLimit> createQueueFlowLimit(Queue *queue, const qpid::framing::FieldTable& settings);
+    static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
+
     friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
 
  protected:

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Mon Feb 14 15:05:35 2011
@@ -23,6 +23,7 @@
 #include "unit_test.h"
 #include "test_tools.h"
 
+#include "qpid/broker/QueuePolicy.h"
 #include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -302,6 +303,106 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
 }
 
 
+QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
+{
+    QueueFlowLimit::setDefaults(2950001, // max queue byte count
+                                80,     // 80% stop threshold
+                                70);    // 70% resume threshold
+    FieldTable args;
+    std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+    BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize());
+    BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize());
+    BOOST_CHECK_EQUAL( 0, flow->getFlowStopCount());
+    BOOST_CHECK_EQUAL( 0, flow->getFlowResumeCount());
+    BOOST_CHECK(!flow->isFlowControlActive());
+    BOOST_CHECK(flow->monitorFlowControl());
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
+{
+    QueueFlowLimit::setDefaults(2950001, // max queue byte count
+                                80,     // 80% stop threshold
+                                70);    // 70% resume threshold
+    {
+        FieldTable args;
+        args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
+        args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
+        std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+        BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
+        BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
+        BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowStopSize());
+        BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowResumeSize());
+        BOOST_CHECK(!flow->isFlowControlActive());
+        BOOST_CHECK(flow->monitorFlowControl());
+    }
+    {
+        FieldTable args;
+        args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
+        args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
+        std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+        BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
+        BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
+        BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize());
+        BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize());
+        BOOST_CHECK(!flow->isFlowControlActive());
+        BOOST_CHECK(flow->monitorFlowControl());
+    }
+    {
+        FieldTable args;
+        args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
+        args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
+        args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
+        args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
+        std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+        BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
+        BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
+        BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize());
+        BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize());
+        BOOST_CHECK(!flow->isFlowControlActive());
+        BOOST_CHECK(flow->monitorFlowControl());
+    }
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowOverrideDefaults)
+{
+    QueueFlowLimit::setDefaults(2950001, // max queue byte count
+                                97,     // stop threshold
+                                73);    // resume threshold
+    FieldTable args;
+    std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+    BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize());
+    BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize());
+    BOOST_CHECK(!flow->isFlowControlActive());
+    BOOST_CHECK(flow->monitorFlowControl());
+}
+
+
+QPID_AUTO_TEST_CASE(testFlowDisable)
+{
+    {
+        FieldTable args;
+        args.setInt(QueueFlowLimit::flowStopCountKey, 0);
+        std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+        BOOST_CHECK(!flow.get());
+    }
+    {
+        FieldTable args;
+        args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
+        std::auto_ptr<QueueFlowLimit> flow(QueueFlowLimit::createQueueFlowLimit(0, args));
+
+        BOOST_CHECK(!flow.get());
+    }
+}
+
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueuePolicyTest.cpp Mon Feb 14 15:05:35 2011
@@ -23,6 +23,7 @@
 #include "test_tools.h"
 
 #include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
 #include "qpid/client/QueueOptions.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -340,6 +341,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNo
     //fallback to rejecting messages
     QueueOptions args;
     args.setSizePolicy(FLOW_TO_DISK, 0, 5);
+    // Disable flow control, or else we'll never hit the max limit
+    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
     ProxySessionFixture f;
     std::string q("my-queue");

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp?rev=1070515&r1=1070514&r2=1070515&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/QueueTest.cpp Mon Feb 14 15:05:35 2011
@@ -36,6 +36,9 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/broker/QueuePolicy.h"
+#include "qpid/broker/QueueFlowLimit.h"
+
 #include <iostream>
 #include "boost/format.hpp"
 
@@ -508,6 +511,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
     client::QueueOptions args;
     // set queue mode
     args.setOrdering(client::LVQ);
+    // disable flow control, as this test violates the enqueue/dequeue sequence.
+    args.setInt(QueueFlowLimit::flowStopCountKey, 0);
 
     Queue::shared_ptr queue(new Queue("my-queue", true ));
     queue->configure(args);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org