You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/22 21:36:11 UTC

svn commit: r1222432 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha: QueueReplicator.cpp QueueReplicator.h WiringReplicator.cpp

Author: aconway
Date: Thu Dec 22 20:36:10 2011
New Revision: 1222432

URL: http://svn.apache.org/viewvc?rev=1222432&view=rev
Log:
QPID-3603: Lifecycle and locking fixes for QueueReplicator

Separate bridge de-activation from destruction in QueueReplicator:
Only deactivate if destroyed by the WiringReplicator because of a
queue delete.  If destroyed for any other reason (e.g. broker
destruction) don't de-activate the bridge as required resources may
not exist.

Added missing locks in QueueReplicator functions.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1222432&r1=1222431&r2=1222432&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Dec 22 20:36:10 2011
@@ -82,14 +82,18 @@ void QueueReplicator::activate() {
     );
 }
 
-QueueReplicator::~QueueReplicator() {
+QueueReplicator::~QueueReplicator() {}
+
+void QueueReplicator::deactivate() {
+    sys::Mutex::ScopedLock l(lock);
     queue->getBroker()->getLinks().destroy(
         link->getHost(), link->getPort(), queue->getName(), getName(), string());
 }
 
 // Called in a broker connection thread when the bridge is created.
-// shared_ptr to self is just to ensure we are still in memory.
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+    sys::Mutex::ScopedLock l(lock);
+
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     framing::FieldTable settings;

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1222432&r1=1222431&r2=1222432&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Dec 22 20:36:10 2011
@@ -46,7 +46,7 @@ namespace ha {
  * Creates a ReplicatingSubscription on the primary by passing special
  * arguments to the consume command.
  *
- * THREAD UNSAFE: Only called in the connection thread of the source queue.
+ * THREAD SAFE: Called in different connection threads.
  */
 class QueueReplicator : public broker::Exchange,
                         public boost::enable_shared_from_this<QueueReplicator>
@@ -59,10 +59,12 @@ class QueueReplicator : public broker::E
     QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
     ~QueueReplicator();
 
-    void activate();
+    void activate();            // Call after ctor
+    void deactivate();          // Call before dtor
 
     std::string getType() const;
-    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
+    bool bind(boost::shared_ptr<broker::Queue
+              >, const std::string&, const framing::FieldTable*);
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
     void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1222432&r1=1222431&r2=1222432&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Dec 22 20:36:10 2011
@@ -296,6 +296,11 @@ void WiringReplicator::doEventQueueDelet
             values[USER].asString(),
             values[RHOST].asString());
         // Delete the QueueReplicator exchange for this queue.
+        boost::shared_ptr<broker::Exchange> ex =
+            broker.getExchanges().find(QueueReplicator::replicatorName(name));
+        boost::shared_ptr<QueueReplicator> qr =
+            boost::dynamic_pointer_cast<QueueReplicator>(ex);
+        if (qr) qr->deactivate();
         broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org