You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/17 23:23:12 UTC

svn commit: r1362658 - in /qpid/trunk/qpid/cpp/src: qpid/ha/BrokerReplicator.cpp qpid/ha/HaBroker.cpp qpid/ha/Primary.cpp qpid/ha/QueueGuard.cpp qpid/ha/QueueReplicator.cpp qpid/ha/RemoteBackup.cpp tests/ha_tests.py

Author: aconway
Date: Tue Jul 17 21:23:12 2012
New Revision: 1362658

URL: http://svn.apache.org/viewvc?rev=1362658&view=rev
Log:
QPID-4145: HA: Minor fixes to recovery

- Demote timed-out backups from ready to catch-up.
- Don't cancel connected backups on timeout, only disconnected ones.
- Don't allow promotion of a catch-up broker.
- Minor logging improvement.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Jul 17 21:23:12 2012
@@ -536,7 +536,7 @@ const string REPLICATE_DEFAULT="replicat
 // Received the ha-broker configuration object for the primary broker.
 void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
     try {
-        QPID_LOG(debug, logPrefix << "HA Broker response: " << values);
+        QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
         ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
         ReplicateLevel primary = replicationTest.replicateLevel(
             values[REPLICATE_DEFAULT].asString());
@@ -545,7 +545,8 @@ void BrokerReplicator::doResponseHaBroke
                                      << ") does not match primary (" <<  primary << ")"));
         haBroker.setMembership(values[MEMBERS].asList());
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what());
+        QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()
+                 << ": " << values);
         haBroker.shutdown();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Jul 17 21:23:12 2012
