You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by pm...@apache.org on 2013/11/19 16:16:11 UTC

svn commit: r1543449 - in /qpid/trunk/qpid/cpp/src: qpid/broker/QueueFactory.cpp qpid/broker/QueueFlowLimit.cpp qpid/broker/QueueFlowLimit.h tests/QueueFlowLimitTest.cpp

Author: pmoravec
Date: Tue Nov 19 15:16:11 2013
New Revision: 1543449

URL: http://svn.apache.org/r1543449
Log:
QPID-5278 , QPID-5281: Queue flow limit validation ignores size parameters , Creating a queue with invalid settings results in no queue but only its management object exists

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp Tue Nov 19 15:16:11 2013
@@ -48,6 +48,7 @@ QueueFactory::QueueFactory() : broker(0)
 boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
 {
     settings.validate();
+    boost::shared_ptr<QueueFlowLimit> flow_ptr(QueueFlowLimit::createLimit(name, settings));
 
     //1. determine Queue type (i.e. whether we are subclassing Queue)
     // -> if 'ring' policy is in use then subclass
@@ -100,7 +101,9 @@ boost::shared_ptr<Queue> QueueFactory::c
         ThresholdAlerts::observe(*queue, *(broker->getManagementAgent()), settings, broker->getOptions().queueThresholdEventRatio);
     }
     //5. flow control config
-    QueueFlowLimit::observe(*queue, settings);
+    if (flow_ptr) {
+	flow_ptr->observe(*queue);
+    }
 
     return queue;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Tue Nov 19 15:16:11 2013
@@ -34,6 +34,8 @@
 
 #include <sstream>
 
+#include <boost/enable_shared_from_this.hpp>
+
 using namespace qpid::broker;
 using namespace qpid::framing;
 
@@ -44,47 +46,34 @@ namespace {
     template <typename T>
     void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
     {
-        if (resume > stop) {
-            throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
+        if (stop) {
+            if (resume > stop) {
+                throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
                                                     << "=" << resume
-                                                    << " must be less than qpid.flow_stop_" << type
+                                                    << " must be less or equal to qpid.flow_stop_" << type
                                                     << "=" << stop));
-        }
-        if (resume == 0) resume = stop;
-        if (max != 0 && (max < stop)) {
-            throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
+            }
+            if (resume == 0) resume = stop;
+            if (max != 0 && (max < stop)) {
+                throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
                                                     << "=" << stop
                                                     << " must be less than qpid.max_" << type
                                                     << "=" << max));
+            }
         }
     }
 }
 
 
 
