You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/13 20:54:40 UTC

svn commit: r943973 - in /qpid/trunk/qpid/cpp/src/qpid: client/amqp0_10/ messaging/

Author: aconway
Date: Thu May 13 18:54:39 2010
New Revision: 943973

URL: http://svn.apache.org/viewvc?rev=943973&view=rev
Log:
Fix deadlocks and thread-safety issues in the new API classes.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu May 13 18:54:39 2010
@@ -109,6 +109,7 @@ ConnectionImpl::ConnectionImpl(const std
 
 void ConnectionImpl::setOptions(const Variant::Map& options)
 {
+    sys::Mutex::ScopedLock l(lock);
     convert(options, settings);
     setIfFound(options, "reconnect", reconnect);
     setIfFound(options, "reconnect-timeout", timeout);
@@ -139,13 +140,14 @@ void ConnectionImpl::setOption(const std
 
 void ConnectionImpl::close()
 {
-    std::vector<std::string> names;
-    {
-        qpid::sys::Mutex::ScopedLock l(lock);
-        for (Sessions::const_iterator i = sessions.begin(); i != sessions.end(); ++i) names.push_back(i->first);
-    }
-    for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
-        getSession(*i).close();
+    while(true) {
+        messaging::Session session;
+        {
+            qpid::sys::Mutex::ScopedLock l(lock);
+            if (sessions.empty()) break;
+            session = sessions.begin()->second;
+        }
+        session.close();
     }
     detach();
 }
@@ -246,12 +248,7 @@ void ConnectionImpl::connect(const qpid:
 
 bool ConnectionImpl::tryConnect()
 {
-    if (tryConnect(urls)) return resetSessions();
-    else return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
-{
+    sys::Mutex::ScopedLock l(lock);
     for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
         try {
             QPID_LOG(info, "Trying to connect to " << *i << "...");
@@ -264,7 +261,7 @@ bool ConnectionImpl::tryConnect(const st
                 connection.open(settings);
             }
             QPID_LOG(info, "Connected to " << *i);                
-            return true;
+            return resetSessions(l);
         } catch (const qpid::ConnectionException& e) {
             //TODO: need to fix timeout on
             //qpid::client::Connection::open() so that it throws
@@ -275,7 +272,7 @@ bool ConnectionImpl::tryConnect(const st
     return false;
 }
 
-bool ConnectionImpl::resetSessions()
+bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
 {
     try {
         qpid::sys::Mutex::ScopedLock l(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Thu May 13 18:54:39 2010
@@ -68,8 +68,8 @@ class ConnectionImpl : public qpid::mess
     void setOptions(const qpid::types::Variant::Map& options);
     void connect(const qpid::sys::AbsTime& started);
     bool tryConnect();
-    bool tryConnect(const std::vector<std::string>& urls);
-    bool resetSessions();
+    bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
+
 };
 }}} // namespace qpid::client::amqp0_10
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp Thu May 13 18:54:39 2010
@@ -51,8 +51,9 @@ struct FailoverUpdatesImpl : qpid::sys::
     }
 
     ~FailoverUpdatesImpl() {
-        receiver.close();
-        session.close();
+        try {
+            session.close();
+        } catch(...) {}         // Squash exceptions in a destructor.
         thread.join();
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu May 13 18:54:39 2010
@@ -104,6 +104,7 @@ struct Match
 
 void IncomingMessages::setSession(qpid::client::AsyncSession s)
 {
+    sys::Mutex::ScopedLock l(lock);
     session = s;
     incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
     acceptTracker.reset();
@@ -111,13 +112,16 @@ void IncomingMessages::setSession(qpid::
 
 bool IncomingMessages::get(Handler& handler, Duration timeout)
 {
-    //search through received list for any transfer of interest:
-    for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
     {
-        MessageTransfer transfer(*i, *this);
-        if (handler.accept(transfer)) {
-            received.erase(i);
-            return true;
+        sys::Mutex::ScopedLock l(lock);
+        //search through received list for any transfer of interest:
+        for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+        {
+            MessageTransfer transfer(*i, *this);
+            if (handler.accept(transfer)) {
+                received.erase(i);
+                return true;
+            }
         }
     }
     //none found, check incoming:
@@ -126,6 +130,7 @@ bool IncomingMessages::get(Handler& hand
 
 bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
 {
+    sys::Mutex::ScopedLock l(lock);
     //if there is not already a received message, we must wait for one
     if (received.empty() && !wait(timeout)) return false;
     //else we have a message in received; return the corresponding destination
@@ -135,20 +140,25 @@ bool IncomingMessages::getNextDestinatio
 
 void IncomingMessages::accept()
 {
+    sys::Mutex::ScopedLock l(lock);
     acceptTracker.accept(session);
 }
 
 void IncomingMessages::releaseAll()
 {
-    //first process any received messages...
-    while (!received.empty()) {
-        retrieve(received.front(), 0);
-        received.pop_front();
+    {
+        //first process any received messages...
+        sys::Mutex::ScopedLock l(lock);
+        while (!received.empty()) {
+            retrieve(received.front(), 0);
+            received.pop_front();
+        }
     }
     //then pump out any available messages from incoming queue...
     GetAny handler;
     while (process(&handler, 0)) ;
     //now release all messages
+    sys::Mutex::ScopedLock l(lock);
     acceptTracker.release(session);
 }
 
@@ -158,6 +168,7 @@ void IncomingMessages::releasePending(co
     while (process(0, 0)) ;
 
     //now remove all messages for this destination from received list, recording their ids...
+    sys::Mutex::ScopedLock l(lock);
     MatchAndTrack match(destination);
     for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ;
     //now release those messages
@@ -184,6 +195,7 @@ bool IncomingMessages::process(Handler* 
             } else {
                 //received message for another destination, keep for later
                 QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+                sys::Mutex::ScopedLock l(lock);
                 received.push_back(content);
             }
         } else {
@@ -200,6 +212,7 @@ bool IncomingMessages::wait(qpid::sys::D
     for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
         if (content->isA<MessageTransferBody>()) {
             QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+            sys::Mutex::ScopedLock l(lock);
             received.push_back(content);
             return true;
         } else {
@@ -211,10 +224,12 @@ bool IncomingMessages::wait(qpid::sys::D
 
 uint32_t IncomingMessages::pendingAccept()
 {
+    sys::Mutex::ScopedLock l(lock);
     return acceptTracker.acceptsPending();
 }
 uint32_t IncomingMessages::pendingAccept(const std::string& destination)
 {
+    sys::Mutex::ScopedLock l(lock);
     return acceptTracker.acceptsPending(destination);
 }
 
@@ -223,6 +238,7 @@ uint32_t IncomingMessages::available()
     //first pump all available messages from incoming to received...
     while (process(0, 0)) {}
     //return the count of received messages
+    sys::Mutex::ScopedLock l(lock);
     return received.size();
 }
 
@@ -232,6 +248,7 @@ uint32_t IncomingMessages::available(con
     while (process(0, 0)) {}
 
     //count all messages for this destination from received list
+    sys::Mutex::ScopedLock l(lock);
     return std::for_each(received.begin(), received.end(), Match(destination)).matched;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu May 13 18:54:39 2010
@@ -43,7 +43,7 @@ namespace client {
 namespace amqp0_10 {
 
 /**
- * 
+ * Queue of incoming messages.
  */
 class IncomingMessages
 {
@@ -83,6 +83,7 @@ class IncomingMessages
   private:
     typedef std::deque<FrameSetPtr> FrameSetQueue;
 
+    sys::Mutex lock;
     qpid::client::AsyncSession session;
     boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
     FrameSetQueue received;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Thu May 13 18:54:39 2010
@@ -37,6 +37,7 @@ using qpid::messaging::Duration;
 void ReceiverImpl::received(qpid::messaging::Message&)
 {
     //TODO: should this be configurable
+    sys::Mutex::ScopedLock l(lock);
     if (capacity && --window <= capacity/2) {
         session.sendCompletion();
         window = capacity;
@@ -78,14 +79,16 @@ void ReceiverImpl::close() 
 
 void ReceiverImpl::start()
 {
+    sys::Mutex::ScopedLock l(lock);
     if (state == STOPPED) {
         state = STARTED;
-        startFlow();
+        startFlow(l);
     }
 }
 
 void ReceiverImpl::stop()
 {
+    sys::Mutex::ScopedLock l(lock);
     state = STOPPED;
     session.messageStop(destination);
 }
@@ -95,7 +98,7 @@ void ReceiverImpl::setCapacity(uint32_t 
     execute1<SetCapacity>(c);
 }
 
-void ReceiverImpl::startFlow()
+void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&)
 {
     if (capacity > 0) {
         session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
@@ -107,10 +110,11 @@ void ReceiverImpl::startFlow()
 
 void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
 {
-
+    sys::Mutex::ScopedLock l(lock);
     session = s;
     if (state == UNRESOLVED) {
         source = resolver.resolveSource(session, address);
+        assert(source.get());
         state = STARTED;
     }
     if (state == CANCELLED) {
@@ -118,15 +122,19 @@ void ReceiverImpl::init(qpid::client::As
         parent->receiverCancelled(destination);        
     } else {
         source->subscribe(session, destination);
-        startFlow();
+        startFlow(l);
     }
 }
 
 
-const std::string& ReceiverImpl::getName() const { return destination; }
+const std::string& ReceiverImpl::getName() const {
+    sys::Mutex::ScopedLock l(lock);
+    return destination;
+}
 
 uint32_t ReceiverImpl::getCapacity()
 {
+    sys::Mutex::ScopedLock l(lock);
     return capacity;
 }
 
@@ -153,25 +161,31 @@ bool ReceiverImpl::getImpl(qpid::messagi
 
 bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
 {
-    if (state == CANCELLED) return false;//TODO: or should this be an error?
-
-    if (capacity == 0 || state != STARTED) {
-        session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
-        session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
-        session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (state == CANCELLED) return false;//TODO: or should this be an error?
+
+        if (capacity == 0 || state != STARTED) {
+            session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+            session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+            session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+        }
     }
-    
     if (getImpl(message, timeout)) {
         return true;
     } else {
         sync(session).messageFlush(destination);
-        startFlow();//reallocate credit
+        {
+            sys::Mutex::ScopedLock l(lock);
+            startFlow(l); //reallocate credit
+        }
         return getImpl(message, Duration::IMMEDIATE);
     }
 }
 
 void ReceiverImpl::closeImpl() 
 { 
+    sys::Mutex::ScopedLock l(lock);
     if (state != CANCELLED) {
         state = CANCELLED;
         source->cancel(session, destination);
@@ -181,14 +195,16 @@ void ReceiverImpl::closeImpl() 
 
 void ReceiverImpl::setCapacityImpl(uint32_t c)
 {
+    sys::Mutex::ScopedLock l(lock);
     if (c != capacity) {
         capacity = c;
         if (state == STARTED) {
             session.messageStop(destination);
-            startFlow();
+            startFlow(l);
         }
     }
 }
+
 qpid::messaging::Session ReceiverImpl::getSession() const
 {
     return qpid::messaging::Session(parent.get());

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Thu May 13 18:54:39 2010
@@ -27,6 +27,7 @@
 #include "qpid/client/AsyncSession.h"
 #include "qpid/client/amqp0_10/SessionImpl.h"
 #include "qpid/messaging/Duration.h"
+#include "qpid/sys/Mutex.h"
 #include <boost/intrusive_ptr.hpp>
 #include <memory>
 
@@ -65,6 +66,7 @@ class ReceiverImpl : public qpid::messag
     void received(qpid::messaging::Message& message);
     qpid::messaging::Session getSession() const;
   private:
+    mutable sys::Mutex lock;
     boost::intrusive_ptr<SessionImpl> parent;
     const std::string destination;
     const qpid::messaging::Address address;
@@ -77,15 +79,14 @@ class ReceiverImpl : public qpid::messag
     qpid::messaging::MessageListener* listener;
     uint32_t window;
 
-    void startFlow();
+    void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held
     //implementation of public facing methods
     bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
     void closeImpl();
     void setCapacityImpl(uint32_t);
 
-    //functors for public facing methods (allows locking and retry
-    //logic to be centralised)
+    //functors for public facing methods.
     struct Command
     {
         ReceiverImpl& impl;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu May 13 18:54:39 2010
@@ -53,6 +53,9 @@ namespace qpid {
 namespace client {
 namespace amqp0_10 {
 
+typedef qpid::sys::Mutex::ScopedLock ScopedLock;
+typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
+
 SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
 
 void SessionImpl::checkError()
@@ -112,23 +115,29 @@ void SessionImpl::release(qpid::messagin
 void SessionImpl::close()
 {
     if (hasError()) {
+        ScopedLock l(lock);
         senders.clear();
         receivers.clear();
     } else {
-        //close all the senders and receivers (get copy of names and then
-        //make the calls to avoid modifying maps while iterating over
-        //them):
-        std::vector<std::string> s;
-        std::vector<std::string> r;
-        {
-            qpid::sys::Mutex::ScopedLock l(lock);        
-            for (Senders::const_iterator i = senders.begin(); i != senders.end(); ++i) s.push_back(i->first);
-            for (Receivers::const_iterator i = receivers.begin(); i != receivers.end(); ++i) r.push_back(i->first);
+        while (true) {
+            Sender s;
+            {
+                ScopedLock l(lock);
+                if (senders.empty()) break;
+                s = senders.begin()->second;
+            }
+            s.close();  // outside the lock, will call senderCancelled
+        }
+        while (true) {
+            Receiver r;
+            {
+                ScopedLock l(lock);
+                if (receivers.empty()) break;
+                r = receivers.begin()->second;
+            }
+            r.close();  // outside the lock, will call receiverCancelled
         }
-        for (std::vector<std::string>::const_iterator i = s.begin(); i != s.end(); ++i) getSender(*i).close();
-        for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close();
     }
-    
     connection->closed(*this);
     if (!hasError()) session.close();
 }
@@ -151,7 +160,7 @@ template <class T> void getFreeKey(std::
 
 void SessionImpl::setSession(qpid::client::Session s)
 {
-    qpid::sys::Mutex::ScopedLock l(lock);
+    ScopedLock l(lock);
     session = s;
     incoming.setSession(session);
     if (transactional) session.txSelect();
@@ -181,6 +190,7 @@ Receiver SessionImpl::createReceiver(con
 
 Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
 {
+    ScopedLock l(lock);
     std::string name = address.getName();
     getFreeKey(name, receivers);
     Receiver receiver(new ReceiverImpl(*this, name, address));
@@ -205,7 +215,8 @@ Sender SessionImpl::createSender(const q
 }
 
 Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
-{ 
+{
+    ScopedLock l(lock);
     std::string name = address.getName();
     getFreeKey(name, senders);
     Sender sender(new SenderImpl(*this, name, address));
@@ -265,6 +276,7 @@ struct IncomingMessageHandler : Incoming
 
 bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
 {
+    ScopedLock l(lock);
     Receivers::const_iterator i = receivers.find(transfer.getDestination());
     if (i == receivers.end()) {
         QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
@@ -371,6 +383,7 @@ struct SessionImpl::Receivable : Command
 
 uint32_t SessionImpl::getReceivableImpl(const std::string* destination)
 {
+    ScopedLock l(lock);
     if (destination) {
         return incoming.available(*destination);
     } else {
@@ -399,6 +412,7 @@ struct SessionImpl::UnsettledAcks : Comm
 
 uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
 {
+    ScopedLock l(lock);
     if (destination) {
         return incoming.pendingAccept(*destination);
     } else {
@@ -414,12 +428,14 @@ void SessionImpl::syncImpl(bool block)
 
 void SessionImpl::commitImpl()
 {
+    ScopedLock l(lock);
     incoming.accept();
     session.txCommit();
 }
 
 void SessionImpl::rollbackImpl()
 {
+    ScopedLock l(lock);
     for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
         getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
     }
@@ -436,6 +452,7 @@ void SessionImpl::rollbackImpl()
 
 void SessionImpl::acknowledgeImpl()
 {
+    ScopedLock l(lock);
     if (!transactional) incoming.accept();
 }
 
@@ -455,6 +472,7 @@ void SessionImpl::releaseImpl(qpid::mess
 
 void SessionImpl::receiverCancelled(const std::string& name)
 {
+    ScopedLock l(lock);
     receivers.erase(name);
     session.sync();
     incoming.releasePending(name);
@@ -462,6 +480,7 @@ void SessionImpl::receiverCancelled(cons
 
 void SessionImpl::senderCancelled(const std::string& name)
 {
+    ScopedLock l(lock);
     senders.erase(name);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Thu May 13 18:54:39 2010
@@ -94,7 +94,6 @@ class SessionImpl : public qpid::messagi
     template <class T> bool execute(T& f)
     {
         try {
-            qpid::sys::Mutex::ScopedLock l(lock);
             f();
             return true;
         } catch (const qpid::TransportFailure&) {

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h Thu May 13 18:54:39 2010
@@ -29,9 +29,7 @@
 namespace qpid {
 namespace messaging {
 
-// FIXME aconway 2009-04-24: details!
-/** @file
- *
+/**
  * Helper class to implement a class with a private, reference counted
  * implementation and reference semantics.
  *
@@ -73,8 +71,10 @@ template <class T> class PrivateImplRef 
     typedef typename T::Impl Impl;
     typedef boost::intrusive_ptr<Impl> intrusive_ptr;
 
+    /** Get the implementation pointer from a handle */
     static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
 
+    /** Set the implementation pointer in a handle */
     static void set(T& t, const intrusive_ptr& p) {
         if (t.impl == p) return;
         if (t.impl) boost::intrusive_ptr_release(t.impl);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org