You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by pm...@apache.org on 2013/11/19 16:16:11 UTC
svn commit: r1543449 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/QueueFactory.cpp qpid/broker/QueueFlowLimit.cpp
qpid/broker/QueueFlowLimit.h tests/QueueFlowLimitTest.cpp
Author: pmoravec
Date: Tue Nov 19 15:16:11 2013
New Revision: 1543449
URL: http://svn.apache.org/r1543449
Log:
QPID-5278, QPID-5281: Queue flow limit validation ignores size parameters; creating a queue with invalid settings results in no queue being created while its management object still exists
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp Tue Nov 19 15:16:11 2013
@@ -48,6 +48,7 @@ QueueFactory::QueueFactory() : broker(0)
boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
{
settings.validate();
+ boost::shared_ptr<QueueFlowLimit> flow_ptr(QueueFlowLimit::createLimit(name, settings));
//1. determine Queue type (i.e. whether we are subclassing Queue)
// -> if 'ring' policy is in use then subclass
@@ -100,7 +101,9 @@ boost::shared_ptr<Queue> QueueFactory::c
ThresholdAlerts::observe(*queue, *(broker->getManagementAgent()), settings, broker->getOptions().queueThresholdEventRatio);
}
//5. flow control config
- QueueFlowLimit::observe(*queue, settings);
+ if (flow_ptr) {
+ flow_ptr->observe(*queue);
+ }
return queue;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Tue Nov 19 15:16:11 2013
@@ -34,6 +34,8 @@
#include <sstream>
+#include <boost/enable_shared_from_this.hpp>
+
using namespace qpid::broker;
using namespace qpid::framing;
@@ -44,47 +46,34 @@ namespace {
template <typename T>
void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
{
- if (resume > stop) {
- throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
+ if (stop) {
+ if (resume > stop) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
<< "=" << resume
- << " must be less than qpid.flow_stop_" << type
+ << " must be less or equal to qpid.flow_stop_" << type
<< "=" << stop));
- }
- if (resume == 0) resume = stop;
- if (max != 0 && (max < stop)) {
- throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
+ }
+ if (resume == 0) resume = stop;
+ if (max != 0 && (max < stop)) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
<< "=" << stop
<< " must be less than qpid.max_" << type
<< "=" << max));
+ }
}
}
}
-QueueFlowLimit::QueueFlowLimit(Queue *_queue,
+QueueFlowLimit::QueueFlowLimit(const std::string& _queueName,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : queue(_queue), queueName("<unknown>"),
+ : queue(0), queueName(_queueName),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), broker(0)
{
- uint32_t maxCount(0);
- uint64_t maxSize(0);
-
- if (queue) {
- queueName = _queue->getName();
- if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
- if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
- broker = queue->getBroker();
- queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
- if (queueMgmtObj) {
- queueMgmtObj->set_flowStopped(isFlowControlActive());
- }
- }
- validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
- validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
<< ", flowResumeCount=" << flowResumeCount
<< ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize );
@@ -245,50 +234,62 @@ void QueueFlowLimit::setDefaults(uint64_
}
-void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
+void QueueFlowLimit::observe(Queue& queue)
{
- QueueFlowLimit *ptr = createLimit( &queue, settings );
- if (ptr) {
- boost::shared_ptr<QueueFlowLimit> observer(ptr);
- queue.addObserver(observer);
+ /* set up management stuff */
+ broker = queue.getBroker();
+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue.GetManagementObject());
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
}
+
+ /* set up the observer */
+ queue.addObserver(shared_from_this());
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
+boost::shared_ptr<QueueFlowLimit> QueueFlowLimit::createLimit(const std::string& queueName, const QueueSettings& settings)
{
if (settings.dropMessagesAtLimit) {
// The size of a RING queue is limited by design - no need for flow control.
- return 0;
+ return boost::shared_ptr<QueueFlowLimit>();
}
+ if ((!settings.flowStop.hasCount()) && (!settings.flowStop.hasSize()) && (settings.flowResume.hasCount() || settings.flowResume.hasSize()))
+ QPID_LOG(warning, "queue " << queueName << ": user-configured flow limits are ignored as no stop limits provided");
- if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
- // user provided (some) flow settings manually...
- if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
- return new QueueFlowLimit(queue,
- settings.flowStop.getCount(),
- settings.flowResume.getCount(),
- settings.flowStop.getSize(),
- settings.flowResume.getSize());
- } else {
- //don't have a non-zero value for either the count or the
- //size to stop at, yet at least one of these settings was
- //provided, i.e it was set to 0 explicitly which we treat
- //as turning it off
- return 0;
- }
- }
+ uint32_t flowStopCount(0), flowResumeCount(0), maxMsgCount(settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0);
+ uint64_t flowStopSize(0), flowResumeSize(0), maxByteCount(settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize);
+ // pre-fill by defaults, if exist
if (defaultFlowStopRatio) { // broker has a default ratio setup...
- uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
- uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
- uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
- uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
- uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
- uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
- return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
+ flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
+ flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
}
- return 0;
+
+ if (defaultFlowResumeRatio) { // broker has a default ratio setup...
+ flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+ flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
+ }
+
+ // update by user-specified thresholds
+ if (settings.flowStop.hasCount())
+ flowStopCount = settings.flowStop.getCount();
+ if (settings.flowStop.hasSize())
+ flowStopSize = settings.flowStop.getSize();
+ if (settings.flowResume.hasCount())
+ flowResumeCount = settings.flowResume.getCount();
+ if (settings.flowResume.hasSize())
+ flowResumeSize = settings.flowResume.getSize();
+
+ if (flowStopCount || flowStopSize) {
+ validateFlowConfig(maxMsgCount, flowStopCount, flowResumeCount, "count", queueName );
+ validateFlowConfig(maxByteCount, flowStopSize, flowResumeSize, "size", queueName );
+ return boost::shared_ptr<QueueFlowLimit>(new QueueFlowLimit(queueName, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
+ }
+ else
+ //don't have a non-zero value for either the count or the
+ //size to stop at, so no flow limit applicable
+ return boost::shared_ptr<QueueFlowLimit>();
}
namespace qpid {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Tue Nov 19 15:16:11 2013
@@ -33,6 +33,8 @@
#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
+#include <boost/enable_shared_from_this.hpp>
+
namespace _qmfBroker = qmf::org::apache::qpid::broker;
namespace qpid {
@@ -50,7 +52,7 @@ struct QueueSettings;
* passing _either_ level may turn flow control ON, but _both_ must be
* below level before flow control will be turned OFF.
*/
- class QueueFlowLimit : public QueueObserver
+ class QueueFlowLimit : public QueueObserver, public boost::enable_shared_from_this<QueueFlowLimit>
{
static uint64_t defaultMaxSize;
static uint defaultFlowStopRatio;
@@ -99,7 +101,8 @@ struct QueueSettings;
void decode(framing::Buffer& buffer);
uint32_t encodedSize() const;
- static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings);
+ QPID_BROKER_EXTERN void observe(Queue& queue);
+ static QPID_BROKER_EXTERN boost::shared_ptr<QueueFlowLimit> createLimit(const std::string& queueName, const QueueSettings& settings);
static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio);
friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&);
@@ -113,10 +116,9 @@ struct QueueSettings;
const Broker *broker;
- QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue,
- uint32_t flowStopCount, uint32_t flowResumeCount,
- uint64_t flowStopSize, uint64_t flowResumeSize);
- static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings);
+ QPID_BROKER_EXTERN QueueFlowLimit(const std::string& _queueName,
+ uint32_t _flowStopCount, uint32_t _flowResumeCount,
+ uint64_t _flowStopSize, uint64_t _flowResumeSize);
};
}}
Modified: qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1543449&r1=1543448&r2=1543449&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Tue Nov 19 15:16:11 2013
@@ -46,7 +46,7 @@ class TestFlow : public QueueFlowLimit
public:
TestFlow(uint32_t flowStopCount, uint32_t flowResumeCount,
uint64_t flowStopSize, uint64_t flowResumeSize) :
- QueueFlowLimit(0, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)
+ QueueFlowLimit("", flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)
{}
virtual ~TestFlow() {}
@@ -66,11 +66,11 @@ public:
return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
}
- static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
+ static boost::shared_ptr<qpid::broker::QueueFlowLimit> getQueueFlowLimit(const qpid::framing::FieldTable& arguments)
{
QueueSettings settings;
settings.populate(arguments, settings.storeSettings);
- return QueueFlowLimit::createLimit(0, settings);
+ return QueueFlowLimit::createLimit("", settings);
}
};
@@ -357,10 +357,9 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
80, // 80% stop threshold
70); // 70% resume threshold
FieldTable args;
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
+ boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(flow);
- BOOST_CHECK(ptr);
- std::auto_ptr<QueueFlowLimit> flow(ptr);
BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize());
BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize());
BOOST_CHECK_EQUAL( 0u, flow->getFlowStopCount());
@@ -372,17 +371,17 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs)
QPID_AUTO_TEST_CASE(testFlowOverrideArgs)
{
- QueueFlowLimit::setDefaults(2950001, // max queue byte count
+ QueueFlowLimit::setDefaults(0, // max queue byte count
80, // 80% stop threshold
70); // 70% resume threshold
{
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 35000);
args.setInt(QueueFlowLimit::flowResumeCountKey, 30000);
+// args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
- BOOST_CHECK(ptr);
- std::auto_ptr<QueueFlowLimit> flow(ptr);
+ boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(flow);
BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
@@ -396,9 +395,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs
args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
- BOOST_CHECK(ptr);
- std::auto_ptr<QueueFlowLimit> flow(ptr);
+ boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(flow);
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
@@ -414,9 +412,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs
args.setInt(QueueFlowLimit::flowStopSizeKey, 350000);
args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000);
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
- BOOST_CHECK(ptr);
- std::auto_ptr<QueueFlowLimit> flow(ptr);
+ boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(flow);
BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount());
@@ -434,9 +431,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideDefa
97, // stop threshold
73); // resume threshold
FieldTable args;
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
- BOOST_CHECK(ptr);
- std::auto_ptr<QueueFlowLimit> flow(ptr);
+ boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(flow);
BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize());
BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize());
@@ -450,14 +446,9 @@ QPID_AUTO_TEST_CASE(testFlowDisable)
{
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 0);
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
- BOOST_CHECK(!ptr);
- }
- {
- FieldTable args;
args.setInt(QueueFlowLimit::flowStopSizeKey, 0);
- QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args);
- BOOST_CHECK(!ptr);
+ boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args);
+ BOOST_CHECK(!flow);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org