-QueueFlowLimit::QueueFlowLimit(Queue *_queue,
+QueueFlowLimit::QueueFlowLimit(const std::string& _queueName,
                                uint32_t _flowStopCount, uint32_t _flowResumeCount,
                                uint64_t _flowStopSize,  uint64_t _flowResumeSize)
-    : queue(_queue), queueName("<unknown>"),
+    : queue(0), queueName(_queueName),
       flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
       flowStopped(false), count(0), size(0), broker(0)
 {
-    uint32_t maxCount(0);
-    uint64_t maxSize(0);
-
-    if (queue) {
-        queueName = _queue->getName();
-        if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
-        if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
-        broker = queue->getBroker();
-        queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
-        if (queueMgmtObj) {
-            queueMgmtObj->set_flowStopped(isFlowControlActive());
-        }
-    }
-    validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
-    validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
     QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
              << ", flowResumeCount=" << flowResumeCount
              << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize );
@@ -245,50 +234,62 @@ void QueueFlowLimit::setDefaults(uint64_
 }
 
 
-void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
+void QueueFlowLimit::observe(Queue& queue)
 {
-    QueueFlowLimit *ptr = createLimit( &queue, settings );
-    if (ptr) {
-        boost::shared_ptr<QueueFlowLimit> observer(ptr);
-        queue.addObserver(observer);
+    /* set up management stuff */
+    broker = queue.getBroker();
+    queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue.GetManagementObject());
+    if (queueMgmtObj) {
+        queueMgmtObj->set_flowStopped(isFlowControlActive());
     }
+
+    /* set up the observer */
+    queue.addObserver(shared_from_this());
 }
 
 /** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
+boost::shared_ptr<QueueFlowLimit> QueueFlowLimit::createLimit(const std::string& queueName, const QueueSettings& settings)
 {
     if (settings.dropMessagesAtLimit) {
         // The size of a RING queue is limited by design - no need for flow control.
-        return 0;
+        return boost::shared_ptr<QueueFlowLimit>();
     }
+    if ((!settings.flowStop.hasCount()) && (!settings.flowStop.hasSize()) && (settings.flowResume.hasCount() || settings.flowResume.hasSize()))
+        QPID_LOG(warning, "queue " << queueName << ": user-configured flow limits are ignored as no stop limits provided");
 
-    if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
-        // user provided (some) flow settings manually...
-        if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
-            return new QueueFlowLimit(queue,
-                                      settings.flowStop.getCount(),
-                                      settings.flowResume.getCount(),
-                                      settings.flowStop.getSize(),
-                                      settings.flowResume.getSize());
-        } else {
-            //don't have a non-zero value for either the count or the
-            //size to stop at, yet at least one of these settings was
-            //provided, i.e it was set to 0 explicitly which we treat
-            //as turning it off
-            return 0;
-        }
-    }
+    uint32_t flowStopCount(0), flowResumeCount(0), maxMsgCount(settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0);
+    uint64_t flowStopSize(0), flowResumeSize(0), maxByteCount(settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize);
 
+    // pre-fill by defaults, if exist
     if (defaultFlowStopRatio) {   // broker has a default ratio setup...
-        uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
-        uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
-        uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
-        uint32_t maxMsgCount =  settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
-        uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
-        uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
-        return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
+        flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
+        flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
     }
-    return 0;
+
+    if (defaultFlowResumeRatio) {   // broker has a default ratio setup...
+        flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+	flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
+    }
+
+    // update by user-specified thresholds
+    if (settings.flowStop.hasCount())
+        flowStopCount = settings.flowStop.getCount();
+    if (settings.flowStop.hasSize())
+        flowStopSize = settings.flowStop.getSize();
+    if (settings.flowResume.hasCount())
+        flowResumeCount = settings.flowResume.getCount();
+    if (settings.flowResume.hasSize())
+        flowResumeSize = settings.flowResume.getSize();
+
+    if (flowStopCount || flowStopSize) {
+	validateFlowConfig(maxMsgCount, flowStopCount, flowResumeCount, "count", queueName );
+        validateFlowConfig(maxByteCount, flowStopSize, flowResumeSize, "size", queueName );
+        return boost::shared_ptr<QueueFlowLimit>(new QueueFlowLimit(queueName, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+    }
+    else
+        //don't have a non-zero value for either the count or the
+        //size to stop at, so no flow limit applicable
+        return boost::shared_ptr<QueueFlowLimit>();
 }
 
 namespace qpid {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Tue Nov 19 15:16:11 2013
@@ -33,6 +33,8 @@
 #include "qpid/sys/Mutex.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
 
+#include <boost/enable_shared_from_this.hpp>
+
 namespace _qmfBroker = qmf::org::apache::qpid::broker;
 
 namespace qpid {
@@ -50,7 +52,7 @@ struct QueueSettings;
  * passing _either_ level may turn flow control ON, but _both_ must be
  * below level before flow control will be turned OFF.
  */
- class QueueFlowLimit : public QueueObserver
+ class QueueFlowLimit : public QueueObserver, public boost::enable_shared_from_this<QueueFlowLimit>
 {
     static uint64_t defaultMaxSize;
     static uint defaultFlowStopRatio;
@@ -99,7 +101,8 @@ struct QueueSettings;
     void decode(framing::Buffer& buffer);
     uint32_t encodedSize() const;
 
-    static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings);
+    QPID_BROKER_EXTERN void observe(Queue& queue);
+    static QPID_BROKER_EXTERN boost::shared_ptr<QueueFlowLimit> createLimit(const std::string& queueName, const QueueSettings& settings);
     static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
 
     friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
@@ -113,10 +116,9 @@ struct QueueSettings;
 
     const Broker *broker;
 
-    QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue,
-                   uint32_t flowStopCount, uint32_t flowResumeCount,
-                   uint64_t flowStopSize,  uint64_t flowResumeSize);
-    static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings);
+    QPID_BROKER_EXTERN QueueFlowLimit(const std::string& _queueName,
+                   uint32_t _flowStopCount, uint32_t _flowResumeCount,
+                   uint64_t _flowStopSize,  uint64_t _flowResumeSize);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Tue Nov 19 15:16:11 2013
@@ -46,7 +46,7 @@ class TestFlow : public QueueFlowLimit
 public:
     TestFlow(uint32_t flowStopCount, uint32_t flowResumeCount,
              uint64_t flowStopSize, uint64_t flowResumeSize) :
