You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/22 19:49:25 UTC
svn commit: r1174282 - in /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker:
MessageAllocator.cpp MessageGroupManager.cpp Queue.cpp Queue.h
QueueObserver.h
Author: kgiusti
Date: Thu Sep 22 17:49:25 2011
New Revision: 1174282
URL: http://svn.apache.org/viewvc?rev=1174282&view=rev
Log:
QPID-3346: incorporate feedback from aconway's review
Modified:
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageAllocator.cpp Thu Sep 22 17:49:25 2011
@@ -52,6 +52,7 @@ bool MessageAllocator::acquirable( const
const QueuedMessage&,
const qpid::sys::Mutex::ScopedLock&)
{
+ // by default, all messages present on the queue are acquireable
return true;
}
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Thu Sep 22 17:49:25 2011
@@ -28,13 +28,13 @@
using namespace qpid::broker;
namespace {
- const std::string GroupQueryKey("qpid.message_group_queue");
- const std::string GroupHeaderKey("group_header_key");
- const std::string GroupStateKey("group_state");
- const std::string GroupIdKey("group_id");
- const std::string GroupMsgCount("msg_count");
- const std::string GroupTimestamp("timestamp");
- const std::string GroupConsumer("consumer");
+ const std::string GROUP_QUERY_KEY("qpid.message_group_queue");
+ const std::string GROUP_HEADER_KEY("group_header_key");
+ const std::string GROUP_STATE_KEY("group_state");
+ const std::string GROUP_ID_KEY("group_id");
+ const std::string GROUP_MSG_COUNT("msg_count");
+ const std::string GROUP_TIMESTAMP("timestamp");
+ const std::string GROUP_CONSUMER("consumer");
}
@@ -66,12 +66,8 @@ void MessageGroupManager::enqueued( cons
if (total == 1) {
// newly created group, no owner
state.group = group;
-#ifdef NDEBUG
+ assert(freeGroups.find(qm.position) == freeGroups.end());
freeGroups[qm.position] = &state;
-#else
- bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
- (void) unique; assert(unique);
-#endif
}
}
@@ -261,22 +257,22 @@ void MessageGroupManager::query(qpid::ty
}
**/
- assert(status.find(GroupQueryKey) == status.end());
+ assert(status.find(GROUP_QUERY_KEY) == status.end());
qpid::types::Variant::Map state;
qpid::types::Variant::List groups;
- state[GroupHeaderKey] = groupIdHeader;
+ state[GROUP_HEADER_KEY] = groupIdHeader;
for (GroupMap::const_iterator g = messageGroups.begin();
g != messageGroups.end(); ++g) {
qpid::types::Variant::Map info;
- info[GroupIdKey] = g->first;
- info[GroupMsgCount] = g->second.members.size();
- info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
- info[GroupConsumer] = g->second.owner;
+ info[GROUP_ID_KEY] = g->first;
+ info[GROUP_MSG_COUNT] = g->second.members.size();
+ info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
+ info[GROUP_CONSUMER] = g->second.owner;
groups.push_back(info);
}
- state[GroupStateKey] = groups;
- status[GroupQueryKey] = state;
+ state[GROUP_STATE_KEY] = groups;
+ status[GROUP_QUERY_KEY] = state;
}
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Thu Sep 22 17:49:25 2011
@@ -224,14 +224,7 @@ void Queue::requeue(const QueuedMessage&
enqueue(0, payload);
}
}
-
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->requeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
- }
- }
+ observeRequeue(msg, locker);
}
copy.notify();
}
@@ -241,7 +234,7 @@ bool Queue::acquireMessageAt(const Seque
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
- if (acquire(position, message )) {
+ if (acquire(position, message, locker)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
} else {
@@ -262,7 +255,7 @@ bool Queue::acquire(const QueuedMessage&
}
QueuedMessage copy(msg);
- if (acquire( msg.position, copy )) {
+ if (acquire( msg.position, copy, locker)) {
QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name);
return true;
}
@@ -317,7 +310,7 @@ Queue::ConsumeCode Queue::consumeNextMes
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
c->position = msg.position;
- acquire( msg.position, msg );
+ acquire( msg.position, msg, locker);
dequeue( 0, msg );
continue;
}
@@ -328,7 +321,7 @@ Queue::ConsumeCode Queue::consumeNextMes
if (c->accept(msg.payload)) {
bool ok = allocator->acquirable( c->getName(), msg, locker ); // inform allocator
(void) ok; assert(ok);
- ok = acquire( msg.position, msg );
+ ok = acquire( msg.position, msg, locker);
(void) ok; assert(ok);
m = msg;
c->position = m.position;
@@ -435,9 +428,9 @@ void Queue::consume(Consumer::shared_ptr
autoDeleteTask->cancel();
}
}
+ Mutex::ScopedLock locker(messageLock);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
- Mutex::ScopedLock locker(messageLock);
(*i)->consumerAdded(*c);
} catch (const std::exception& e) {
QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
@@ -454,9 +447,9 @@ void Queue::cancel(Consumer::shared_ptr
if (mgmtObject != 0)
mgmtObject->dec_consumerCount ();
}
+ Mutex::ScopedLock locker(messageLock);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
- Mutex::ScopedLock locker(messageLock);
(*i)->consumerRemoved(*c);
} catch (const std::exception& e) {
QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
@@ -468,7 +461,7 @@ QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if (messages->pop(msg))
- acquired( msg );
+ observeAcquire(msg, locker);
return msg;
}
@@ -504,7 +497,7 @@ void Queue::purgeExpired(qpid::sys::Dura
i != expired.end(); ++i) {
{
Mutex::ScopedLock locker(messageLock);
- acquired( *i ); // expects messageLock held
+ observeAcquire(*i, locker);
}
dequeue( 0, *i );
}
@@ -637,7 +630,7 @@ uint32_t Queue::purge(const uint32_t pur
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
qmsg != c.matches.end(); ++qmsg) {
// Update observers and message state:
- acquired(*qmsg);
+ observeAcquire(*qmsg, locker);
dequeue(0, *qmsg);
// now reroute if necessary
if (dest.get()) {
@@ -661,7 +654,7 @@ uint32_t Queue::move(const Queue::shared
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
qmsg != c.matches.end(); ++qmsg) {
// Update observers and message state:
- acquired(*qmsg);
+ observeAcquire(*qmsg, locker);
dequeue(0, *qmsg);
// and move to destination Queue.
assert(qmsg->payload);
@@ -673,21 +666,22 @@ uint32_t Queue::move(const Queue::shared
/** Acquire the front (oldest) message from the in-memory queue.
* assumes messageLock held by caller
*/
-void Queue::pop()
+void Queue::pop(const Mutex::ScopedLock& locker)
{
assertClusterSafe();
QueuedMessage msg;
if (messages->pop(msg)) {
- acquired( msg ); // mark it removed
+ observeAcquire(msg, locker);
++dequeueSincePurge;
}
}
/** Acquire the message at the given position, return true and msg if acquire succeeds */
-bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg )
+bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+ const Mutex::ScopedLock& locker)
{
if (messages->remove(position, msg)) {
- acquired( msg );
+ observeAcquire(msg, locker);
++dequeueSincePurge;
return true;
}
@@ -705,12 +699,13 @@ void Queue::push(boost::intrusive_ptr<Me
if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
+ if (dequeueRequired)
+ observeAcquire(removed, locker);
listeners.populate(copy);
- enqueued(qm);
+ observeEnqueue(qm, locker);
}
copy.notify();
if (dequeueRequired) {
- acquired( removed ); // tell observers
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
@@ -841,7 +836,7 @@ bool Queue::dequeue(TransactionContext*
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
if (!ctxt) {
- dequeued(msg);
+ observeDequeue(msg, locker);
}
}
// This check prevents messages which have been forced persistent on one queue from dequeuing
@@ -861,7 +856,7 @@ bool Queue::dequeue(TransactionContext*
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ observeDequeue(msg, locker);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -872,11 +867,11 @@ void Queue::dequeueCommitted(const Queue
* Removes the first (oldest) message from the in-memory delivery queue as well dequeing
* it from the logical (and persistent if applicable) queue
*/
-void Queue::popAndDequeue()
+void Queue::popAndDequeue(const Mutex::ScopedLock& held)
{
if (!messages->empty()) {
QueuedMessage msg = messages->front();
- pop();
+ pop(held);
dequeue(0, msg);
}
}
@@ -885,7 +880,7 @@ void Queue::popAndDequeue()
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::dequeued(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
@@ -901,7 +896,7 @@ void Queue::dequeued(const QueuedMessage
/** updates queue observers when a message has become unavailable for transfer,
* expects messageLock to be held
*/
-void Queue::acquired(const QueuedMessage& msg)
+void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -912,6 +907,20 @@ void Queue::acquired(const QueuedMessage
}
}
+/** updates queue observers when a message has become re-available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
+{
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->requeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
+ }
+ }
+}
+
void Queue::create(const FieldTable& _settings)
{
settings = _settings;
@@ -1034,7 +1043,7 @@ void Queue::destroyed()
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
alternateExchange->routeWithAlternate(msg);
- popAndDequeue();
+ popAndDequeue(locker);
}
alternateExchange->decAlternateUsers();
}
@@ -1328,7 +1337,10 @@ void Queue::insertSequenceNumbers(const
QPID_LOG(debug, "Inserting sequence numbers as " << key);
}
-void Queue::enqueued(const QueuedMessage& m)
+/** updates queue observers and state when a message has become available for transfer,
+ * expects messageLock to be held
+ */
+void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1351,7 +1363,8 @@ void Queue::updateEnqueued(const QueuedM
if (policy.get()) {
policy->recoverEnqueued(payload);
}
- enqueued(m);
+ Mutex::ScopedLock locker(messageLock);
+ observeEnqueue(m, locker);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1375,6 +1388,7 @@ void Queue::checkNotDeleted()
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
+ Mutex::ScopedLock locker(messageLock);
observers.insert(observer);
}
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.h Thu Sep 22 17:49:25 2011
@@ -142,16 +142,19 @@ class Queue : public boost::enable_share
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- /** update queue observers with new message state */
- void enqueued(const QueuedMessage& msg);
- void acquired(const QueuedMessage& msg);
- void dequeued(const QueuedMessage& msg);
+ /** update queue observers, stats, policy, etc when the messages' state changes. Lock
+ * must be held by caller */
+ void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
+ void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock);
/** modify the Queue's message container - assumes messageLock held */
- void pop(); // acquire front msg
- void popAndDequeue(); // acquire and dequeue front msg
+ void pop(const sys::Mutex::ScopedLock& held); // acquire front msg
+ void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg
// acquire message @ position, return true and set msg if acquire succeeds
- bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg );
+ bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg,
+ const sys::Mutex::ScopedLock& held);
void forcePersistent(QueuedMessage& msg);
int getEventMode();
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h?rev=1174282&r1=1174281&r2=1174282&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/QueueObserver.h Thu Sep 22 17:49:25 2011
@@ -35,9 +35,11 @@ class Consumer;
* the queue it has been delivered to. A message can be considered in one of three states
* with respect to the queue:
*
- * 1) "Available" - available for transfer to consumers,
- * 2) "Locked" - to a particular consumer, no longer available for transfer, but not
- * considered fully dequeued.
+ * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire),
+ *
+ * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers
+ * (by either browse or acquire), but still considered on the queue.
+ *
* 3) "Dequeued" - removed from the queue and no longer available to any consumer.
*
* The queue events that are observable are:
@@ -45,15 +47,15 @@ class Consumer;
* "Enqueued" - the message is "Available" - on the queue for transfer to any consumer
* (e.g. browse or acquire)
*
- * "Acquired" - the message is "Locked" - a consumer has claimed exclusive access to it.
- * It is no longer available for other consumers to browse or acquire, but it is not yet
- * considered dequeued as it may be requeued by the consumer.
+ * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available
+ * for other consumers to browse or acquire, but it is not yet considered dequeued as it
+ * may be requeued by the consumer.
*
- * "Requeued" - a previously-consumed message is 'unlocked': it is put back on the queue
- * at its original position and returns to the "Available" state.
+ * "Requeued" - a previously-acquired message is released by its owner: it is put back on
+ * the queue at its original position and returns to the "Available" state.
*
- * "Dequeued" - a Locked message is no longer queued. At this point, the queue no longer
- * tracks the message, and the broker considers the consumer's transaction complete.
+ * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks
+ * the message, and the broker considers the consumer's transaction complete.
*/
class QueueObserver
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org