You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/06/21 17:02:09 UTC
svn commit: r1495466 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/
Author: aconway
Date: Fri Jun 21 15:02:08 2013
New Revision: 1495466
URL: http://svn.apache.org/r1495466
Log:
QPID-4944: HA: sporadic failures in ha_tests: test_failover_send_receive and test_expected_backup_timeout
The failures are very sporadic, so the fix is difficult to verify.
- Simplify Membership: centralize status changes and make them atomic.
- Fix test bug in test_expected_backup_timeout: the final status check did not wait for the status to change, causing a race.
- Remove out-of-date status info from the log prefixes of QueueGuard and ReplicatingSubscription.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
qpid/trunk/qpid/cpp/src/tests/ha_test.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Fri Jun 21 15:02:08 2013
@@ -91,13 +91,16 @@ void BrokerInfo::assign(const Variant::M
status = BrokerStatus(get(m, STATUS).asUint8());
}
-std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
- o << b.getSystemId().str().substr(0,8);
- if (b.getAddress() != empty) o << "@" << b.getAddress();
- o << "(" << printable(b.getStatus()) << ")";
+std::ostream& BrokerInfo::printId(std::ostream& o) const {
+ o << getSystemId().str().substr(0,8);
+ if (getAddress() != empty) o << "@" << getAddress();
return o;
}
+std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
+ return b.printId(o) << "(" << printable(b.getStatus()) << ")";
+}
+
std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) {
std::ostream_iterator<BrokerInfo> out(o, " ");
copy(infos.begin(), infos.end(), out);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Fri Jun 21 15:02:08 2013
@@ -64,6 +64,9 @@ class BrokerInfo
// So it can be put in a set.
bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
+ // Print just the identifying information, not the status.
+ std::ostream& printId(std::ostream& o) const;
+
private:
Address address;
types::Uuid systemId;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Jun 21 15:02:08 2013
@@ -902,24 +902,7 @@ void BrokerReplicator::disconnected() {
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
- Membership& membership(haBroker.getMembership());
- membership.assign(brokers);
- // Check if the primary has signalled a change in my status:
- // from CATCHUP to READY when we are caught up.
- // from READY TO CATCHUP if we are timed out during fail-over.
- BrokerInfo info;
- if (membership.get(membership.getSelf(), info)) {
- BrokerStatus oldStatus = haBroker.getStatus();
- BrokerStatus newStatus = info.getStatus();
- if (oldStatus == CATCHUP && newStatus == READY) {
- QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
- haBroker.getMembership().setStatus(READY);
- }
- else if (oldStatus == READY && newStatus == CATCHUP) {
- QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
- haBroker.getMembership().setStatus(CATCHUP);
- }
- }
+ haBroker.getMembership().assign(brokers);
}
}} // namespace broker
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Fri Jun 21 15:02:08 2013
@@ -43,6 +43,7 @@ Membership::Membership(const BrokerInfo&
: haBroker(b), self(info.getSystemId())
{
brokers[self] = info;
+ oldStatus = info.getStatus();
}
void Membership::clear() {
@@ -54,6 +55,7 @@ void Membership::clear() {
void Membership::add(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
+ assert(b.getSystemId() != self);
brokers[b.getSystemId()] = b;
update(l);
}
@@ -86,6 +88,10 @@ void Membership::assign(const types::Var
types::Variant::List Membership::asList() const {
Mutex::ScopedLock l(lock);
+ return asList(l);
+}
+
+types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
@@ -109,18 +115,43 @@ bool Membership::get(const types::Uuid&
return true;
}
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+
void Membership::update(Mutex::ScopedLock& l) {
QPID_LOG(info, "Membership: " << brokers);
// Update managment and send update event.
- Variant::List brokerList = asList();
- if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
- if (mgmtObject) mgmtObject->set_members(brokerList);
+ BrokerStatus newStatus = getStatus(l);
+ Variant::List brokerList = asList(l);
+ if (mgmtObject) {
+ mgmtObject->set_status(printable(newStatus).str());
+ mgmtObject->set_members(brokerList);
+ }
haBroker.getBroker().getManagementAgent()->raiseEvent(
_qmf::EventMembersUpdate(brokerList));
// Update link client properties
framing::FieldTable linkProperties = haBroker.getBroker().getLinkClientProperties();
- if (isBackup(getStatus(l))) {
+ if (isBackup(newStatus)) {
// Set backup tag on outgoing link properties.
linkProperties.setTable(
ConnectionObserver::BACKUP_TAG, brokers[types::Uuid(self)].asFieldTable());
@@ -130,6 +161,17 @@ void Membership::update(Mutex::ScopedLoc
linkProperties.erase(ConnectionObserver::BACKUP_TAG);
haBroker.getBroker().setLinkClientProperties(linkProperties);
}
+
+ // Check status transitions
+ if (oldStatus != newStatus) {
+ QPID_LOG(info, "Status change: "
+ << printable(oldStatus) << " -> " << printable(newStatus));
+ if (!checkTransition(oldStatus, newStatus)) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus)
+ << " -> " << printable(newStatus)));
+ }
+ oldStatus = newStatus;
+ }
}
void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
@@ -139,40 +181,9 @@ void Membership::setMgmtObject(boost::sh
}
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
- // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
- static const BrokerStatus TRANSITIONS[][2] = {
- { STANDALONE, JOINING }, // Initialization of backup broker
- { JOINING, CATCHUP }, // Connected to primary
- { JOINING, RECOVERING }, // Chosen as initial primary.
- { CATCHUP, READY }, // Caught up all queues, ready to take over.
- { READY, RECOVERING }, // Chosen as new primary
- { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
- { RECOVERING, ACTIVE } // All expected backups are ready
- };
- static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
- for (size_t i = 0; i < N; ++i) {
- if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
- return true;
- }
- return false;
-}
-} // namespace
-
void Membership::setStatus(BrokerStatus newStatus) {
- BrokerStatus status = getStatus();
- QPID_LOG(info, "Status change: "
- << printable(status) << " -> " << printable(newStatus));
- bool legal = checkTransition(status, newStatus);
- if (!legal) {
- haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
- << " -> " << printable(newStatus)));
- }
-
Mutex::ScopedLock l(lock);
brokers[self].setStatus(newStatus);
- if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
update(l);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Fri Jun 21 15:02:08 2013
@@ -83,12 +83,14 @@ class Membership
private:
void update(sys::Mutex::ScopedLock&);
BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+ types::Variant::List asList(sys::Mutex::ScopedLock&) const;
mutable sys::Mutex lock;
HaBroker& haBroker;
boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
const types::Uuid self;
BrokerInfo::Map brokers;
+ BrokerStatus oldStatus;
};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Jun 21 15:02:08 2013
@@ -51,7 +51,8 @@ QueueGuard::QueueGuard(broker::Queue& q,
: cancelled(false), queue(q)
{
std::ostringstream os;
- os << "Guard of " << queue.getName() << " at " << info << ": ";
+ os << "Guard of " << queue.getName() << " at ";
+ info.printId(os) << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Jun 21 15:02:08 2013
@@ -121,7 +121,8 @@ ReplicatingSubscription::ReplicatingSubs
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "Subscription to " << queue->getName() << " at " << info << ": ";
+ os << "Subscription to " << queue->getName() << " at ";
+ info.printId(os) << ": ";
logPrefix = os.str();
// If this is a non-cluster standalone replication then we need to
Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Fri Jun 21 15:02:08 2013
@@ -107,7 +107,7 @@ class HaBroker(Broker):
ha_port = ha_port or HaPort(test)
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+
+ "--log-enable=debug+:ha::",
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
# Heartbeat and negotiate time are needed so that a broker wont
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1495466&r1=1495465&r2=1495466&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jun 21 15:02:08 2013
@@ -1121,13 +1121,10 @@ class RecoveryTests(HaBrokerTest):
but can still rejoin.
"""
cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
- cluster[0].wait_status("active") # Primary ready
- for b in cluster[1:3]: b.wait_status("ready") # Backups ready
for i in [0,1]: cluster.kill(i, False)
- cluster[2].promote() # New primary, expected backup will 1
- cluster[2].wait_status("recovering")
+ cluster[2].promote() # New primary, expected backup will be 1
# Should not go active till the expected backup connects or times out.
- self.assertEqual(cluster[2].ha_status(), "recovering")
+ cluster[2].wait_status("recovering")
# Messages should be held till expected backup times out
s = cluster[2].connect().session().sender("q;{create:always}")
s.send("foo", sync=False)
@@ -1135,7 +1132,7 @@ class RecoveryTests(HaBrokerTest):
try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
except Timeout: pass
s.sync(timeout=1) # And released after the timeout.
- self.assertEqual(cluster[2].ha_status(), "active")
+ cluster[2].wait_status("active")
def test_join_ready_cluster(self):
"""If we join a cluster where the primary is dead, the new primary is
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org