You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/26 22:08:30 UTC
svn commit: r1366179 - in /qpid/trunk/qpid/cpp/src: qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h qpid/ha/Primary.cpp qpid/ha/RemoteBackup.cpp
qpid/ha/RemoteBackup.h qpid/ha/ReplicatingSubscription.cpp tests/ha_tests.py
Author: aconway
Date: Thu Jul 26 20:08:29 2012
New Revision: 1366179
URL: http://svn.apache.org/viewvc?rev=1366179&view=rev
Log:
QPID-4159: HA missing messages in failover test.
Fix test_failover_send_receive showing missing messages. With this fix,
ran with -DDURATION=2 overnight with no failures.
- Primary, RemoteBackup: Only report "ready" once per remote backup.
- HaBroker: Put membership updates under mutex.
- ReplicatingSubscription: Check for backup missing messages at the front.
- ha_tests.py: Added assertion to test_priority_ring, verify primary queue as expected.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jul 26 20:08:29 2012
@@ -290,8 +290,8 @@ void HaBroker::statusChanged(Mutex::Scop
setLinkProperties(l);
}
-void HaBroker::membershipUpdated(const Variant::List& brokers) {
- // No lock, these are thread-safe.
+void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
}
@@ -304,37 +304,28 @@ void HaBroker::setMembership(const Varia
// Update my status to what the primary says.
if (membership.get(systemId, info) && status != info.getStatus())
setStatus(info.getStatus(), l);
- membershipUpdated(brokers);
+ membershipUpdated(l);
}
void HaBroker::resetMembership(const BrokerInfo& b) {
- Variant::List members;
- {
- Mutex::ScopedLock l(lock);
- membership.reset(b);
- members = membership.asList();
- }
- membershipUpdated(members);
+ Mutex::ScopedLock l(lock);
+ membership.reset(b);
+ QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
+ membershipUpdated(l);
}
void HaBroker::addBroker(const BrokerInfo& b) {
- Variant::List members;
- {
- Mutex::ScopedLock l(lock);
- membership.add(b);
- members = membership.asList();
- }
- membershipUpdated(members);
+ Mutex::ScopedLock l(lock);
+ membership.add(b);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ membershipUpdated(l);
}
void HaBroker::removeBroker(const Uuid& id) {
- Variant::List members;
- {
- Mutex::ScopedLock l(lock);
- membership.remove(id);
- members = membership.asList();
- }
- membershipUpdated(members);
+ Mutex::ScopedLock l(lock);
+ membership.remove(id);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ membershipUpdated(l);
}
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Thu Jul 26 20:08:29 2012
@@ -111,7 +111,7 @@ class HaBroker : public management::Mana
std::vector<Url> getKnownBrokers() const;
- void membershipUpdated(const types::Variant::List&);
+ void membershipUpdated(sys::Mutex::ScopedLock&);
std::string logPrefix;
broker::Broker& broker;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Thu Jul 26 20:08:29 2012
@@ -127,7 +127,7 @@ void Primary::checkReady(Mutex::ScopedLo
}
void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
- if (i != backups.end() && i->second->isReady()) {
+ if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
haBroker.addBroker(info);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Thu Jul 26 20:08:29 2012
@@ -33,7 +33,7 @@ using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
logPrefix("Primary remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connected(con)
+ brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}
void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
@@ -109,4 +109,12 @@ void RemoteBackup::queueDestroy(const Qu
}
}
+bool RemoteBackup::reportReady() {
+ if (!reportedReady && isReady()) {
+ reportedReady = true;
+ return true;
+ }
+ return false;
+}
+
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Thu Jul 26 20:08:29 2012
@@ -83,6 +83,9 @@ class RemoteBackup
/**@return true when all initial queues for this backup are ready. */
bool isReady();
+ /**@return true if isReady() and this is the first call to reportReady */
+ bool reportReady();
+
/**Cancel all queue guards, called if we are timed out. */
void cancel();
@@ -100,6 +103,7 @@ class RemoteBackup
GuardMap guards;
QueueSet initialQueues;
bool connected;
+ bool reportedReady;
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Jul 26 20:08:29 2012
@@ -184,16 +184,17 @@ ReplicatingSubscription::ReplicatingSubs
if (!guard) guard.reset(new QueueGuard(*queue, info));
guard->attach(*this);
- QueueRange backup(arguments); // The remote backup state.
- QueueRange primary(guard->getRange()); // The local state at the time the guard was set.
+ QueueRange backup(arguments); // Remote backup range.
+ QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
// Sync backup and primary queues, don't send messages already on the backup
- if (backup.back < primary.front || backup.front > primary.back
- || primary.empty() || backup.empty())
+ if (backup.front > primary.front || // Missing messages at front
+ backup.back < primary.front || // No overlap
+ primary.empty() || backup.empty()) // Empty
{
- // No overlap - erase backup and start from the beginning
+ // No useful overlap - erase backup and start from the beginning
if (!backup.empty()) dequeued(backup.front, backup.back);
position = primary.front-1;
}
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1366179&r1=1366178&r2=1366179&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Jul 26 20:08:29 2012
@@ -588,8 +588,10 @@ class ReplicationTests(BrokerTest):
# correct result, the uncommented one is for the actualy buggy
# result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # expect = sorted(priorities,reverse=True)[0:5]
+ expect = [9,9,9,9,2]
+ primary.assert_browse("q", expect, transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
def test_backup_acquired(self):
"""Verify that acquired messages are backed up, for all queue types."""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org