You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/25 20:05:55 UTC
svn commit: r948143 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h
Author: aconway
Date: Tue May 25 18:05:54 2010
New Revision: 948143
URL: http://svn.apache.org/viewvc?rev=948143&view=rev
Log:
Fix "mismatched cluster-id" errors during startup.
Intermittent failure when starting a persistent cluster with all clean stores.
Some brokers fail with:
critical Unexpected error: Cluster-ID mismatch. Stores belong to different clusters.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=948143&r1=948142&r2=948143&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue May 25 18:05:54 2010
@@ -266,7 +266,7 @@ Cluster::Cluster(const ClusterSettings&
initMap(self, settings.size),
store(broker.getDataDir().getPath()),
elder(false),
- lastSize(0),
+ lastAliveCount(0),
lastBroker(false),
updateRetracted(false),
error(*this)
@@ -290,7 +290,7 @@ Cluster::Cluster(const ClusterSettings&
store.load();
clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
- }
+ }
cpg.join(name);
// pump the CPG dispatch manually till we get past PRE_INIT.
while (state == PRE_INIT)
@@ -326,7 +326,8 @@ void Cluster::initialize() {
mgmtObject->set_status("JOINING");
}
- // Run initMapCompleted immediately to process the initial configuration.
+ // Run initMapCompleted immediately to process the initial configuration
+ // that allowed us to transition out of PRE_INIT
assert(state == INIT);
initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
@@ -433,7 +434,7 @@ const ClusterUpdateOfferBody* castUpdate
const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
return (body && body->getMethod() &&
body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
- static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
+ static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
}
// Handler for deliverEventQueue.
@@ -616,8 +617,8 @@ void Cluster::initMapCompleted(Lock& l)
<< " members, waiting for at least " << initMap.getRequiredSize());
return;
}
- initMap.checkConsistent();
+ initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (elders.empty())
@@ -657,11 +658,11 @@ void Cluster::configChange(const MemberI
MemberSet members = decodeMemberSet(membersStr);
MemberSet left = decodeMemberSet(leftStr);
MemberSet joined = decodeMemberSet(joinedStr);
- QPID_LOG(notice, *this << " Membership update: " << members);
+ QPID_LOG(notice, *this << " configuration change: " << members);
QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
- // Update initital status for members joining or leaving.
+ // If we are still joining, make sure there is someone to give us an update.
elders = intersection(elders, members);
if (elders.empty() && INIT < state && state < CATCHUP) {
QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
@@ -882,6 +883,7 @@ void Cluster::checkUpdateIn(Lock& l) {
failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ memberUpdate(l);
broker.setClusterUpdatee(false);
if (mAgent) mAgent->suppress(false); // Enable management output.
discarding = false; // ok to set, we're stalled for update.
@@ -908,7 +910,7 @@ void Cluster::updateOutDone(Lock& l) {
QPID_LOG(notice, *this << " update sent");
assert(state == UPDATER);
state = READY;
- deliverEventQueue.start(); // Start processing events again.
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -959,15 +961,18 @@ void Cluster::stopFullCluster(Lock& ) {
}
void Cluster::memberUpdate(Lock& l) {
+ // Ignore config changes while we are joining.
+ if (state < CATCHUP) return;
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
std::vector<string> ids = getIds(l);
- size_t size = urls.size();
+ size_t aliveCount = map.aliveCount();
+ assert(map.isAlive(self));
failoverExchange->updateUrls(urls);
+ // Mark store clean if I am the only broker, dirty otherwise.
if (store.hasStore()) {
- // Mark store clean if I am the only broker, dirty otherwise.
- if (size == 1 ) {
+ if (aliveCount == 1) {
if (store.getState() != STORE_STATE_CLEAN_STORE) {
QPID_LOG(notice, *this << "Sole member of cluster, marking store clean.");
store.clean(Uuid(true));
@@ -975,26 +980,28 @@ void Cluster::memberUpdate(Lock& l) {
}
else {
if (store.getState() != STORE_STATE_DIRTY_STORE) {
- QPID_LOG(notice, "No longer sole cluster member, marking store dirty.");
+ QPID_LOG(notice, "Running in a cluster, marking store dirty.");
store.dirty();
}
}
}
- if (size == 1 && lastSize > 1 && state >= CATCHUP) {
+ // If I am the last member standing, set queue policies.
+ if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) {
QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
- else if (size > 1 && lastBroker) {
- QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ else if (aliveCount > 1 && lastBroker) {
+ QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1
+ << " replicas, updating queue policies.");
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
- lastSize = size;
+ lastAliveCount = aliveCount;
if (mgmtObject) {
- mgmtObject->set_clusterSize(size);
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
if (iter != urls.begin()) urlstr += ";";
@@ -1029,7 +1036,7 @@ std::ostream& operator<<(std::ostream& o
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
if (cluster.error.isUnresolved()) o << "/error";
- return o << ")";;
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -1071,8 +1078,8 @@ void Cluster::timerWakeup(const MemberId
void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
- if (state >= CATCHUP) // Pre catchup our timer isn't set up.
- timer->deliverDrop(name);
+ if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+ timer->deliverDrop(name);
}
bool Cluster::isElder() const {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=948143&r1=948142&r2=948143&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue May 25 18:05:54 2010
@@ -273,7 +273,7 @@ class Cluster : private Cpg::Handler, pu
ClusterMap map;
MemberSet elders;
bool elder;
- size_t lastSize;
+ size_t lastAliveCount;
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org