You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/22 21:36:11 UTC
svn commit: r1222432 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha:
QueueReplicator.cpp QueueReplicator.h WiringReplicator.cpp
Author: aconway
Date: Thu Dec 22 20:36:10 2011
New Revision: 1222432
URL: http://svn.apache.org/viewvc?rev=1222432&view=rev
Log:
QPID-3603: Lifecycle and locking fixes for QueueReplicator
Separate bridge de-activation from destruction in QueueReplicator:
Only deactivate if destroyed by the WiringReplicator because of a
queue delete. If destroyed for any other reason (e.g. broker
destruction) don't de-activate the bridge as required resources may
not exist.
Added missing locks in QueueReplicator functions.
Modified:
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1222432&r1=1222431&r2=1222432&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Dec 22 20:36:10 2011
@@ -82,14 +82,18 @@ void QueueReplicator::activate() {
);
}
-QueueReplicator::~QueueReplicator() {
+QueueReplicator::~QueueReplicator() {}
+
+void QueueReplicator::deactivate() {
+ sys::Mutex::ScopedLock l(lock);
queue->getBroker()->getLinks().destroy(
link->getHost(), link->getPort(), queue->getName(), getName(), string());
}
// Called in a broker connection thread when the bridge is created.
-// shared_ptr to self is just to ensure we are still in memory.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ sys::Mutex::ScopedLock l(lock);
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1222432&r1=1222431&r2=1222432&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Dec 22 20:36:10 2011
@@ -46,7 +46,7 @@ namespace ha {
* Creates a ReplicatingSubscription on the primary by passing special
* arguments to the consume command.
*
- * THREAD UNSAFE: Only called in the connection thread of the source queue.
+ * THREAD SAFE: Called in different connection threads.
*/
class QueueReplicator : public broker::Exchange,
public boost::enable_shared_from_this<QueueReplicator>
@@ -59,10 +59,12 @@ class QueueReplicator : public broker::E
QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
- void activate();
+ void activate(); // Call after ctor
+ void deactivate(); // Call before dtor
std::string getType() const;
- bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+ bool bind(boost::shared_ptr<broker::Queue
+ >, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1222432&r1=1222431&r2=1222432&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Dec 22 20:36:10 2011
@@ -296,6 +296,11 @@ void WiringReplicator::doEventQueueDelet
values[USER].asString(),
values[RHOST].asString());
// Delete the QueueReplicator exchange for this queue.
+ boost::shared_ptr<broker::Exchange> ex =
+ broker.getExchanges().find(QueueReplicator::replicatorName(name));
+ boost::shared_ptr<QueueReplicator> qr =
+ boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) qr->deactivate();
broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org