You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/11 21:34:19 UTC

svn commit: r1432273 - in /qpid/trunk/qpid/cpp/src: qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/Primary.cpp qpid/ha/RemoteBackup.cpp qpid/ha/StatusCheck.h tests/ha_tests.py

Author: aconway
Date: Fri Jan 11 20:34:19 2013
New Revision: 1432273

URL: http://svn.apache.org/viewvc?rev=1432273&view=rev
Log:
QPID-4516: Sporadic failure in ha_tests test_failover_send_receive

Several fixes were required in the code to correct this problem:

- Add missing break statement in switch.
- Remove unused function HaBroker::resetMembership.
- Abort connection of timed-out backups so they can attempt to reconnect.
- New primary resets membership before allowing backups to connect.
- Test for and ignore double-promotion.
- HaBroker: logPrefix() is now computed dynamically and shows the current status. Made status atomic so log messages can read it efficiently without taking the lock.
- Update primary status in membership.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jan 11 20:34:19 2013
@@ -59,8 +59,7 @@ using boost::shared_ptr;
 
 // Called in Plugin::earlyInitialize
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
-    : logPrefix("Broker: "),
-      broker(b),
+    : broker(b),
       systemId(broker.getSystem()->getSystemId().data()),
       settings(s),
       observer(new ConnectionObserver(*this, systemId)),
