You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/31 15:58:36 UTC

svn commit: r1367554 - in /qpid/trunk/qpid/cpp/src/qpid/ha: BrokerReplicator.cpp HaBroker.cpp QueueReplicator.cpp QueueReplicator.h

Author: aconway
Date: Tue Jul 31 13:58:36 2012
New Revision: 1367554

URL: http://svn.apache.org/viewvc?rev=1367554&view=rev
Log:
QPID-4176: HA Error handling

Fix error handling so that backup brokers shut down on replication errors.

Previously replication errors were being thrown to the primary, breaking the replication
session. This would put the primary into an endless futile reconnect attempt.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1367554&r1=1367553&r2=1367554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Jul 31 13:58:36 2012
@@ -299,10 +299,12 @@ void BrokerReplicator::route(Deliverable
     } catch (const std::exception& e) {
         QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
                  << ": while handling: " << list);
+        haBroker.shutdown();
         throw;
     }
 }
 
+
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
     Variant::Map argsMap = asMapVoid(values[ARGS]);
     bool autoDel = values[AUTODEL].asBool();
@@ -542,7 +544,7 @@ void BrokerReplicator::startQueueReplica
 {
     if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
         boost::shared_ptr<QueueReplicator> qr(
-            new QueueReplicator(haBroker.getBrokerInfo(), queue, link));
+            new QueueReplicator(haBroker, queue, link));
         if (!broker.getExchanges().registerExchange(qr))
             throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
         qr->activate();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1367554&r1=1367553&r2=1367554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Jul 31 13:58:36 2012
@@ -193,7 +193,7 @@ Manageable::status_t HaBroker::Managemen
           link->setUrl(url);
           // Create a queue replicator
           boost::shared_ptr<QueueReplicator> qr(
-              new QueueReplicator(brokerInfo, queue, link));
+              new QueueReplicator(*this, queue, link));
           qr->activate();
           broker.getExchanges().registerExchange(qr);
           break;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1367554&r1=1367553&r2=1367554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Jul 31 13:58:36 2012
@@ -19,6 +19,7 @@
  *
  */
 
+#include "HaBroker.h"
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "qpid/broker/Bridge.h"
@@ -58,12 +59,13 @@ bool QueueReplicator::isEventKey(const s
     return ret;
 }
 
-QueueReplicator::QueueReplicator(const BrokerInfo& info,
+QueueReplicator::QueueReplicator(HaBroker& hb,
                                  boost::shared_ptr<Queue> q,
                                  boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
+      haBroker(hb),
       logPrefix("Backup queue "+q->getName()+": "),
-      queue(q), link(l), brokerInfo(info)
+      queue(q), link(l), brokerInfo(hb.getBrokerInfo())
 {
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -183,6 +185,7 @@ void QueueReplicator::route(Deliverable&
     }
     catch (const std::exception& e) {
         QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
+        haBroker.shutdown();
         throw;
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1367554&r1=1367553&r2=1367554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Tue Jul 31 13:58:36 2012
@@ -40,6 +40,7 @@ class Deliverable;
 }
 
 namespace ha {
+class HaBroker;
 
 /**
  * Exchange created on a backup broker to replicate a queue on the primary.
@@ -60,7 +61,7 @@ class QueueReplicator : public broker::E
     /** Test if a string is an event key */
     static bool isEventKey(const std::string key);
 
-    QueueReplicator(const BrokerInfo&,
+    QueueReplicator(HaBroker&,
                     boost::shared_ptr<broker::Queue> q,
                     boost::shared_ptr<broker::Link> l);
 
@@ -80,6 +81,7 @@ class QueueReplicator : public broker::E
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
     void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
 
+    HaBroker& haBroker;
     std::string logPrefix;
     std::string bridgeName;
     sys::Mutex lock;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org