You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/12 21:11:15 UTC

svn commit: r922411 - in /qpid/trunk/qpid/cpp/src/qpid/broker: IncompleteMessageList.cpp Message.cpp Message.h

Author: aconway
Date: Fri Mar 12 20:11:15 2010
New Revision: 922411

URL: http://svn.apache.org/viewvc?rev=922411&view=rev
Log:
Fix for deadlock between journal timer & connection threads.

See https://bugzilla.redhat.com/show_bug.cgi?id=570154.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=922411&r1=922410&r2=922411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Fri Mar 12 20:11:15 2010
@@ -30,7 +30,8 @@ IncompleteMessageList::IncompleteMessage
 
 IncompleteMessageList::~IncompleteMessageList() 
 {
-    sys::Mutex::ScopedLock l(lock);
+    //  No lock here. We are relying on Message::reset*CompleteCallback
+    //  to ensure no callbacks are in progress before they return.
     for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) {
         (*i)->resetEnqueueCompleteCallback();
         (*i)->resetDequeueCompleteCallback();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=922411&r1=922410&r2=922411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Mar 12 20:11:15 2010
@@ -49,7 +49,8 @@ TransferAdapter Message::TRANSFER;
 Message::Message(const framing::SequenceNumber& id) :
     frames(id), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), 
-    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), requiredCredit(0) {}
+    expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
+    inCallback(false), requiredCredit(0) {}
 
 Message::~Message()
 {
@@ -398,35 +399,55 @@ void Message::setReplacementMessage(boos
     replacement[qfor] = msg;
 }
 
+namespace {
+struct ScopedSet {
+    sys::Monitor& lock;
+    bool& flag;
+    ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
+        sys::Monitor::ScopedLock sl(lock);
+        flag = true;
+    }
+    ~ScopedSet(){
+        sys::Monitor::ScopedLock sl(lock);
+        flag = false;
+        lock.notifyAll();
+    }
+};
+}
+
 void Message::allEnqueuesComplete() {
-    sys::Mutex::ScopedLock l(callbackLock);
+    ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = enqueueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
 void Message::allDequeuesComplete() {
-    sys::Mutex::ScopedLock l(callbackLock);
+    ScopedSet ss(callbackLock, inCallback);
     MessageCallback* cb = dequeueCallback;
     if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
 void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     enqueueCallback = &cb;
 }
 
 void Message::resetEnqueueCompleteCallback() {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     enqueueCallback = 0;
 }
 
 void Message::setDequeueCompleteCallback(MessageCallback& cb) {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     dequeueCallback = &cb;
 }
 
 void Message::resetDequeueCompleteCallback() {
     sys::Mutex::ScopedLock l(callbackLock);
+    while (inCallback) callbackLock.wait();
     dequeueCallback = 0;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=922411&r1=922410&r2=922411&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Mar 12 20:11:15 2010
@@ -26,7 +26,7 @@
 #include "qpid/broker/PersistableMessage.h"
 #include "qpid/broker/MessageAdapter.h"
 #include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
@@ -189,9 +189,10 @@ public:
     mutable Replacement replacement;
     mutable boost::intrusive_ptr<Message> empty;
 
-    sys::Mutex callbackLock;
+    sys::Monitor callbackLock;
     MessageCallback* enqueueCallback;
     MessageCallback* dequeueCallback;
+    bool inCallback;
 
     uint32_t requiredCredit;
 };



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org