You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/07/25 15:50:32 UTC
svn commit: r679805 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/broker/QueuePolicy.cpp
qpid/broker/QueuePolicy.h qpid/broker/SemanticState.cpp tests/QueueTest.cpp
tests/TxPublishTest.cpp
Author: gsim
Date: Fri Jul 25 06:50:32 2008
New Revision: 679805
URL: http://svn.apache.org/viewvc?rev=679805&view=rev
Log:
Only reduce count and size maintained for queue policy when messages are actually dequeued (i.e. acked).
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jul 25 06:50:32 2008
@@ -230,7 +230,7 @@
if (c.filter(msg.payload)) {
if (c.accept(msg.payload)) {
m = msg;
- pop();
+ messages.pop_front();
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -361,13 +361,13 @@
mgmtObject->dec_consumerCount ();
}
-QueuedMessage Queue::dequeue(){
+QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = messages.front();
- pop();
+ messages.pop_front();
}
return msg;
}
@@ -376,35 +376,11 @@
Mutex::ScopedLock locker(messageLock);
int count = messages.size();
while(!messages.empty()) {
- QueuedMessage& msg = messages.front();
- if (store && msg.payload->isPersistent()) {
- boost::intrusive_ptr<PersistableMessage> pmsg =
- boost::static_pointer_cast<PersistableMessage>(msg.payload);
- store->dequeue(0, pmsg, *this);
- }
- pop();
+ popAndDequeue();
}
return count;
}
-/**
- * Assumes messageLock is held
- */
-void Queue::pop(){
- QueuedMessage& msg = messages.front();
-
- if (policy.get()) policy->dequeued(msg.payload->contentSize());
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
- if (msg.payload->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
- }
- }
- messages.pop_front();
-}
-
void Queue::push(boost::intrusive_ptr<Message>& msg){
Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
@@ -421,7 +397,7 @@
} else {
QPID_LOG(error, "Message " << msg << " on " << name
<< " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
}
} else {
if (policyExceeded) {
@@ -475,6 +451,10 @@
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
+ {
+ Mutex::ScopedLock locker(messageLock);
+ dequeued(msg);
+ }
if (msg->isPersistent() && store) {
msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
@@ -485,6 +465,34 @@
return false;
}
+/**
+ * Removes a message from the in-memory delivery queue as well as
+ * dequeuing it from the logical (and persistent if applicable) queue
+ */
+void Queue::popAndDequeue()
+{
+ boost::intrusive_ptr<Message> msg = messages.front().payload;
+ messages.pop_front();
+ dequeue(0, msg);
+}
+
+/**
+ * Updates policy and management when a message has been dequeued,
+ * expects messageLock to be held
+ */
+void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+{
+ if (policy.get()) policy->dequeued(msg->contentSize());
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ }
+ }
+}
+
namespace
{
@@ -534,7 +542,7 @@
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
- pop();
+ popAndDequeue();
}
alternateExchange->decAlternateUsers();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Jul 25 06:50:32 2008
@@ -98,7 +98,6 @@
framing::SequenceNumber sequence;
management::Queue* mgmtObject;
- void pop();
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
bool seek(QueuedMessage& msg, Consumer& position);
@@ -112,6 +111,9 @@
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ void dequeued(boost::intrusive_ptr<Message>& msg);
+ void popAndDequeue();
+
public:
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -178,10 +180,11 @@
* dequeue from store (only done once messages is acknowledged)
*/
bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+
/**
- * dequeues from memory only
+ * Gets the next available message
*/
- QueuedMessage dequeue();
+ QueuedMessage get();
const QueuePolicy* getPolicy();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Fri Jul 25 06:50:32 2008
@@ -71,3 +71,18 @@
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
uint64_t QueuePolicy::defaultMaxSize(0);
+namespace qpid {
+ namespace broker {
+
+std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
+{
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
+ else out << "size unlimited, current=" << p.size;
+ out << "; ";
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
+ else out << "count unlimited, current=" << p.count;
+ return out;
+}
+
+ }
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Fri Jul 25 06:50:32 2008
@@ -21,6 +21,7 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
+#include <iostream>
#include "qpid/framing/FieldTable.h"
namespace qpid {
@@ -50,6 +51,7 @@
uint64_t getMaxSize() const { return maxSize; }
static void setDefaultMaxSize(uint64_t);
+ friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Jul 25 06:50:32 2008
@@ -429,7 +429,7 @@
bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
{
- QueuedMessage msg = queue->dequeue();
+ QueuedMessage msg = queue->get();
if(msg.payload){
DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token);
if(ackExpected){
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Jul 25 06:50:32 2008
@@ -91,7 +91,7 @@
BOOST_CHECK(!c1.received);
msg1->enqueueComplete();
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
}
@@ -179,11 +179,11 @@
BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK_EQUAL(msg1.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(2), queue->getMessageCount());
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK_EQUAL(msg2.get(), received.get());
BOOST_CHECK_EQUAL(uint32_t(1), queue->getMessageCount());
@@ -196,7 +196,7 @@
BOOST_CHECK_EQUAL(msg3.get(), consumer.last.get());
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
- received = queue->dequeue().payload;
+ received = queue->get().payload;
BOOST_CHECK(!received);
BOOST_CHECK_EQUAL(uint32_t(0), queue->getMessageCount());
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=679805&r1=679804&r2=679805&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Fri Jul 25 06:50:32 2008
@@ -82,13 +82,13 @@
t.op.prepare(0);
t.op.commit();
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount());
- intrusive_ptr<Message> msg_dequeue = t.queue1->dequeue().payload;
+ intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload;
BOOST_CHECK_EQUAL( true, (static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
BOOST_CHECK_EQUAL(t.msg, msg_dequeue);
BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount());
- BOOST_CHECK_EQUAL(t.msg, t.queue2->dequeue().payload);
+ BOOST_CHECK_EQUAL(t.msg, t.queue2->get().payload);
}
QPID_AUTO_TEST_SUITE_END()