You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/26 22:08:30 UTC

svn commit: r1366179 - in /qpid/trunk/qpid/cpp/src: qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/Primary.cpp qpid/ha/RemoteBackup.cpp qpid/ha/RemoteBackup.h qpid/ha/ReplicatingSubscription.cpp tests/ha_tests.py

Author: aconway
Date: Thu Jul 26 20:08:29 2012
New Revision: 1366179

URL: http://svn.apache.org/viewvc?rev=1366179&view=rev
Log:
QPID-4159: HA missing messages in failover test.

Fix test_failover_send_receive showing missing messages. With this fix, the
test ran overnight with -DDURATION=2 with no failures.

- Primary, RemoteBackup: Only report "ready" once per remote backup.
- HaBroker: Put membership updates under mutex.
- ReplicatingSubscription: Check whether the backup is missing messages at the front.
- ha_tests.py: Added assertion to test_priority_ring to verify the primary queue is as expected.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jul 26 20:08:29 2012
@@ -290,8 +290,8 @@ void HaBroker::statusChanged(Mutex::Scop
     setLinkProperties(l);
 }
 
-void HaBroker::membershipUpdated(const Variant::List& brokers) {
-    // No lock, these are thread-safe.
+void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+    Variant::List brokers = membership.asList();
     mgmtObject->set_members(brokers);
     broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
 }
@@ -304,37 +304,28 @@ void HaBroker::setMembership(const Varia
     // Update my status to what the primary says.
     if (membership.get(systemId, info) && status != info.getStatus())
         setStatus(info.getStatus(), l);
-    membershipUpdated(brokers);
+    membershipUpdated(l);
 }
 
 void HaBroker::resetMembership(const BrokerInfo& b) {
-    Variant::List members;
-    {
-        Mutex::ScopedLock l(lock);
-        membership.reset(b);
-        members = membership.asList();
-    }
-    membershipUpdated(members);
+    Mutex::ScopedLock l(lock);
+    membership.reset(b);
+    QPID_LOG(debug, logPrefix << "Membership reset to: " <<  membership);
+    membershipUpdated(l);
 }
 
 void HaBroker::addBroker(const BrokerInfo& b) {
-    Variant::List members;
-    {
-        Mutex::ScopedLock l(lock);
-        membership.add(b);
-        members = membership.asList();
-    }
-    membershipUpdated(members);
+    Mutex::ScopedLock l(lock);
+    membership.add(b);
+    QPID_LOG(debug, logPrefix << "Membership add: " <<  b << " now: " << membership);
+    membershipUpdated(l);
 }
 
 void HaBroker::removeBroker(const Uuid& id) {
-    Variant::List members;
-    {
-        Mutex::ScopedLock l(lock);
-        membership.remove(id);
-        members = membership.asList();
-    }
-    membershipUpdated(members);
+    Mutex::ScopedLock l(lock);
+    membership.remove(id);
+    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id << " now: " << membership);
+    membershipUpdated(l);
 }
 
 void HaBroker::setLinkProperties(Mutex::ScopedLock&) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Thu Jul 26 20:08:29 2012
@@ -111,7 +111,7 @@ class HaBroker : public management::Mana
 
     std::vector<Url> getKnownBrokers() const;
 
-    void membershipUpdated(const types::Variant::List&);
+    void membershipUpdated(sys::Mutex::ScopedLock&);
 
     std::string logPrefix;
     broker::Broker& broker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jul 26 20:08:29 2012
@@ -127,7 +127,7 @@ void Primary::checkReady(Mutex::ScopedLo
 }
 
 void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l)  {
-    if (i != backups.end() && i->second->isReady()) {
+    if (i != backups.end() && i->second->reportReady()) {
         BrokerInfo info = i->second->getBrokerInfo();
         info.setStatus(READY);
         haBroker.addBroker(info);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Thu Jul 26 20:08:29 2012
@@ -33,7 +33,7 @@ using boost::bind;
 
 RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
     logPrefix("Primary remote backup "+info.getLogId()+": "),
-    brokerInfo(info), replicationTest(rt), connected(con)
+    brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
 {}
 
 void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
@@ -109,4 +109,12 @@ void RemoteBackup::queueDestroy(const Qu
     }
 }
 
+bool RemoteBackup::reportReady() {
+    if (!reportedReady && isReady()) {
+        reportedReady = true;
+        return true;
+    }
+    return false;
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Thu Jul 26 20:08:29 2012
@@ -83,6 +83,9 @@ class RemoteBackup
     /**@return true when all initial queues for this backup are ready. */
     bool isReady();
 
+    /**@return true if isReady() and this is the first call to reportReady */
+    bool reportReady();
+
     /**Cancel all queue guards, called if we are timed out. */
     void cancel();
 
@@ -100,6 +103,7 @@ class RemoteBackup
     GuardMap guards;
     QueueSet initialQueues;
     bool connected;
+    bool reportedReady;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jul 26 20:08:29 2012
@@ -184,16 +184,17 @@ ReplicatingSubscription::ReplicatingSubs
         if (!guard) guard.reset(new QueueGuard(*queue, info));
         guard->attach(*this);
 
-        QueueRange backup(arguments); // The remote backup state.
-        QueueRange primary(guard->getRange()); // The local state at the time the guard was set.
+        QueueRange backup(arguments); // Remote backup range.
+        QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
         backupPosition = backup.back;
 
         // Sync backup and primary queues, don't send messages already on the backup
 
-        if (backup.back < primary.front || backup.front > primary.back
-            || primary.empty() || backup.empty())
+        if (backup.front > primary.front || // Missing messages at front
+            backup.back < primary.front ||  // No overlap
+            primary.empty() || backup.empty()) // Empty
         {
-            // No overlap - erase backup and start from the beginning
+            // No useful overlap - erase backup and start from the beginning
             if (!backup.empty()) dequeued(backup.front, backup.back);
             position = primary.front-1;
         }

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Jul 26 20:08:29 2012
@@ -588,8 +588,10 @@ class ReplicationTests(BrokerTest):
         # correct result, the uncommented one is for the actualy buggy
         # result.  See https://issues.apache.org/jira/browse/QPID-3866
         #
-        # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
-        backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+        # expect = sorted(priorities,reverse=True)[0:5]
+        expect = [9,9,9,9,2]
+        primary.assert_browse("q", expect, transform=lambda m: m.priority)
+        backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
 
     def test_backup_acquired(self):
         """Verify that acquired messages are backed up, for all queue types."""



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org