You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/13 20:54:40 UTC
svn commit: r943973 - in /qpid/trunk/qpid/cpp/src/qpid: client/amqp0_10/ messaging/
Author: aconway
Date: Thu May 13 18:54:39 2010
New Revision: 943973
URL: http://svn.apache.org/viewvc?rev=943973&view=rev
Log:
Fix deadlocks & thread safety in new API classes.
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Thu May 13 18:54:39 2010
@@ -109,6 +109,7 @@ ConnectionImpl::ConnectionImpl(const std
void ConnectionImpl::setOptions(const Variant::Map& options)
{
+ sys::Mutex::ScopedLock l(lock);
convert(options, settings);
setIfFound(options, "reconnect", reconnect);
setIfFound(options, "reconnect-timeout", timeout);
@@ -139,13 +140,14 @@ void ConnectionImpl::setOption(const std
void ConnectionImpl::close()
{
- std::vector<std::string> names;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- for (Sessions::const_iterator i = sessions.begin(); i != sessions.end(); ++i) names.push_back(i->first);
- }
- for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
- getSession(*i).close();
+ while(true) {
+ messaging::Session session;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (sessions.empty()) break;
+ session = sessions.begin()->second;
+ }
+ session.close();
}
detach();
}
@@ -246,12 +248,7 @@ void ConnectionImpl::connect(const qpid:
bool ConnectionImpl::tryConnect()
{
- if (tryConnect(urls)) return resetSessions();
- else return false;
-}
-
-bool ConnectionImpl::tryConnect(const std::vector<std::string>& urls)
-{
+ sys::Mutex::ScopedLock l(lock);
for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
try {
QPID_LOG(info, "Trying to connect to " << *i << "...");
@@ -264,7 +261,7 @@ bool ConnectionImpl::tryConnect(const st
connection.open(settings);
}
QPID_LOG(info, "Connected to " << *i);
- return true;
+ return resetSessions(l);
} catch (const qpid::ConnectionException& e) {
//TODO: need to fix timeout on
//qpid::client::Connection::open() so that it throws
@@ -275,7 +272,7 @@ bool ConnectionImpl::tryConnect(const st
return false;
}
-bool ConnectionImpl::resetSessions()
+bool ConnectionImpl::resetSessions(const sys::Mutex::ScopedLock& )
{
try {
qpid::sys::Mutex::ScopedLock l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Thu May 13 18:54:39 2010
@@ -68,8 +68,8 @@ class ConnectionImpl : public qpid::mess
void setOptions(const qpid::types::Variant::Map& options);
void connect(const qpid::sys::AbsTime& started);
bool tryConnect();
- bool tryConnect(const std::vector<std::string>& urls);
- bool resetSessions();
+ bool resetSessions(const sys::Mutex::ScopedLock&); // dummy parameter indicates call with lock held.
+
};
}}} // namespace qpid::client::amqp0_10
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/FailoverUpdates.cpp Thu May 13 18:54:39 2010
@@ -51,8 +51,9 @@ struct FailoverUpdatesImpl : qpid::sys::
}
~FailoverUpdatesImpl() {
- receiver.close();
- session.close();
+ try {
+ session.close();
+ } catch(...) {} // Squash exceptions in a destructor.
thread.join();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Thu May 13 18:54:39 2010
@@ -104,6 +104,7 @@ struct Match
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
+ sys::Mutex::ScopedLock l(lock);
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
acceptTracker.reset();
@@ -111,13 +112,16 @@ void IncomingMessages::setSession(qpid::
bool IncomingMessages::get(Handler& handler, Duration timeout)
{
- //search through received list for any transfer of interest:
- for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
{
- MessageTransfer transfer(*i, *this);
- if (handler.accept(transfer)) {
- received.erase(i);
- return true;
+ sys::Mutex::ScopedLock l(lock);
+ //search through received list for any transfer of interest:
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+ {
+ MessageTransfer transfer(*i, *this);
+ if (handler.accept(transfer)) {
+ received.erase(i);
+ return true;
+ }
}
}
//none found, check incoming:
@@ -126,6 +130,7 @@ bool IncomingMessages::get(Handler& hand
bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
{
+ sys::Mutex::ScopedLock l(lock);
//if there is not already a received message, we must wait for one
if (received.empty() && !wait(timeout)) return false;
//else we have a message in received; return the corresponding destination
@@ -135,20 +140,25 @@ bool IncomingMessages::getNextDestinatio
void IncomingMessages::accept()
{
+ sys::Mutex::ScopedLock l(lock);
acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
{
- //first process any received messages...
- while (!received.empty()) {
- retrieve(received.front(), 0);
- received.pop_front();
+ {
+ //first process any received messages...
+ sys::Mutex::ScopedLock l(lock);
+ while (!received.empty()) {
+ retrieve(received.front(), 0);
+ received.pop_front();
+ }
}
//then pump out any available messages from incoming queue...
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
+ sys::Mutex::ScopedLock l(lock);
acceptTracker.release(session);
}
@@ -158,6 +168,7 @@ void IncomingMessages::releasePending(co
while (process(0, 0)) ;
//now remove all messages for this destination from received list, recording their ids...
+ sys::Mutex::ScopedLock l(lock);
MatchAndTrack match(destination);
for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ;
//now release those messages
@@ -184,6 +195,7 @@ bool IncomingMessages::process(Handler*
} else {
//received message for another destination, keep for later
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
received.push_back(content);
}
} else {
@@ -200,6 +212,7 @@ bool IncomingMessages::wait(qpid::sys::D
for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
received.push_back(content);
return true;
} else {
@@ -211,10 +224,12 @@ bool IncomingMessages::wait(qpid::sys::D
uint32_t IncomingMessages::pendingAccept()
{
+ sys::Mutex::ScopedLock l(lock);
return acceptTracker.acceptsPending();
}
uint32_t IncomingMessages::pendingAccept(const std::string& destination)
{
+ sys::Mutex::ScopedLock l(lock);
return acceptTracker.acceptsPending(destination);
}
@@ -223,6 +238,7 @@ uint32_t IncomingMessages::available()
//first pump all available messages from incoming to received...
while (process(0, 0)) {}
//return the count of received messages
+ sys::Mutex::ScopedLock l(lock);
return received.size();
}
@@ -232,6 +248,7 @@ uint32_t IncomingMessages::available(con
while (process(0, 0)) {}
//count all messages for this destination from received list
+ sys::Mutex::ScopedLock l(lock);
return std::for_each(received.begin(), received.end(), Match(destination)).matched;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Thu May 13 18:54:39 2010
@@ -43,7 +43,7 @@ namespace client {
namespace amqp0_10 {
/**
- *
+ * Queue of incoming messages.
*/
class IncomingMessages
{
@@ -83,6 +83,7 @@ class IncomingMessages
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
+ sys::Mutex lock;
qpid::client::AsyncSession session;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
FrameSetQueue received;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Thu May 13 18:54:39 2010
@@ -37,6 +37,7 @@ using qpid::messaging::Duration;
void ReceiverImpl::received(qpid::messaging::Message&)
{
//TODO: should this be configurable
+ sys::Mutex::ScopedLock l(lock);
if (capacity && --window <= capacity/2) {
session.sendCompletion();
window = capacity;
@@ -78,14 +79,16 @@ void ReceiverImpl::close()
void ReceiverImpl::start()
{
+ sys::Mutex::ScopedLock l(lock);
if (state == STOPPED) {
state = STARTED;
- startFlow();
+ startFlow(l);
}
}
void ReceiverImpl::stop()
{
+ sys::Mutex::ScopedLock l(lock);
state = STOPPED;
session.messageStop(destination);
}
@@ -95,7 +98,7 @@ void ReceiverImpl::setCapacity(uint32_t
execute1<SetCapacity>(c);
}
-void ReceiverImpl::startFlow()
+void ReceiverImpl::startFlow(const sys::Mutex::ScopedLock&)
{
if (capacity > 0) {
session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
@@ -107,10 +110,11 @@ void ReceiverImpl::startFlow()
void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
-
+ sys::Mutex::ScopedLock l(lock);
session = s;
if (state == UNRESOLVED) {
source = resolver.resolveSource(session, address);
+ assert(source.get());
state = STARTED;
}
if (state == CANCELLED) {
@@ -118,15 +122,19 @@ void ReceiverImpl::init(qpid::client::As
parent->receiverCancelled(destination);
} else {
source->subscribe(session, destination);
- startFlow();
+ startFlow(l);
}
}
-const std::string& ReceiverImpl::getName() const { return destination; }
+const std::string& ReceiverImpl::getName() const {
+ sys::Mutex::ScopedLock l(lock);
+ return destination;
+}
uint32_t ReceiverImpl::getCapacity()
{
+ sys::Mutex::ScopedLock l(lock);
return capacity;
}
@@ -153,25 +161,31 @@ bool ReceiverImpl::getImpl(qpid::messagi
bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
- if (state == CANCELLED) return false;//TODO: or should this be an error?
-
- if (capacity == 0 || state != STARTED) {
- session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
- session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
- session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (state == CANCELLED) return false;//TODO: or should this be an error?
+
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
}
-
if (getImpl(message, timeout)) {
return true;
} else {
sync(session).messageFlush(destination);
- startFlow();//reallocate credit
+ {
+ sys::Mutex::ScopedLock l(lock);
+ startFlow(l); //reallocate credit
+ }
return getImpl(message, Duration::IMMEDIATE);
}
}
void ReceiverImpl::closeImpl()
{
+ sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
source->cancel(session, destination);
@@ -181,14 +195,16 @@ void ReceiverImpl::closeImpl()
void ReceiverImpl::setCapacityImpl(uint32_t c)
{
+ sys::Mutex::ScopedLock l(lock);
if (c != capacity) {
capacity = c;
if (state == STARTED) {
session.messageStop(destination);
- startFlow();
+ startFlow(l);
}
}
}
+
qpid::messaging::Session ReceiverImpl::getSession() const
{
return qpid::messaging::Session(parent.get());
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h Thu May 13 18:54:39 2010
@@ -27,6 +27,7 @@
#include "qpid/client/AsyncSession.h"
#include "qpid/client/amqp0_10/SessionImpl.h"
#include "qpid/messaging/Duration.h"
+#include "qpid/sys/Mutex.h"
#include <boost/intrusive_ptr.hpp>
#include <memory>
@@ -65,6 +66,7 @@ class ReceiverImpl : public qpid::messag
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
private:
+ mutable sys::Mutex lock;
boost::intrusive_ptr<SessionImpl> parent;
const std::string destination;
const qpid::messaging::Address address;
@@ -77,15 +79,14 @@ class ReceiverImpl : public qpid::messag
qpid::messaging::MessageListener* listener;
uint32_t window;
- void startFlow();
+ void startFlow(const sys::Mutex::ScopedLock&); // Dummy param, call with lock held
//implementation of public facing methods
bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout);
void closeImpl();
void setCapacityImpl(uint32_t);
- //functors for public facing methods (allows locking and retry
- //logic to be centralised)
+ //functors for public facing methods.
struct Command
{
ReceiverImpl& impl;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Thu May 13 18:54:39 2010
@@ -53,6 +53,9 @@ namespace qpid {
namespace client {
namespace amqp0_10 {
+typedef qpid::sys::Mutex::ScopedLock ScopedLock;
+typedef qpid::sys::Mutex::ScopedUnlock ScopedUnlock;
+
SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
void SessionImpl::checkError()
@@ -112,23 +115,29 @@ void SessionImpl::release(qpid::messagin
void SessionImpl::close()
{
if (hasError()) {
+ ScopedLock l(lock);
senders.clear();
receivers.clear();
} else {
- //close all the senders and receivers (get copy of names and then
- //make the calls to avoid modifying maps while iterating over
- //them):
- std::vector<std::string> s;
- std::vector<std::string> r;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- for (Senders::const_iterator i = senders.begin(); i != senders.end(); ++i) s.push_back(i->first);
- for (Receivers::const_iterator i = receivers.begin(); i != receivers.end(); ++i) r.push_back(i->first);
+ while (true) {
+ Sender s;
+ {
+ ScopedLock l(lock);
+ if (senders.empty()) break;
+ s = senders.begin()->second;
+ }
+ s.close(); // outside the lock, will call senderCancelled
+ }
+ while (true) {
+ Receiver r;
+ {
+ ScopedLock l(lock);
+ if (receivers.empty()) break;
+ r = receivers.begin()->second;
+ }
+ r.close(); // outside the lock, will call receiverCancelled
}
- for (std::vector<std::string>::const_iterator i = s.begin(); i != s.end(); ++i) getSender(*i).close();
- for (std::vector<std::string>::const_iterator i = r.begin(); i != r.end(); ++i) getReceiver(*i).close();
}
-
connection->closed(*this);
if (!hasError()) session.close();
}
@@ -151,7 +160,7 @@ template <class T> void getFreeKey(std::
void SessionImpl::setSession(qpid::client::Session s)
{
- qpid::sys::Mutex::ScopedLock l(lock);
+ ScopedLock l(lock);
session = s;
incoming.setSession(session);
if (transactional) session.txSelect();
@@ -181,6 +190,7 @@ Receiver SessionImpl::createReceiver(con
Receiver SessionImpl::createReceiverImpl(const qpid::messaging::Address& address)
{
+ ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, receivers);
Receiver receiver(new ReceiverImpl(*this, name, address));
@@ -205,7 +215,8 @@ Sender SessionImpl::createSender(const q
}
Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address)
-{
+{
+ ScopedLock l(lock);
std::string name = address.getName();
getFreeKey(name, senders);
Sender sender(new SenderImpl(*this, name, address));
@@ -265,6 +276,7 @@ struct IncomingMessageHandler : Incoming
bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
{
+ ScopedLock l(lock);
Receivers::const_iterator i = receivers.find(transfer.getDestination());
if (i == receivers.end()) {
QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
@@ -371,6 +383,7 @@ struct SessionImpl::Receivable : Command
uint32_t SessionImpl::getReceivableImpl(const std::string* destination)
{
+ ScopedLock l(lock);
if (destination) {
return incoming.available(*destination);
} else {
@@ -399,6 +412,7 @@ struct SessionImpl::UnsettledAcks : Comm
uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
{
+ ScopedLock l(lock);
if (destination) {
return incoming.pendingAccept(*destination);
} else {
@@ -414,12 +428,14 @@ void SessionImpl::syncImpl(bool block)
void SessionImpl::commitImpl()
{
+ ScopedLock l(lock);
incoming.accept();
session.txCommit();
}
void SessionImpl::rollbackImpl()
{
+ ScopedLock l(lock);
for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
}
@@ -436,6 +452,7 @@ void SessionImpl::rollbackImpl()
void SessionImpl::acknowledgeImpl()
{
+ ScopedLock l(lock);
if (!transactional) incoming.accept();
}
@@ -455,6 +472,7 @@ void SessionImpl::releaseImpl(qpid::mess
void SessionImpl::receiverCancelled(const std::string& name)
{
+ ScopedLock l(lock);
receivers.erase(name);
session.sync();
incoming.releasePending(name);
@@ -462,6 +480,7 @@ void SessionImpl::receiverCancelled(cons
void SessionImpl::senderCancelled(const std::string& name)
{
+ ScopedLock l(lock);
senders.erase(name);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Thu May 13 18:54:39 2010
@@ -94,7 +94,6 @@ class SessionImpl : public qpid::messagi
template <class T> bool execute(T& f)
{
try {
- qpid::sys::Mutex::ScopedLock l(lock);
f();
return true;
} catch (const qpid::TransportFailure&) {
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h?rev=943973&r1=943972&r2=943973&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/PrivateImplRef.h Thu May 13 18:54:39 2010
@@ -29,9 +29,7 @@
namespace qpid {
namespace messaging {
-// FIXME aconway 2009-04-24: details!
-/** @file
- *
+/**
* Helper class to implement a class with a private, reference counted
* implementation and reference semantics.
*
@@ -73,8 +71,10 @@ template <class T> class PrivateImplRef
typedef typename T::Impl Impl;
typedef boost::intrusive_ptr<Impl> intrusive_ptr;
+ /** Get the implementation pointer from a handle */
static intrusive_ptr get(const T& t) { return intrusive_ptr(t.impl); }
+ /** Set the implementation pointer in a handle */
static void set(T& t, const intrusive_ptr& p) {
if (t.impl == p) return;
if (t.impl) boost::intrusive_ptr_release(t.impl);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org