@@ -141,10 +141,8 @@ Manageable::status_t HaBroker::Managemen
           switch (getStatus()) {
             case JOINING: recover(); break;
             case CATCHUP:
-              // FIXME aconway 2012-04-27: don't allow promotion in catch-up
-              // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
-              // throw Exception("Still catching up, cannot be promoted.");
-              recover();
+              QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+              throw Exception("Still catching up, cannot be promoted.");
               break;
             case READY: recover(); break;
             case RECOVERING: break;
@@ -243,12 +241,12 @@ namespace {
 bool checkTransition(BrokerStatus from, BrokerStatus to) {
     // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
     static const BrokerStatus TRANSITIONS[][2] = {
-        { CATCHUP, RECOVERING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior
-        { JOINING, CATCHUP },   // Connected to primary
-        { JOINING, RECOVERING },    // Chosen as initial primary.
-        { CATCHUP, READY },     // Caught up all queues, ready to take over.
+        { JOINING, CATCHUP },    // Connected to primary
+        { JOINING, RECOVERING }, // Chosen as initial primary.
+        { CATCHUP, READY },      // Caught up all queues, ready to take over.
         { READY, RECOVERING },   // Chosen as new primary
-        { RECOVERING, ACTIVE }
+        { READY, CATCHUP },      // Timed out failing over, demoted to catch-up.
+        { RECOVERING, ACTIVE }   // All expected backups are ready
     };
     static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
     for (size_t i = 0; i < N; ++i) {
@@ -290,10 +288,9 @@ void HaBroker::setMembership(const Varia
     membership.assign(brokers);
     QPID_LOG(debug, logPrefix << "Membership update: " <<  membership);
     BrokerInfo info;
-    // Check if my own status has been updated to READY
-    if (getStatus() == CATCHUP &&
-        membership.get(systemId, info) && info.getStatus() == READY)
-        setStatus(READY, l);
+    // Update my status to what the primary thinks.
+    if (membership.get(systemId, info) && status != info.getStatus())
+        setStatus(info.getStatus(), l);
     membershipUpdated(brokers);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Tue Jul 17 21:23:12 2012
@@ -140,12 +140,23 @@ void Primary::checkReady(BackupMap::iter
 void Primary::timeoutExpectedBackups() {
     sys::Mutex::ScopedLock l(lock);
     if (active) return;         // Already activated
-    for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end(); ++i)
+    // Remove records for any expectedBackups that are not yet connected
+    // Allow backups that are connected to continue becoming ready.
+    for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
     {
-        QPID_LOG(error, "Expected backup timed out: " << (*i)->getBrokerInfo());
-        (*i)->cancel();
+        boost::shared_ptr<RemoteBackup> rb = *i;
+        if (!rb->isConnected()) {
+            BrokerInfo info = rb->getBrokerInfo();
+            QPID_LOG(error, "Expected backup timed out: " << info);
+            expectedBackups.erase(i++);
+            backups.erase(info.getSystemId());
+            rb->cancel();
+            // Downgrade the broker to CATCHUP
+            info.setStatus(CATCHUP);
+            haBroker.addBroker(info);
+        }
+        else ++i;
     }
-    expectedBackups.clear();
     checkReady(l);
 }
 
@@ -191,6 +202,7 @@ void Primary::opened(broker::Connection&
             i->second->setConnected(true);
             checkReady(i, l);
         }
+        if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
         haBroker.addBroker(info);
     }
     else
@@ -218,6 +230,7 @@ void Primary::closed(broker::Connection&
 
 boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
 {
+    Mutex::ScopedLock l(lock);
     BackupMap::iterator i = backups.find(info.getSystemId());
     return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Tue Jul 17 21:23:12 2012
@@ -135,6 +135,4 @@ void QueueGuard::complete(const QueuedMe
     qm.payload->getIngressCompletion().finishCompleter();
 }
 
-// FIXME aconway 2012-06-04: TODO support for timeout.
-
 }} // namespaces qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Jul 17 21:23:12 2012
@@ -123,7 +123,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(
         args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
         false/*exclusive*/, "", 0, settings);
-    // FIXME aconway 2012-05-22: use a finite credit window
+    // FIXME aconway 2012-05-22: use a finite credit window?
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Tue Jul 17 21:23:12 2012
@@ -29,6 +29,7 @@ namespace qpid {
 namespace ha {
 
 using sys::Mutex;
+using boost::bind;
 
 RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
     logPrefix("Primary remote backup "+info.getLogId()+": "),
@@ -43,7 +44,11 @@ void RemoteBackup::createGuards(broker::
 
 RemoteBackup::~RemoteBackup() { cancel(); }
 
-void RemoteBackup::cancel() { guards.clear(); }
+void RemoteBackup::cancel() {
+    for_each(guards.begin(), guards.end(),
+             bind(&QueueGuard::cancel, bind(&GuardMap::value_type::second, _1)));
+    guards.clear();
+}
 
 bool RemoteBackup::isReady() {
     return connected && initialQueues.empty();
@@ -70,9 +75,11 @@ namespace {
 typedef std::set<boost::shared_ptr<broker::Queue> > QS;
 struct QueueSetPrinter {
     const QS& qs;
-    QueueSetPrinter(const QS& q) : qs(q) {}
+    std::string prefix;
+    QueueSetPrinter(const std::string& p, const QS& q) : qs(q), prefix(p) {}
 };
 std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
+    if (!qp.qs.empty()) o << qp.prefix;
     for (QS::const_iterator i = qp.qs.begin(); i != qp.qs.end(); ++i)
         o << (*i)->getName() << " ";
     return o;
@@ -82,7 +89,7 @@ std::ostream& operator<<(std::ostream& o
 void RemoteBackup::ready(const QueuePtr& q) {
     initialQueues.erase(q);
     QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
-             << " remaining unready: " << QueueSetPrinter(initialQueues));
+             <<  QueueSetPrinter(", waiting for: ", initialQueues));
     if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1362658&r1=1362657&r2=1362658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Tue Jul 17 21:23:12 2012
@@ -211,7 +211,7 @@ class HaCluster(object):
         if promote_next: self[(i+1) % len(self)].promote()
 
     def restart(self, i):
-        """Start a broker with the same name and data directory. It will get
+        """Start a broker with the same port, name and data directory. It will get
         a separate log file: foo.n.log"""
         b = self._brokers[i]
         self._brokers[i] = HaBroker(
@@ -956,7 +956,8 @@ class RecoveryTests(BrokerTest):
 
     def test_expected_backup_timeout(self):
         """Verify that we time-out expected backups and release held queues
-        after a configured interval
+        after a configured interval. Verify backup is demoted to catch-up,
+        but can still rejoin.
         """
         cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
         cluster[0].wait_status("active") # Primary ready



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org