You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/05 21:57:26 UTC

svn commit: r1357846 - in /qpid/trunk/qpid/cpp/src/qpid/ha: BrokerReplicator.cpp BrokerReplicator.h Primary.cpp QueueReplicator.cpp

Author: aconway
Date: Thu Jul  5 19:57:25 2012
New Revision: 1357846

URL: http://svn.apache.org/viewvc?rev=1357846&view=rev
Log:
QPID-4085: HA message-loss race condition, handling replication event after response.

If the backup broker receives a declare event for a queue after receiving a
queue response for the same queue, it removes the queue and replaces it with the
new one from the reponse. Previously it did not remove the corresponding bridge
so things fail when we attempt to create it. Corrected to remove the bridge also.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1357846&r1=1357845&r2=1357846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jul  5 19:57:25 2012
@@ -312,6 +312,7 @@ void BrokerReplicator::doEventQueueDecla
         if (broker.getQueues().find(name)) {
             QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
             broker.getQueues().destroy(name);
+            stopQueueReplicator(name);
         }
         std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
@@ -343,13 +344,7 @@ void BrokerReplicator::doEventQueueDelet
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && replicationTest.replicateLevel(queue->getSettings())) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
-        boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
-        if (qr) {
-            qr->deactivate();
-            // QueueReplicator's bridge is now queued for destruction but may not
-            // actually be destroyed.
-            broker.getExchanges().destroy(qr->getName());
-        }
+        stopQueueReplicator(name);
         broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
     }
 }
@@ -563,6 +558,16 @@ void BrokerReplicator::startQueueReplica
     }
 }
 
+void BrokerReplicator::stopQueueReplicator(const std::string& name) {
+    boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
+    if (qr) {
+        qr->deactivate();
+        // QueueReplicator's bridge is now queued for destruction but may not
+        // actually be destroyed.
+        broker.getExchanges().destroy(qr->getName());
+    }
+}
+
 bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
 bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
 bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1357846&r1=1357845&r2=1357846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Jul  5 19:57:25 2012
@@ -94,6 +94,7 @@ class BrokerReplicator : public broker::
 
     QueueReplicatorPtr findQueueReplicator(const std::string& qname);
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+    void stopQueueReplicator(const std::string& name);
 
     std::string logPrefix;
     ReplicationTest replicationTest;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1357846&r1=1357845&r2=1357846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jul  5 19:57:25 2012
@@ -28,6 +28,7 @@
 #include "qpid/assert.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
@@ -161,6 +162,9 @@ void Primary::opened(broker::Connection&
         }
         haBroker.addBroker(info);
     }
+    else
+        QPID_LOG(debug, logPrefix << "Accepted client connection "
+                 << connection.getMgmtId())
 }
 
 void Primary::closed(broker::Connection& connection) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1357846&r1=1357845&r2=1357846&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jul  5 19:57:25 2012
@@ -94,7 +94,7 @@ void QueueReplicator::activate() {
     bridge = result.first;
 }
 
-QueueReplicator::~QueueReplicator() {}
+QueueReplicator::~QueueReplicator() { deactivate(); }
 
 void QueueReplicator::deactivate() {
     // destroy the route



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org