You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2013/10/03 15:31:12 UTC
svn commit: r1528852 - in /qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/broker/amqp/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/tests/
tests/src/py/qpid_tests/broker_0_10/
Author: gsim
Date: Thu Oct 3 13:31:12 2013
New Revision: 1528852
URL: http://svn.apache.org/r1528852
Log:
QPID-5199: take 0-10 header segment into account for message size
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp
qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 3 13:31:12 2013
@@ -135,7 +135,7 @@ void Exchange::doRoute(Deliverable& msg,
if (mgmtExchange != 0)
{
qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics();
- uint64_t contentSize = msg.getMessage().getContentSize();
+ uint64_t contentSize = msg.getMessage().getMessageSize();
eStats->msgReceives += 1;
eStats->byteReceives += contentSize;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Oct 3 13:31:12 2013
@@ -71,9 +71,9 @@ bool Message::isPersistent() const
return getEncoding().isPersistent();
}
-uint64_t Message::getContentSize() const
+uint64_t Message::getMessageSize() const
{
- return getEncoding().getContentSize();
+ return getEncoding().getMessageSize();
}
boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Oct 3 13:31:12 2013
@@ -68,7 +68,7 @@ public:
virtual std::string getRoutingKey() const = 0;
virtual bool isPersistent() const = 0;
virtual uint8_t getPriority() const = 0;
- virtual uint64_t getContentSize() const = 0;
+ virtual uint64_t getMessageSize() const = 0;
virtual qpid::amqp::MessageId getMessageId() const = 0;
virtual qpid::amqp::MessageId getCorrelationId() const = 0;
virtual std::string getPropertyAsString(const std::string& key) const = 0;
@@ -119,7 +119,7 @@ public:
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
void processProperties(qpid::amqp::MapHandler&) const;
- QPID_BROKER_EXTERN uint64_t getContentSize() const;
+ QPID_BROKER_EXTERN uint64_t getMessageSize() const;
QPID_BROKER_EXTERN Encoding& getEncoding();
QPID_BROKER_EXTERN const Encoding& getEncoding() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 3 13:31:12 2013
@@ -88,7 +88,7 @@ inline void mgntEnqStats(const Message&
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
@@ -111,7 +111,7 @@ inline void mgntDeqStats(const Message&
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
@@ -202,7 +202,7 @@ Queue::Queue(const string& _name, const
redirectSource(false)
{
current.setCount(0);//always track depth in messages
- if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it
+ if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
@@ -305,7 +305,7 @@ void Queue::deliverTo(Message msg, TxBuf
void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- current += QueueDepth(1, msg.getContentSize());
+ current += QueueDepth(1, msg.getMessageSize());
}
void Queue::recover(Message& msg)
@@ -319,7 +319,7 @@ void Queue::process(Message& msg)
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg.getContentSize();
+ const uint64_t contentSize = msg.getMessageSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
@@ -861,7 +861,7 @@ bool Queue::enqueue(TransactionContext*
{
Mutex::ScopedLock locker(messageLock);
- if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+ if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) {
return false;
}
}
@@ -891,7 +891,7 @@ void Queue::enqueueAborted(const Message
//Called when any transactional enqueue is aborted (including but
//not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
}
void Queue::enqueueCommited(Message& msg)
@@ -919,7 +919,7 @@ void Queue::dequeueCommited(const Messag
observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
+ mgmtObject->inc_byteTxnDequeues(msg.getMessageSize());
}
}
@@ -962,7 +962,7 @@ void Queue::dequeueCommitted(const Queue
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
- const uint64_t contentSize = msg->getContentSize();
+ const uint64_t contentSize = msg->getMessageSize();
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
@@ -986,7 +986,7 @@ void Queue::dequeueCommitted(const Queue
*/
void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Oct 3 13:31:12 2013
@@ -112,7 +112,7 @@ void QueueFlowLimit::enqueued(const Mess
sys::Mutex::ScopedLock l(indexLock);
++count;
- size += msg.getContentSize();
+ size += msg.getMessageSize();
if (!flowStopped) {
if (flowStopCount && count > flowStopCount) {
@@ -150,7 +150,7 @@ void QueueFlowLimit::dequeued(const Mess
throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
}
- uint64_t _size = msg.getContentSize();
+ uint64_t _size = msg.getMessageSize();
if (_size <= size) {
size -= _size;
} else {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Thu Oct 3 13:31:12 2013
@@ -44,7 +44,7 @@ ThresholdAlerts::ThresholdAlerts(const s
void ThresholdAlerts::enqueued(const Message& m)
{
- size += m.getContentSize();
+ size += m.getMessageSize();
++count;
if (sizeGoingUp && sizeThreshold && size >= sizeThreshold) {
@@ -64,7 +64,7 @@ void ThresholdAlerts::enqueued(const Mes
void ThresholdAlerts::dequeued(const Message& m)
{
- size -= m.getContentSize();
+ size -= m.getMessageSize();
--count;
if (!sizeGoingUp && sizeThreshold && size <= sizeThresholdDown) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp Thu Oct 3 13:31:12 2013
@@ -202,11 +202,7 @@ std::string Message::getAnnotationAsStri
}
-//getContentSize() is primarily used in stats about the number of
-//bytes enqueued/dequeued etc, not sure whether this is the right name
-//and whether it should indeed only be the content that is thus
-//measured
-uint64_t Message::getContentSize() const { return data.size(); }
+uint64_t Message::getMessageSize() const { return data.size(); }
//getContent() is used primarily for decoding qmf messages in
//management and ha, but also by the xml exchange
std::string Message::getContent() const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Thu Oct 3 13:31:12 2013
@@ -44,7 +44,7 @@ class Message : public qpid::broker::Mes
std::string getRoutingKey() const;
bool isPersistent() const;
uint8_t getPriority() const;
- uint64_t getContentSize() const;
+ uint64_t getMessageSize() const;
std::string getPropertyAsString(const std::string& key) const;
std::string getAnnotationAsString(const std::string& key) const;
bool getTtl(uint64_t&) const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Thu Oct 3 13:31:12 2013
@@ -52,6 +52,11 @@ uint64_t MessageTransfer::getContentSize
return frames.getContentSize();
}
+uint64_t MessageTransfer::getMessageSize() const
+{
+ return getRequiredCredit();
+}
+
std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
{
const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Thu Oct 3 13:31:12 2013
@@ -45,6 +45,7 @@ class MessageTransfer : public qpid::bro
bool isPersistent() const;
uint8_t getPriority() const;
uint64_t getContentSize() const;
+ uint64_t getMessageSize() const;
qpid::amqp::MessageId getMessageId() const;
qpid::amqp::MessageId getCorrelationId() const;
std::string getPropertyAsString(const std::string& key) const;
Modified: qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageTest.cpp Thu Oct 3 13:31:12 2013
@@ -65,7 +65,7 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
msg = registry.decode(buffer);
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
- BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
+ BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContent().size());
BOOST_CHECK_EQUAL(data, msg.getContent());
//BOOST_CHECK_EQUAL(messageId, msg->getProperties<MessageProperties>()->getMessageId());
BOOST_CHECK_EQUAL(string("xyz"), msg.getPropertyAsString("abc"));
Modified: qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Thu Oct 3 13:31:12 2013
@@ -77,8 +77,14 @@ public:
Message createMessage(uint32_t size)
{
static uint32_t seqNum;
- Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size, 'x'));
- msg.setSequence(++seqNum);
+ //Need to compute what data size is required to make a given
+ //overall size (use one byte of content in test message to ensure
+ //content frame is added)
+ Message test = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string("x"));
+ size_t min = test.getMessageSize() - 1;
+ if (min > size) throw qpid::Exception("Can't create message that small!");
+ Message msg = MessageUtils::createMessage(qpid::types::Variant::Map(), std::string (size - min, 'x'));
+ msg.setSequence(++seqNum);//this doesn't affect message size
return msg;
}
}
@@ -100,18 +106,18 @@ QPID_AUTO_TEST_CASE(testFlowCount)
std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control
@@ -136,69 +142,69 @@ QPID_AUTO_TEST_CASE(testFlowCount)
QPID_AUTO_TEST_CASE(testFlowSize)
{
FieldTable args;
- args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70);
- args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 700);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 460);
std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args));
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount());
BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount());
- BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize());
- BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize());
+ BOOST_CHECK_EQUAL((uint32_t) 700, flow->getFlowStopSize());
+ BOOST_CHECK_EQUAL((uint32_t) 460, flow->getFlowResumeSize());
BOOST_CHECK(!flow->isFlowControlActive());
BOOST_CHECK(flow->monitorFlowControl());
std::deque<Message> msgs;
for (size_t i = 0; i < 6; i++) {
- msgs.push_back(createMessage(10));
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 600 on queue
BOOST_CHECK_EQUAL(6u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(60u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(600u, flow->getFlowSize());
- Message msg_9 = createMessage(9);
- flow->enqueued(msg_9);
- BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue
- Message tinyMsg_1 = createMessage(1);
+ Message msg_50 = createMessage(50);
+ flow->enqueued(msg_50);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 650 on queue
+ Message tinyMsg_1 = createMessage(40);
flow->enqueued(tinyMsg_1);
- BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 690 on queue
- Message tinyMsg_2 = createMessage(1);
+ Message tinyMsg_2 = createMessage(40);
flow->enqueued(tinyMsg_2);
- BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON
- msgs.push_back(createMessage(10));
+ BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue, ON
+ msgs.push_back(createMessage(100));
flow->enqueued(msgs.back());
- BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 830 on queue
BOOST_CHECK_EQUAL(10u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(81u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(830u, flow->getFlowSize());
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 730 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 630 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 530 on queue
flow->dequeued(tinyMsg_1);
- BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue
+ BOOST_CHECK(flow->isFlowControlActive()); // 490 on queue
flow->dequeued(tinyMsg_2);
- BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF
+ BOOST_CHECK(!flow->isFlowControlActive()); // 450 on queue, OFF
- flow->dequeued(msg_9);
- BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue
+ flow->dequeued(msg_50);
+ BOOST_CHECK(!flow->isFlowControlActive()); // 400 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 300 on queue
flow->dequeued(msgs.front());
msgs.pop_front();
- BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue
+ BOOST_CHECK(!flow->isFlowControlActive()); // 200 on queue
BOOST_CHECK_EQUAL(2u, flow->getFlowCount());
- BOOST_CHECK_EQUAL(20u, flow->getFlowSize());
+ BOOST_CHECK_EQUAL(200u, flow->getFlowSize());
}
QPID_AUTO_TEST_CASE(testFlowArgs)
@@ -227,13 +233,13 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
FieldTable args;
args.setInt(QueueFlowLimit::flowStopCountKey, 10);
args.setInt(QueueFlowLimit::flowResumeCountKey, 5);
- args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200);
- args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100);
+ args.setUInt64(QueueFlowLimit::flowStopSizeKey, 2000);
+ args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 1000);
- std::deque<Message> msgs_1;
- std::deque<Message> msgs_10;
std::deque<Message> msgs_50;
std::deque<Message> msgs_100;
+ std::deque<Message> msgs_500;
+ std::deque<Message> msgs_1000;
Message msg;
@@ -243,104 +249,104 @@ QPID_AUTO_TEST_CASE(testFlowCombo)
// verify flow control comes ON when only count passes its stop point.
for (size_t i = 0; i < 10; i++) {
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back());
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- // count:10 size:100
+ // count:10 size:1000
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:11 size: 1050 ->ON
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 6; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
- // count:5 size: 41
+ // count:5 size: 450
- flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF
- msgs_1.pop_front();
+ flow->dequeued(msgs_50.front()); // count: 4 size: 400 ->OFF
+ msgs_50.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
for (size_t i = 0; i < 4; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
// count:0 size:0
// verify flow control comes ON when only size passes its stop point.
- msgs_100.push_back(createMessage(100));
- flow->enqueued(msgs_100.back()); // count:1 size: 100
+ msgs_1000.push_back(createMessage(1000));
+ flow->enqueued(msgs_1000.back()); // count:1 size: 1000
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_50.push_back(createMessage(50));
- flow->enqueued(msgs_50.back()); // count:2 size: 150
+ msgs_500.push_back(createMessage(500));
+ flow->enqueued(msgs_500.back()); // count:2 size: 1500
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_50.push_back(createMessage(50));
- flow->enqueued(msgs_50.back()); // count:3 size: 200
+ msgs_500.push_back(createMessage(500));
+ flow->enqueued(msgs_500.back()); // count:3 size: 2000
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:4 size: 2050 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_100.front()); // count:3 size:101
- msgs_100.pop_front();
+ flow->dequeued(msgs_1000.front()); // count:3 size:1050
+ msgs_1000.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_1.front()); // count:2 size:100
- msgs_1.pop_front();
+ flow->dequeued(msgs_50.front()); // count:2 size:1000
+ msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
- flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF
- msgs_50.pop_front();
+ flow->dequeued(msgs_500.front()); // count:1 size:500 ->OFF
+ msgs_500.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
// verify flow control remains ON until both thresholds drop below their
// resume point.
for (size_t i = 0; i < 8; i++) {
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back());
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back());
BOOST_CHECK(!flow->isFlowControlActive());
}
- // count:9 size:130
+ // count:9 size:1300
- msgs_10.push_back(createMessage(10));
- flow->enqueued(msgs_10.back()); // count:10 size: 140
+ msgs_100.push_back(createMessage(100));
+ flow->enqueued(msgs_100.back()); // count:10 size: 1400
BOOST_CHECK(!flow->isFlowControlActive());
- msgs_1.push_back(createMessage(1));
- flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON
+ msgs_50.push_back(createMessage(50));
+ flow->enqueued(msgs_50.back()); // count:11 size: 1450 ->ON
BOOST_CHECK(flow->isFlowControlActive());
- msgs_100.push_back(createMessage(100));
- flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed)
+ msgs_1000.push_back(createMessage(1000));
+ flow->enqueued(msgs_1000.back()); // count:12 size: 2450 (both thresholds crossed)
BOOST_CHECK(flow->isFlowControlActive());
- // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241
+ // at this point: 9@100 + 1@500 + 1@1000 + 1@50 == 12@2450
- flow->dequeued(msgs_50.front()); // count:11 size:191
- msgs_50.pop_front();
+ flow->dequeued(msgs_500.front()); // count:11 size:1950
+ msgs_500.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
for (size_t i = 0; i < 9; i++) {
- flow->dequeued(msgs_10.front());
- msgs_10.pop_front();
+ flow->dequeued(msgs_100.front());
+ msgs_100.pop_front();
BOOST_CHECK(flow->isFlowControlActive());
}
- // count:2 size:101
- flow->dequeued(msgs_1.front()); // count:1 size:100
- msgs_1.pop_front();
+ // count:2 size:1050
+ flow->dequeued(msgs_50.front()); // count:1 size:1000
+ msgs_50.pop_front();
BOOST_CHECK(flow->isFlowControlActive()); // still active due to size
- flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF
- msgs_100.pop_front();
+ flow->dequeued(msgs_1000.front()); // count:0 size:0 ->OFF
+ msgs_1000.pop_front();
BOOST_CHECK(!flow->isFlowControlActive());
}
Modified: qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp Thu Oct 3 13:31:12 2013
@@ -69,9 +69,15 @@ QPID_AUTO_TEST_CASE(testRingPolicyCount)
QPID_AUTO_TEST_CASE(testRingPolicySize)
{
- std::string hundredBytes = std::string(100, 'h');
- std::string fourHundredBytes = std::string (400, 'f');
- std::string thousandBytes = std::string(1000, 't');
+ //The message size now includes all headers as well as the content
+ //aka body, so compute the amount of data needed to hit a given
+ //overall size
+ std::string q("my-ring-queue");
+ size_t minMessageSize = 25/*minimum size of headers*/ + q.size()/*routing key length*/ + 4/*default exchange, added by broker*/;
+
+ std::string hundredBytes = std::string(100 - minMessageSize, 'h');
+ std::string fourHundredBytes = std::string (400 - minMessageSize, 'f');
+ std::string thousandBytes = std::string(1000 - minMessageSize, 't');
// Ring queue, 500 bytes maxSize
@@ -79,7 +85,6 @@ QPID_AUTO_TEST_CASE(testRingPolicySize)
args.setSizePolicy(RING, 500, 0);
SessionFixture f;
- std::string q("my-ring-queue");
f.session.queueDeclare(arg::queue=q, arg::exclusive=true, arg::autoDelete=true, arg::arguments=args);
// A. Send messages 0 .. 5, each 100 bytes
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/stats.py Thu Oct 3 13:31:12 2013
@@ -67,34 +67,36 @@ class BrokerStatsTests(Base):
start_broker = agent.getBroker()
agent.addExchange("direct", "stats-test-exchange")
- sess = self.setup_session()
- tx_a = sess.sender("stats-test-exchange/a")
- tx_b = sess.sender("stats-test-exchange/b")
- rx_a = sess.receiver("stats-test-exchange/a")
-
- exchange = agent.getExchange("stats-test-exchange")
- self.failUnless(exchange, "expected a valid exchange object")
- self.assertEqual(exchange.msgReceives, 0, "msgReceives")
- self.assertEqual(exchange.msgDrops, 0, "msgDrops")
- self.assertEqual(exchange.msgRoutes, 0, "msgRoutes")
- self.assertEqual(exchange.byteReceives, 0, "byteReceives")
- self.assertEqual(exchange.byteDrops, 0, "byteDrops")
- self.assertEqual(exchange.byteRoutes, 0, "byteRoutes")
-
- tx_a.send("0123456789")
- tx_b.send("01234567890123456789")
- tx_a.send("012345678901234567890123456789")
- tx_b.send("0123456789012345678901234567890123456789")
-
- exchange.update()
- self.assertEqual(exchange.msgReceives, 4, "msgReceives")
- self.assertEqual(exchange.msgDrops, 2, "msgDrops")
- self.assertEqual(exchange.msgRoutes, 2, "msgRoutes")
- self.assertEqual(exchange.byteReceives, 100, "byteReceives")
- self.assertEqual(exchange.byteDrops, 60, "byteDrops")
- self.assertEqual(exchange.byteRoutes, 40, "byteRoutes")
-
- agent.delExchange("stats-test-exchange")
+ try:
+ sess = self.setup_session()
+ tx_a = sess.sender("stats-test-exchange/a")
+ tx_b = sess.sender("stats-test-exchange/b")
+ rx_a = sess.receiver("stats-test-exchange/a")
+
+ exchange = agent.getExchange("stats-test-exchange")
+ self.failUnless(exchange, "expected a valid exchange object")
+ self.assertEqual(exchange.msgReceives, 0, "msgReceives")
+ self.assertEqual(exchange.msgDrops, 0, "msgDrops")
+ self.assertEqual(exchange.msgRoutes, 0, "msgRoutes")
+ self.assertEqual(exchange.byteReceives, 0, "byteReceives")
+ self.assertEqual(exchange.byteDrops, 0, "byteDrops")
+ self.assertEqual(exchange.byteRoutes, 0, "byteRoutes")
+
+ tx_a.send("0123456789")
+ tx_b.send("01234567890123456789")
+ tx_a.send("012345678901234567890123456789")
+ tx_b.send("0123456789012345678901234567890123456789")
+
+ overhead = 63 #overhead added to message from headers
+ exchange.update()
+ self.assertEqual(exchange.msgReceives, 4, "msgReceives")
+ self.assertEqual(exchange.msgDrops, 2, "msgDrops")
+ self.assertEqual(exchange.msgRoutes, 2, "msgRoutes")
+ self.assertEqual(exchange.byteReceives, 100+(4*overhead), "byteReceives")
+ self.assertEqual(exchange.byteDrops, 60+(2*overhead), "byteDrops")
+ self.assertEqual(exchange.byteRoutes, 40+(2*overhead), "byteRoutes")
+ finally:
+ agent.delExchange("stats-test-exchange")
def test_enqueues_dequeues(self):
agent = self.setup_access()
@@ -117,14 +119,15 @@ class BrokerStatsTests(Base):
tx.send("01234567890123456789")
tx.send("012345678901234567890123456789")
tx.send("0123456789012345678901234567890123456789")
+ overhead = 38 #overhead added to message from headers
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
- self.assertEqual(queue.byteTotalEnqueues, 100, "byteTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 4, "msgDepth")
- self.assertEqual(queue.byteDepth, 100, "byteDepth")
+ self.assertEqual(queue.byteDepth, 100+(4*overhead), "byteDepth")
now_broker = agent.getBroker()
self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues")
@@ -136,11 +139,11 @@ class BrokerStatsTests(Base):
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
- self.assertEqual(queue.byteTotalEnqueues, 100, "byteTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues")
- self.assertEqual(queue.byteTotalDequeues, 30, "byteTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 30+(2*overhead), "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 2, "msgDepth")
- self.assertEqual(queue.byteDepth, 70, "byteDepth")
+ self.assertEqual(queue.byteDepth, 70+(2*overhead), "byteDepth")
now_broker = agent.getBroker()
self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues")
@@ -165,6 +168,7 @@ class BrokerStatsTests(Base):
tx.send("0123456789")
tx.send("0123456789")
tx.send("0123456789")
+ overhead = 41 #overhead added to message from headers
queue = agent.getQueue("tx_enqueue_test")
self.failUnless(queue, "expected a valid queue object")
@@ -180,9 +184,9 @@ class BrokerStatsTests(Base):
sess.commit()
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit")
- self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues post-tx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-tx-commit")
self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit")
- self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues post-tx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-tx-commit")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit")
self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit")
@@ -198,9 +202,9 @@ class BrokerStatsTests(Base):
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit")
- self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues pre-rx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues pre-rx-commit")
self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit")
- self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues pre-rx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues pre-rx-commit")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit")
self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit")
@@ -211,22 +215,22 @@ class BrokerStatsTests(Base):
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit")
- self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues post-rx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-rx-commit")
self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit")
- self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues post-rx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-rx-commit")
self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit")
- self.assertEqual(queue.byteTotalDequeues, 40, "byteTotalDequeues post-rx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 40+(4*overhead), "byteTotalDequeues post-rx-commit")
self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit")
- self.assertEqual(queue.byteTxnDequeues, 40, "byteTxnDequeues post-rx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 40+(4*overhead), "byteTxnDequeues post-rx-commit")
sess.close()
sess2.close()
now_broker = agent.getBroker()
self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues")
- self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40, "broker byteTxnEnqueues")
+ self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40+(4*overhead), "broker byteTxnEnqueues")
self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues")
- self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40, "broker byteTxnDequeues")
+ self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40+(4*overhead), "broker byteTxnDequeues")
def test_discards_no_route(self):
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py?rev=1528852&r1=1528851&r2=1528852&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py Thu Oct 3 13:31:12 2013
@@ -65,12 +65,13 @@ class ThresholdTests (Base):
rcvBw = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#")
snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{%s}}}}" % astr)
rcv = self.ssn.receiver("ttq")
+ overhead = 29 #additional bytes in broker's view of message size from headers etc
size = 0
count = 0
for m in messages:
snd.send(m)
count = count + 1
- size = size + len(m.content)
+ size = size + len(m.content) + overhead
event = rcvUp.fetch(timeout=1)
schema = event.content[0]["_schema_id"]
assert schema["_class_name"] == "queueThresholdCrossedUpward"
@@ -92,7 +93,7 @@ class ThresholdTests (Base):
m = rcv.fetch(timeout=1)
self.ssn.acknowledge()
count -= 1
- size -= len(m.content)
+ size -= (len(m.content) + overhead)
event = rcvDn.fetch(timeout=1)
schema = event.content[0]["_schema_id"]
assert schema["_class_name"] == "queueThresholdCrossedDownward"
@@ -111,7 +112,7 @@ class ThresholdTests (Base):
self.do_threshold_test(a, [Message("msg-%s" % i) for i in range(5)], 2)
def test_alert_size(self):
- a = {'qpid.alert_size_up':25,'qpid.alert_size_down':15}
+ a = {'qpid.alert_size_up':150,'qpid.alert_size_down':120}
self.do_threshold_test(a, [Message("msg-%s" % i) for i in range(5)], 2)
def test_alert_count_alias(self):
@@ -119,7 +120,7 @@ class ThresholdTests (Base):
self.do_threshold_test(a, [Message("msg-%s" % i) for i in range(10)], 0, True)
def test_alert_size_alias(self):
- a = {'x-qpid-maximum-message-size':15}
+ a = {'x-qpid-maximum-message-size':100}
self.do_threshold_test(a, [Message("msg-%s" % i) for i in range(3)], 0, True)
def test_alert_on_alert_queue(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org