You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/03/19 21:07:38 UTC
svn commit: r1302629 - in /qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker: Queue.cpp Queue.h
Author: kgiusti
Date: Mon Mar 19 20:07:37 2012
New Revision: 1302629
URL: http://svn.apache.org/viewvc?rev=1302629&view=rev
Log:
QPID-3890: revert changes to observer method interfaces
Modified:
qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h
Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1302629&r1=1302628&r2=1302629&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 19 20:07:37 2012
@@ -301,7 +301,7 @@ void Queue::requeue(const QueuedMessage&
{
Mutex::ScopedLock locker(messageLock);
messages->release(msg);
- observeRequeueLH(msg);
+ observeRequeue(msg, locker);
listeners.populate(copy);
}
@@ -424,7 +424,7 @@ Queue::ConsumeCode Queue::consumeNextMes
Mutex::ScopedLock locker(messageLock);
bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
(void) ok; assert(ok);
- observeAcquireLH(msg);
+ observeAcquire(msg, locker);
}
if (mgmtObject) {
mgmtObject->inc_acquires();
@@ -535,7 +535,7 @@ void Queue::consume(Consumer::shared_ptr
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
}
- observeConsumerAddLH(*c);
+ observeConsumerAdd(*c, locker);
}
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
@@ -547,7 +547,7 @@ void Queue::cancel(Consumer::shared_ptr
Mutex::ScopedLock locker(messageLock);
consumerCount--;
if(exclusive) exclusive = 0;
- observeConsumerRemoveLH(*c);
+ observeConsumerRemove(*c, locker);
}
if (mgmtObject != 0)
mgmtObject->dec_consumerCount ();
@@ -559,7 +559,7 @@ QueuedMessage Queue::get(){
{
Mutex::ScopedLock locker(messageLock);
ok = messages->consume(msg);
- if (ok) observeAcquireLH(msg);
+ if (ok) observeAcquire(msg, locker);
}
if (ok && mgmtObject) {
@@ -615,7 +615,7 @@ void Queue::purgeExpired(qpid::sys::Dura
// KAG: should be safe to retake lock after the removeIf, since
// no other thread can touch these messages after the removeIf() call
Mutex::ScopedLock locker(messageLock);
- observeAcquireLH(*i);
+ observeAcquire(*i, locker);
}
dequeue( 0, *i );
}
@@ -774,7 +774,7 @@ uint32_t Queue::purge(const uint32_t pur
// KAG: should be safe to retake lock after the removeIf, since
// no other thread can touch these messages after the removeIf call
Mutex::ScopedLock locker(messageLock);
- observeAcquireLH(*qmsg);
+ observeAcquire(*qmsg, locker);
}
dequeue(0, *qmsg);
QPID_LOG(debug, "Purged message at " << qmsg->position << " from " << getName());
@@ -814,7 +814,7 @@ uint32_t Queue::move(const Queue::shared
qmsg != c.matches.end(); ++qmsg) {
{
Mutex::ScopedLock locker(messageLock);
- observeAcquireLH(*qmsg);
+ observeAcquire(*qmsg, locker);
}
dequeue(0, *qmsg);
// and move to destination Queue.
@@ -832,7 +832,7 @@ bool Queue::acquire(const qpid::framing:
{
Mutex::ScopedLock locker(messageLock);
ok = messages->acquire(position, msg);
- if (ok) observeAcquireLH(msg);
+ if (ok) observeAcquire(msg, locker);
}
if (ok) {
if (mgmtObject) {
@@ -856,9 +856,9 @@ void Queue::push(boost::intrusive_ptr<Me
qm.position = ++sequence;
if (messages->push(qm, removed)) {
dequeueRequired = true;
- observeAcquireLH(removed);
+ observeAcquire(removed, locker);
}
- observeEnqueueLH(qm);
+ observeEnqueue(qm, locker);
if (policy.get()) {
policy->enqueued(qm);
}
@@ -1029,7 +1029,7 @@ bool Queue::dequeue(TransactionContext*
if (!ctxt) {
if (policy.get()) policy->dequeued(msg);
messages->deleted(msg);
- observeDequeueLH(msg);
+ observeDequeue(msg, locker);
}
}
@@ -1057,7 +1057,7 @@ void Queue::dequeueCommitted(const Queue
Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(msg);
messages->deleted(msg);
- observeDequeueLH(msg);
+ observeDequeue(msg, locker);
}
mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
if (mgmtObject != 0) {
@@ -1085,7 +1085,7 @@ bool Queue::popAndDequeue(QueuedMessage&
{
Mutex::ScopedLock locker(messageLock);
popped = messages->consume(msg);
- if (popped) observeAcquireLH(msg);
+ if (popped) observeAcquire(msg, locker);
}
if (popped) {
if (mgmtObject) {
@@ -1102,8 +1102,9 @@ bool Queue::popAndDequeue(QueuedMessage&
/**
* Updates policy and management when a message has been dequeued,
+ * Requires messageLock be held by caller.
*/
-void Queue::observeDequeueLH(const QueuedMessage& msg)
+void Queue::observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1114,9 +1115,10 @@ void Queue::observeDequeueLH(const Queue
}
}
-/** updates queue observers when a message has become unavailable for transfer
+/** updates queue observers when a message has become unavailable for transfer.
+ * Requires messageLock be held by caller.
*/
-void Queue::observeAcquireLH(const QueuedMessage& msg)
+void Queue::observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1128,8 +1130,9 @@ void Queue::observeAcquireLH(const Queue
}
/** updates queue observers when a message has become re-available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeRequeueLH(const QueuedMessage& msg)
+void Queue::observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1142,7 +1145,7 @@ void Queue::observeRequeueLH(const Queue
/** updates queue observers when a new consumer has subscribed to this queue.
*/
-void Queue::observeConsumerAddLH( const Consumer& c)
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1155,7 +1158,7 @@ void Queue::observeConsumerAddLH( const
/** updates queue observers when a consumer has unsubscribed from this queue.
*/
-void Queue::observeConsumerRemoveLH( const Consumer& c)
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1674,8 +1677,9 @@ void Queue::insertSequenceNumbers(const
}
/** updates queue observers and state when a message has become available for transfer
+ * Requires messageLock be held by caller.
*/
-void Queue::observeEnqueueLH(const QueuedMessage& m)
+void Queue::observeEnqueue(const QueuedMessage& m, const qpid::sys::Mutex::ScopedLock&)
{
for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
try {
@@ -1694,7 +1698,7 @@ void Queue::updateEnqueued(const QueuedM
{
Mutex::ScopedLock locker(messageLock);
messages->updateAcquired(m);
- observeEnqueueLH(m);
+ observeEnqueue(m, locker);
if (policy.get()) {
policy->recoverEnqueued(payload);
policy->enqueued(m);
Modified: qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h?rev=1302629&r1=1302628&r2=1302629&view=diff
==============================================================================
--- qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker/Queue.h Mon Mar 19 20:07:37 2012
@@ -144,12 +144,12 @@ class Queue : public boost::enable_share
/** update queue observers, stats, policy, etc when the messages' state changes.
* messageLock is held by caller */
- void observeEnqueueLH(const QueuedMessage& msg);
- void observeAcquireLH(const QueuedMessage& msg);
- void observeRequeueLH(const QueuedMessage& msg);
- void observeDequeueLH(const QueuedMessage& msg);
- void observeConsumerAddLH( const Consumer& );
- void observeConsumerRemoveLH( const Consumer& );
+ void observeEnqueue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeAcquire(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeRequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeDequeue(const QueuedMessage& msg, const qpid::sys::Mutex::ScopedLock&);
+ void observeConsumerAdd( const Consumer&, const qpid::sys::Mutex::ScopedLock&);
+ void observeConsumerRemove( const Consumer&, const qpid::sys::Mutex::ScopedLock&);
bool popAndDequeue(QueuedMessage&);
bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org