You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/01/11 21:34:19 UTC
svn commit: r1432273 - in /qpid/trunk/qpid/cpp/src: qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h qpid/ha/Primary.cpp qpid/ha/RemoteBackup.cpp
qpid/ha/StatusCheck.h tests/ha_tests.py
Author: aconway
Date: Fri Jan 11 20:34:19 2013
New Revision: 1432273
URL: http://svn.apache.org/viewvc?rev=1432273&view=rev
Log:
QPID-4516: Sporadic failure in ha_tests test_failover_send_receive
Several fixes were required in the code to correct this problem:
- Missing break statement in switch.
- Remove unused function HaBroker::resetMembership
- Abort connection of timed-out backups so they can attempt to reconnect.
- New primary resets membership before allowing backups to connect.
- Detect and ignore a double promotion (promoting a broker that is already primary).
- HaBroker: logPrefix() is now computed dynamically and shows the broker's status. Made status atomic so that log messages can read it efficiently without taking the lock.
- Record the primary's own status in the membership whenever its status changes.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Jan 11 20:34:19 2013
@@ -59,8 +59,7 @@ using boost::shared_ptr;
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix("Broker: "),
- broker(b),
+ : broker(b),
systemId(broker.getSystem()->getSystemId().data()),
settings(s),
observer(new ConnectionObserver(*this, systemId)),
@@ -72,7 +71,8 @@ HaBroker::HaBroker(broker::Broker& b, co
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, logPrefix << "Rejecting client connections.");
+ status = JOINING;
+ QPID_LOG(debug, logPrefix() << "Rejecting client connections.");
shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
@@ -80,6 +80,16 @@ HaBroker::HaBroker(broker::Broker& b, co
}
namespace {
+const std::string PREFIX_PRIMARY("Primary(");
+const std::string PREFIX_BACKUP("Backup(");
+const std::string PREFIX_END("): ");
+}
+std::string HaBroker::logPrefix() const {
+ BrokerStatus s = status.get();
+ return (isPrimary(s) ? PREFIX_PRIMARY : PREFIX_BACKUP) + printable(s).str()+PREFIX_END;
+}
+
+namespace {
const std::string NONE("none");
bool isNone(const std::string& x) { return x.empty() || x == NONE; }
}
@@ -92,7 +102,7 @@ void HaBroker::initialize() {
broker.getSystem()->getNodeName(),
broker.getPort(broker::Broker::TCP_TRANSPORT),
systemId);
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+ QPID_LOG(notice, logPrefix() << "Initializing: " << brokerInfo);
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -114,7 +124,7 @@ void HaBroker::initialize() {
status = JOINING;
backup.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
- statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
+ statusCheck.reset(new StatusCheck(logPrefix(), broker.getLinkHearbeatInterval(), brokerInfo));
if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
@@ -127,7 +137,7 @@ void HaBroker::initialize() {
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix << "Shut down");
+ QPID_LOG(notice, logPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
@@ -137,6 +147,11 @@ void HaBroker::recover() {
BrokerInfo::Set backups;
{
Mutex::ScopedLock l(lock);
+ if (isPrimary(status.get())) {
+ QPID_LOG(info, "Ignoring promotion, already primary: " << brokerInfo);
+ return;
+ }
+ QPID_LOG(notice, "Promoting to primary: " << brokerInfo);
// Reset membership before allowing backups to connect.
backups = membership.otherBackups();
membership.reset(brokerInfo);
@@ -167,12 +182,13 @@ Manageable::status_t HaBroker::Managemen
if (statusCheck->canPromote())
recover();
else {
- QPID_LOG(error, logPrefix << "Cluster already active, cannot be promoted");
+ QPID_LOG(error,
+ logPrefix() << "Joining active cluster, cannot be promoted.");
throw Exception("Cluster already active, cannot be promoted.");
}
break;
case CATCHUP:
- QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ QPID_LOG(error, logPrefix() << "Still catching up, cannot be promoted.");
throw Exception("Still catching up, cannot be promoted.");
break;
case READY: recover(); break;
@@ -193,7 +209,7 @@ Manageable::status_t HaBroker::Managemen
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ QPID_LOG(debug, logPrefix() << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -228,7 +244,7 @@ void HaBroker::setPublicUrl(const Url& u
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
+ QPID_LOG(debug, logPrefix() << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -237,8 +253,8 @@ void HaBroker::setBrokerUrl(const Url& u
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
- if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
+ QPID_LOG(info, logPrefix() << "Brokers URL set to: " << url);
+ if (status.get() == JOINING && statusCheck.get()) statusCheck->setUrl(url);
b = backup;
}
if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
@@ -250,13 +266,12 @@ std::vector<Url> HaBroker::getKnownBroke
}
void HaBroker::shutdown() {
- QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+ QPID_LOG(critical, logPrefix() << "Critical error, shutting down.");
broker.shutdown();
}
BrokerStatus HaBroker::getStatus() const {
- Mutex::ScopedLock l(lock);
- return status;
+ return status.get();
}
void HaBroker::setStatus(BrokerStatus newStatus) {
@@ -285,12 +300,12 @@ bool checkTransition(BrokerStatus from,
} // namespace
void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
- QPID_LOG(info, logPrefix << "Status change: "
- << printable(status) << " -> " << printable(newStatus));
- bool legal = checkTransition(status, newStatus);
+ QPID_LOG(info, logPrefix() << "Status change: "
+ << printable(status.get()) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status.get(), newStatus);
if (!legal) {
- QPID_LOG(critical, logPrefix << "Illegal state transition: "
- << printable(status) << " -> " << printable(newStatus));
+ QPID_LOG(critical, logPrefix() << "Illegal state transition: "
+ << printable(status.get()) << " -> " << printable(newStatus));
shutdown();
}
assert(legal); // FIXME aconway 2012-12-07: fail
@@ -299,13 +314,15 @@ void HaBroker::setStatus(BrokerStatus ne
}
void HaBroker::statusChanged(Mutex::ScopedLock& l) {
- mgmtObject->set_status(printable(status).str());
- brokerInfo.setStatus(status);
+ mgmtObject->set_status(printable(status.get()).str());
+ brokerInfo.setStatus(status.get());
+ membership.add(brokerInfo);
+ membershipUpdated(l);
setLinkProperties(l);
}
void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
- QPID_LOG(info, logPrefix << "Membership changed: " << membership);
+ QPID_LOG(info, logPrefix() << "Membership: " << membership);
Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -316,23 +333,24 @@ void HaBroker::setMembership(const Varia
{
Mutex::ScopedLock l(lock);
membership.assign(brokers);
- QPID_LOG(info, logPrefix << "Membership update: " << membership);
BrokerInfo info;
- // Update my status to what the primary says it is. The primary can toggle
- // status between READY and CATCHUP based on the state of our subscriptions.
- if (membership.get(systemId, info) && status != info.getStatus()) {
+ // Update my status to what the primary says it is. The primary sets
+ // status to READY when we are caught up, and sets status to CATCHUP
+ // (from READY) if we are timed out during recovery.
+ if (membership.get(systemId, info) && status.get() != info.getStatus()) {
+ assert((status.get() == CATCHUP && info.getStatus() == READY) ||
+ (status.get() == READY && info.getStatus() == CATCHUP));
setStatus(info.getStatus(), l);
b = backup;
}
membershipUpdated(l);
}
- if (b) b->setStatus(status); // Oustside lock, avoid deadlock
+ if (b) b->setStatus(status.get()); // Oustside lock, avoid deadlock
}
void HaBroker::addBroker(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b);
membershipUpdated(l);
}
@@ -341,14 +359,13 @@ void HaBroker::removeBroker(const Uuid&
BrokerInfo info;
if (membership.get(id, info)) {
membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << info);
membershipUpdated(l);
}
}
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
- if (isBackup(status)) {
+ if (isBackup(status.get())) {
// If this is a backup then any outgoing links are backup
// links and need to be tagged.
linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Jan 11 20:34:19 2013
@@ -32,6 +32,7 @@
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qpid/management/Manageable.h"
#include "qpid/types/Variant.h"
+#include "qpid/sys/AtomicValue.h"
#include <set>
#include <boost/shared_ptr.hpp>
@@ -103,18 +104,16 @@ class HaBroker : public management::Mana
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
- bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
-
void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
void recover();
void statusChanged(sys::Mutex::ScopedLock&);
void setLinkProperties(sys::Mutex::ScopedLock&);
+ std::string logPrefix() const;
std::vector<Url> getKnownBrokers() const;
void membershipUpdated(sys::Mutex::ScopedLock&);
- std::string logPrefix;
broker::Broker& broker;
types::Uuid systemId;
const Settings settings;
@@ -126,7 +125,7 @@ class HaBroker : public management::Mana
qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- BrokerStatus status;
+ sys::AtomicValue<BrokerStatus> status;
BrokerInfo brokerInfo;
Membership membership;
ReplicationTest replicationTest;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Jan 11 20:34:19 2013
@@ -161,6 +161,10 @@ void Primary::timeoutExpectedBackups() {
expectedBackups.erase(i++);
backups.erase(info.getSystemId());
rb->cancel();
+ // Downgrade the broker's status to CATCHUP
+ // The broker will get this status change when it eventually connects.
+ info.setStatus(CATCHUP);
+ haBroker.addBroker(info);
}
else ++i;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Jan 11 20:34:19 2013
@@ -48,6 +48,8 @@ void RemoteBackup::setCatchupQueues(brok
RemoteBackup::~RemoteBackup() { cancel(); }
void RemoteBackup::cancel() {
+ QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
+ << " backup: " << brokerInfo);
for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
i->second->cancel();
guards.clear();
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Fri Jan 11 20:34:19 2013
@@ -32,6 +32,11 @@
namespace qpid {
namespace ha {
+// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// against bad promotion if there are READY brokers when this broker starts.
+// It will not help the situation where brokers became READY after this one starts.
+//
+
/**
* Check whether a JOINING broker can be promoted .
*
@@ -49,8 +54,10 @@ class StatusCheck
~StatusCheck();
void setUrl(const Url&);
bool canPromote();
- void setPromote(bool p);
+
private:
+ void setPromote(bool p);
+
std::string logPrefix;
sys::Mutex lock;
std::vector<sys::Thread> threads;
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1432273&r1=1432272&r2=1432273&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Jan 11 20:34:19 2013
@@ -1068,13 +1068,15 @@ class RecoveryTests(HaBrokerTest):
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
# We don't want backups to time out for this test, set long timeout.
- cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
+ cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]);
# Wait for the primary to be ready
cluster[0].wait_status("active")
+ for b in cluster[1:4]: b.wait_status("ready")
# Create a queue before the failure.
s1 = cluster.connect(0).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
for i in xrange(100): s1.send(str(i))
+
# Kill primary and 2 backups
cluster[3].wait_status("ready")
for i in [0,1,2]: cluster.kill(i, False)
@@ -1091,14 +1093,16 @@ class RecoveryTests(HaBrokerTest):
s2 = cluster.connect(3).session().sender("q2;{create:always}")
# Verify that messages sent are not completed
- for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ for i in xrange(100,200):
+ s1.send(str(i), sync=False);
+ s2.send(str(i), sync=False)
assertSyncTimeout(s1)
self.assertEqual(s1.unsettled(), 100)
assertSyncTimeout(s2)
self.assertEqual(s2.unsettled(), 100)
# Verify we can receive even if sending is on hold:
- cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+ cluster[3].assert_browse("q1", [str(i) for i in range(200)])
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
@@ -1106,11 +1110,10 @@ class RecoveryTests(HaBrokerTest):
self.assertEqual(s1.unsettled(), 100)
assertSyncTimeout(s2)
self.assertEqual(s2.unsettled(), 100)
- self.assertEqual(cluster[3].ha_status(), "recovering")
cluster.restart(2)
# Verify everything is up to date and active
- def settled(sender): sender.sync(); return sender.unsettled() == 0;
+ def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org