-        QueueFlowLimit(0, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)
+        QueueFlowLimit("", flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)
     {}
     virtual ~TestFlow() {}
 
@@ -66,11 +66,11 @@ public:
         return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
     }
 
-    static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
+    static boost::shared_ptr<qpid::broker::QueueFlowLimit> getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
     {
         QueueSettings settings;
         settings.populate(arguments, settings.storeSettings);
-        return QueueFlowLimit::createLimit(0, settings);
+        return QueueFlowLimit::createLimit("", settings);
     }
 };
 
@@ -357,10 +357,9 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
                                 80,     // 80% stop threshold
                                 70);    // 70% resume threshold
     FieldTable args;
-    QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+    boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+    BOOST_CHECK(flow);
 
-    BOOST_CHECK(ptr);
-    std::auto_ptr<QueueFlowLimit> flow(ptr);
     BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize());
     BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize());
     BOOST_CHECK_EQUAL( 0u, flow->getFlowStopCount());
@@ -372,17 +371,17 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
 
 QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
 {
-    QueueFlowLimit::setDefaults(2950001, // max queue byte count
+    QueueFlowLimit::setDefaults(0, // max queue byte count
                                 80,     // 80% stop threshold
                                 70);    // 70% resume threshold
     {
         FieldTable args;
         args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
         args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
+//	args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
 
-        QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
-        BOOST_CHECK(ptr);
-        std::auto_ptr<QueueFlowLimit> flow(ptr);
+        boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+        BOOST_CHECK(flow);
 
         BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
         BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
@@ -396,9 +395,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs
         args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
         args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
 
-        QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
-        BOOST_CHECK(ptr);
-        std::auto_ptr<QueueFlowLimit> flow(ptr);
+        boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+        BOOST_CHECK(flow);
 
         BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
         BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
@@ -414,9 +412,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs
         args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
         args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
 
-        QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
-        BOOST_CHECK(ptr);
-        std::auto_ptr<QueueFlowLimit> flow(ptr);
+        boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+        BOOST_CHECK(flow);
 
         BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
         BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
@@ -434,9 +431,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideDefa
                                 97,     // stop threshold
                                 73);    // resume threshold
     FieldTable args;
-    QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
-    BOOST_CHECK(ptr);
-    std::auto_ptr<QueueFlowLimit> flow(ptr);
+    boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+    BOOST_CHECK(flow);
 
     BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize());
     BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize());
@@ -450,14 +446,9 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
     {
         FieldTable args;
         args.setInt(QueueFlowLimit::flowStopCountKey, 0);
-        QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
-        BOOST_CHECK(!ptr);
-    }
-    {
-        FieldTable args;
         args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
-        QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
-        BOOST_CHECK(!ptr);
+        boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+        BOOST_CHECK(!flow);
     }
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org