You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/12 21:11:15 UTC
svn commit: r922411 - in /qpid/trunk/qpid/cpp/src/qpid/broker:
IncompleteMessageList.cpp Message.cpp Message.h
Author: aconway
Date: Fri Mar 12 20:11:15 2010
New Revision: 922411
URL: http://svn.apache.org/viewvc?rev=922411&view=rev
Log:
Fix for deadlock between journal timer & connection threads.
See https://bugzilla.redhat.com/show_bug.cgi?id=570154.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=922411&r1=922410&r2=922411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Fri Mar 12 20:11:15 2010
@@ -30,7 +30,8 @@ IncompleteMessageList::IncompleteMessage
IncompleteMessageList::~IncompleteMessageList()
{
- sys::Mutex::ScopedLock l(lock);
+ // No lock here. We are relying on Message::reset*CompleteCallback
+ // to ensure no callbacks are in progress before they return.
for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) {
(*i)->resetEnqueueCompleteCallback();
(*i)->resetDequeueCompleteCallback();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=922411&r1=922410&r2=922411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Mar 12 20:11:15 2010
@@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
+ expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+ inCallback(false), requiredCredit(0) {}
Message::~Message()
{
@@ -398,35 +399,55 @@ void Message::setReplacementMessage(boos
replacement[qfor] = msg;
}
+namespace {
+struct ScopedSet {
+ sys::Monitor& lock;
+ bool& flag;
+ ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+ sys::Monitor::ScopedLock sl(lock);
+ flag = true;
+ }
+ ~ScopedSet(){
+ sys::Monitor::ScopedLock sl(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
+}
+
void Message::allEnqueuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = enqueueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
- sys::Mutex::ScopedLock l(callbackLock);
+ ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = &cb;
}
void Message::resetEnqueueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
enqueueCallback = 0;
}
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = &cb;
}
void Message::resetDequeueCompleteCallback() {
sys::Mutex::ScopedLock l(callbackLock);
+ while (inCallback) callbackLock.wait();
dequeueCallback = 0;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=922411&r1=922410&r2=922411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Mar 12 20:11:15 2010
@@ -26,7 +26,7 @@
#include "qpid/broker/PersistableMessage.h"
#include "qpid/broker/MessageAdapter.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
@@ -189,9 +189,10 @@ public:
mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
- sys::Mutex callbackLock;
+ sys::Monitor callbackLock;
MessageCallback* enqueueCallback;
MessageCallback* dequeueCallback;
+ bool inCallback;
uint32_t requiredCredit;
};
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org