@@ -72,7 +71,8 @@ HaBroker::HaBroker(broker::Broker& b, co
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
-        QPID_LOG(debug, logPrefix << "Rejecting client connections.");
+        status = JOINING;
+        QPID_LOG(debug, logPrefix() << "Rejecting client connections.");
         shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
         observer->setObserver(excluder, "Backup: ");
         broker.getConnectionObservers().add(observer);
@@ -80,6 +80,16 @@ HaBroker::HaBroker(broker::Broker& b, co
 }
 
 namespace {
+const std::string PREFIX_PRIMARY("Primary(");
+const std::string PREFIX_BACKUP("Backup(");
+const std::string PREFIX_END("): ");
+}
+std::string HaBroker::logPrefix() const {
+    BrokerStatus s = status.get();
+    return (isPrimary(s) ? PREFIX_PRIMARY : PREFIX_BACKUP) + printable(s).str()+PREFIX_END;
+}
+
+namespace {
 const std::string NONE("none");
 bool isNone(const std::string& x) { return x.empty() || x == NONE; }
 }
@@ -92,7 +102,7 @@ void HaBroker::initialize() {
         broker.getSystem()->getNodeName(),
         broker.getPort(broker::Broker::TCP_TRANSPORT),
         systemId);
-    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+    QPID_LOG(notice, logPrefix() << "Initializing: " << brokerInfo);
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
@@ -114,7 +124,7 @@ void HaBroker::initialize() {
         status = JOINING;
         backup.reset(new Backup(*this, settings));
         broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
-        statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
+        statusCheck.reset(new StatusCheck(logPrefix(), broker.getLinkHearbeatInterval(), brokerInfo));
         if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
         if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
     }
@@ -127,7 +137,7 @@ void HaBroker::initialize() {
 }
 
 HaBroker::~HaBroker() {
-    QPID_LOG(notice, logPrefix << "Shut down");
+    QPID_LOG(notice, logPrefix() << "Shut down");
     broker.getConnectionObservers().remove(observer);
 }
 
@@ -137,6 +147,11 @@ void HaBroker::recover() {
     BrokerInfo::Set backups;
    {
         Mutex::ScopedLock l(lock);
+        if (isPrimary(status.get())) {
+            QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo);
+            return;
+        }
+        QPID_LOG(notice, "Promoting to primary: " << brokerInfo);
         // Reset membership before allowing backups to connect.
         backups = membership.otherBackups();
         membership.reset(brokerInfo);
@@ -167,12 +182,13 @@ Manageable::status_t HaBroker::Managemen
               if (statusCheck->canPromote())
                   recover();
               else {
-                  QPID_LOG(error, logPrefix << "Cluster already active, cannot be promoted");
+                  QPID_LOG(error,
+                           logPrefix() << "Joining active cluster, cannot be promoted.");
                   throw Exception("Cluster already active, cannot be promoted.");
               }
               break;
              case CATCHUP:
-              QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+              QPID_LOG(error, logPrefix() << "Still catching up, cannot be promoted.");
               throw Exception("Still catching up, cannot be promoted.");
               break;
             case READY: recover(); break;
@@ -193,7 +209,7 @@ Manageable::status_t HaBroker::Managemen
       case _qmf::HaBroker::METHOD_REPLICATE: {
           _qmf::ArgsHaBrokerReplicate& bq_args =
               dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
-          QPID_LOG(debug, logPrefix << "Replicate individual queue "
+          QPID_LOG(debug, logPrefix() << "Replicate individual queue "
                    << bq_args.i_queue << " from " << bq_args.i_broker);
 
           boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -228,7 +244,7 @@ void HaBroker::setPublicUrl(const Url& u
     mgmtObject->set_publicUrl(url.str());
     knownBrokers.clear();
     knownBrokers.push_back(url);
-    QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
+    QPID_LOG(debug, logPrefix() << "Setting public URL to: " << url);
 }
 
 void HaBroker::setBrokerUrl(const Url& url) {
@@ -237,8 +253,8 @@ void HaBroker::setBrokerUrl(const Url& u
         Mutex::ScopedLock l(lock);
         brokerUrl = url;
         mgmtObject->set_brokersUrl(brokerUrl.str());
-        QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
-        if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
+        QPID_LOG(info, logPrefix() << "Brokers URL set to: " << url);
+        if (status.get() == JOINING && statusCheck.get()) statusCheck->setUrl(url);
         b = backup;
     }
     if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
@@ -250,13 +266,12 @@ std::vector<Url> HaBroker::getKnownBroke
 }
 
 void HaBroker::shutdown() {
-    QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+    QPID_LOG(critical, logPrefix() << "Critical error, shutting down.");
     broker.shutdown();
 }
 
 BrokerStatus HaBroker::getStatus() const {
-    Mutex::ScopedLock l(lock);
-    return status;
+    return status.get();
 }
 
 void HaBroker::setStatus(BrokerStatus newStatus) {
@@ -285,12 +300,12 @@ bool checkTransition(BrokerStatus from, 
 } // namespace
 
 void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
-    QPID_LOG(info, logPrefix << "Status change: "
-             << printable(status) << " -> " << printable(newStatus));
-    bool legal = checkTransition(status, newStatus);
+    QPID_LOG(info, logPrefix() << "Status change: "
+             << printable(status.get()) << " -> " << printable(newStatus));
+    bool legal = checkTransition(status.get(), newStatus);
     if (!legal) {
-        QPID_LOG(critical, logPrefix << "Illegal state transition: "
-                 << printable(status) << " -> " << printable(newStatus));
+        QPID_LOG(critical, logPrefix() << "Illegal state transition: "
+                 << printable(status.get()) << " -> " << printable(newStatus));
         shutdown();
     }
     assert(legal);              // FIXME aconway 2012-12-07: fail
@@ -299,13 +314,15 @@ void HaBroker::setStatus(BrokerStatus ne
 }
 
 void HaBroker::statusChanged(Mutex::ScopedLock& l) {
-    mgmtObject->set_status(printable(status).str());
-    brokerInfo.setStatus(status);
+    mgmtObject->set_status(printable(status.get()).str());
+    brokerInfo.setStatus(status.get());
+    membership.add(brokerInfo);
+    membershipUpdated(l);
     setLinkProperties(l);
 }
 
 void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
-    QPID_LOG(info, logPrefix << "Membership changed: " <<  membership);
+    QPID_LOG(info, logPrefix() << "Membership: " <<  membership);
     Variant::List brokers = membership.asList();
     mgmtObject->set_members(brokers);
     broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -316,23 +333,24 @@ void HaBroker::setMembership(const Varia
     {
         Mutex::ScopedLock l(lock);
         membership.assign(brokers);
-        QPID_LOG(info, logPrefix << "Membership update: " <<  membership);
         BrokerInfo info;
-        // Update my status to what the primary says it is.  The primary can toggle
-        // status between READY and CATCHUP based on the state of our subscriptions.
-        if (membership.get(systemId, info) && status != info.getStatus()) {
+        // Update my status to what the primary says it is.  The primary sets
+        // status to READY when we are caught up, and sets status to CATCHUP
+        // (from READY) if we are timed out during recovery.
+        if (membership.get(systemId, info) && status.get() != info.getStatus()) {
+            assert((status.get() == CATCHUP && info.getStatus() == READY) ||
+                   (status.get() == READY && info.getStatus() == CATCHUP));
             setStatus(info.getStatus(), l);
             b = backup;
         }
         membershipUpdated(l);
     }
-    if (b) b->setStatus(status); // Oustside lock, avoid deadlock
+    if (b) b->setStatus(status.get()); // Oustside lock, avoid deadlock
 }
 
 void HaBroker::addBroker(const BrokerInfo& b) {
     Mutex::ScopedLock l(lock);
     membership.add(b);
-    QPID_LOG(debug, logPrefix << "Membership add: " <<  b);
     membershipUpdated(l);
 }
 
@@ -341,14 +359,13 @@ void HaBroker::removeBroker(const Uuid& 
     BrokerInfo info;
     if (membership.get(id, info)) {
         membership.remove(id);
-        QPID_LOG(debug, logPrefix << "Membership remove: " <<  info);
         membershipUpdated(l);
     }
 }
 
 void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
     framing::FieldTable linkProperties = broker.getLinkClientProperties();
-    if (isBackup(status)) {
+    if (isBackup(status.get())) {
         // If this is a backup then any outgoing links are backup
         // links and need to be tagged.
         linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jan 11 20:34:19 2013
@@ -32,6 +32,7 @@
 #include "qmf/org/apache/qpid/ha/HaBroker.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/types/Variant.h"
+#include "qpid/sys/AtomicValue.h"
 #include <set>
 #include <boost/shared_ptr.hpp>
 
@@ -103,18 +104,16 @@ class HaBroker : public management::Mana
     void setBrokerUrl(const Url&);
     void updateClientUrl(sys::Mutex::ScopedLock&);
 
-    bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
-
     void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
     void recover();
     void statusChanged(sys::Mutex::ScopedLock&);
     void setLinkProperties(sys::Mutex::ScopedLock&);
+    std::string logPrefix() const;
 
     std::vector<Url> getKnownBrokers() const;
 
     void membershipUpdated(sys::Mutex::ScopedLock&);
 
-    std::string logPrefix;
     broker::Broker& broker;
     types::Uuid systemId;
     const Settings settings;
@@ -126,7 +125,7 @@ class HaBroker : public management::Mana
     qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
     Url publicUrl, brokerUrl;
     std::vector<Url> knownBrokers;
-    BrokerStatus status;
+    sys::AtomicValue<BrokerStatus> status;
     BrokerInfo brokerInfo;
     Membership membership;
     ReplicationTest replicationTest;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jan 11 20:34:19 2013
@@ -161,6 +161,10 @@ void Primary::timeoutExpectedBackups() {
                 expectedBackups.erase(i++);
                 backups.erase(info.getSystemId());
                 rb->cancel();
+                // Downgrade the broker's status to CATCHUP
+                // The broker will get this status change when it eventually connects.
+                info.setStatus(CATCHUP);
+                haBroker.addBroker(info);
             }
             else ++i;
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Jan 11 20:34:19 2013
@@ -48,6 +48,8 @@ void RemoteBackup::setCatchupQueues(brok
 RemoteBackup::~RemoteBackup() { cancel(); }
 
 void RemoteBackup::cancel() {
+    QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
+             << " backup: " << brokerInfo);
     for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
         i->second->cancel();
     guards.clear();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Jan 11 20:34:19 2013
@@ -32,6 +32,11 @@
 namespace qpid {
 namespace ha {
 
+// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// against bad promotion if there are READY brokers when this broker starts.
+// It will not help the situation where brokers become READY after this one starts.
+//
+
 /**
  * Check whether a JOINING broker can be promoted .
  *
@@ -49,8 +54,10 @@ class StatusCheck
     ~StatusCheck();
     void setUrl(const Url&);
     bool canPromote();
-    void setPromote(bool p);
+
   private:
+    void setPromote(bool p);
+
     std::string logPrefix;
     sys::Mutex lock;
     std::vector<sys::Thread> threads;

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jan 11 20:34:19 2013
@@ -1068,13 +1068,15 @@ class RecoveryTests(HaBrokerTest):
         l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
         try:
             # We don't want backups to time out for this test, set long timeout.
-            cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
+            cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]);
             # Wait for the primary to be ready
             cluster[0].wait_status("active")
+            for b in cluster[1:4]: b.wait_status("ready")
             # Create a queue before the failure.
             s1 = cluster.connect(0).session().sender("q1;{create:always}")
             for b in cluster: b.wait_backup("q1")
             for i in xrange(100): s1.send(str(i))
+
             # Kill primary and 2 backups
             cluster[3].wait_status("ready")
             for i in [0,1,2]: cluster.kill(i, False)
@@ -1091,14 +1093,16 @@ class RecoveryTests(HaBrokerTest):
             s2 = cluster.connect(3).session().sender("q2;{create:always}")
 
             # Verify that messages sent are not completed
-            for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+            for i in xrange(100,200):
+                s1.send(str(i), sync=False);
+                s2.send(str(i), sync=False)
             assertSyncTimeout(s1)
             self.assertEqual(s1.unsettled(), 100)
             assertSyncTimeout(s2)
             self.assertEqual(s2.unsettled(), 100)
 
             # Verify we can receive even if sending is on hold:
-            cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+            cluster[3].assert_browse("q1", [str(i) for i in range(200)])
 
             # Restart backups, verify queues are released only when both backups are up
             cluster.restart(1)
@@ -1106,11 +1110,10 @@ class RecoveryTests(HaBrokerTest):
             self.assertEqual(s1.unsettled(), 100)
             assertSyncTimeout(s2)
             self.assertEqual(s2.unsettled(), 100)
-            self.assertEqual(cluster[3].ha_status(), "recovering")
             cluster.restart(2)
 
             # Verify everything is up to date and active
-            def settled(sender): sender.sync(); return sender.unsettled() == 0;
+            def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
             assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
             assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
             cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org