You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/05/25 20:05:55 UTC

svn commit: r948143 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h

Author: aconway
Date: Tue May 25 18:05:54 2010
New Revision: 948143

URL: http://svn.apache.org/viewvc?rev=948143&view=rev
Log:
Fix "mismatched cluster-id" errors during startup.

Intermittent failure when starting a persistent cluster with all clean stores.
Some brokers fail with:
  critical Unexpected error: Cluster-ID mismatch. Stores belong to different clusters.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=948143&r1=948142&r2=948143&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue May 25 18:05:54 2010
@@ -266,7 +266,7 @@ Cluster::Cluster(const ClusterSettings& 
     initMap(self, settings.size),
     store(broker.getDataDir().getPath()),
     elder(false),
-    lastSize(0),
+    lastAliveCount(0),
     lastBroker(false),
     updateRetracted(false),
     error(*this)
@@ -290,7 +290,7 @@ Cluster::Cluster(const ClusterSettings& 
         store.load();
         clusterId = store.getClusterId(); 
         QPID_LOG(notice, "Cluster store state: " << store)
-    }
+            }
     cpg.join(name);
     // pump the CPG dispatch manually till we get past PRE_INIT.
     while (state == PRE_INIT)
@@ -326,7 +326,8 @@ void Cluster::initialize() {
         mgmtObject->set_status("JOINING");
     }
 
-    // Run initMapCompleted immediately to process the initial configuration.
+    // Run initMapCompleted immediately to process the initial configuration
+    // that allowed us to transition out of PRE_INIT.
     assert(state == INIT);
     initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.
 
@@ -433,7 +434,7 @@ const ClusterUpdateOfferBody* castUpdate
 const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) {
     return  (body && body->getMethod() &&
              body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ?
-      static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
+        static_cast<const ClusterConnectionAnnounceBody*>(body) : 0;
 }
 
 // Handler for deliverEventQueue.
@@ -616,8 +617,8 @@ void Cluster::initMapCompleted(Lock& l) 
                      << " members, waiting for at least " << initMap.getRequiredSize());
             return;
         }
-        initMap.checkConsistent();
 
+        initMap.checkConsistent();
         elders = initMap.getElders();
         QPID_LOG(debug, *this << " elders: " << elders);
         if (elders.empty())
@@ -657,11 +658,11 @@ void Cluster::configChange(const MemberI
     MemberSet members = decodeMemberSet(membersStr);
     MemberSet left = decodeMemberSet(leftStr);
     MemberSet joined = decodeMemberSet(joinedStr);
-    QPID_LOG(notice, *this << " Membership update: " << members);
+    QPID_LOG(notice, *this << " configuration change: " << members);
     QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
     QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
 
-    // Update initital status for members joining or leaving.
+    // If we are still joining, make sure there is someone to give us an update.
     elders = intersection(elders, members);
     if (elders.empty() && INIT < state && state < CATCHUP) {
         QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
@@ -882,6 +883,7 @@ void Cluster::checkUpdateIn(Lock& l) {
         failoverExchange->setUrls(getUrls(l));
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
+        memberUpdate(l);
         broker.setClusterUpdatee(false);
         if (mAgent) mAgent->suppress(false); // Enable management output.
         discarding = false;     // ok to set, we're stalled for update.
@@ -908,7 +910,7 @@ void Cluster::updateOutDone(Lock& l) {
     QPID_LOG(notice, *this << " update sent");
     assert(state == UPDATER);
     state = READY;
-     deliverEventQueue.start();       // Start processing events again.
+    deliverEventQueue.start();       // Start processing events again.
     makeOffer(map.firstJoiner(), l); // Try another offer
 }
 
@@ -959,15 +961,18 @@ void Cluster::stopFullCluster(Lock& ) {
 }
 
 void Cluster::memberUpdate(Lock& l) {
+    // Ignore config changes while we are joining.
+    if (state < CATCHUP) return;
     QPID_LOG(info, *this << " member update: " << map);
     std::vector<Url> urls = getUrls(l);
     std::vector<string> ids = getIds(l);
-    size_t size = urls.size();
+    size_t aliveCount = map.aliveCount();
+    assert(map.isAlive(self));
     failoverExchange->updateUrls(urls);
 
+    // Mark store clean if I am the only broker, dirty otherwise.
     if (store.hasStore()) {
-        // Mark store clean if I am the only broker, dirty otherwise.
-        if (size == 1 ) {
+        if (aliveCount == 1) {
             if (store.getState() != STORE_STATE_CLEAN_STORE) {
                 QPID_LOG(notice, *this << "Sole member of cluster, marking store clean.");
                 store.clean(Uuid(true));
@@ -975,26 +980,28 @@ void Cluster::memberUpdate(Lock& l) {
         }
         else {
             if (store.getState() != STORE_STATE_DIRTY_STORE) {
-                QPID_LOG(notice, "No longer sole cluster member, marking store dirty.");
+                QPID_LOG(notice, "Running in a cluster, marking store dirty.");
                 store.dirty();
             }
         }
     }
 
-    if (size == 1 && lastSize > 1 && state >= CATCHUP) {
+    // If I am the last member standing, set queue policies.
+    if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) {
         QPID_LOG(notice, *this << " last broker standing, update queue policies");
         lastBroker = true;
         broker.getQueues().updateQueueClusterState(true);
     }
-    else if (size > 1 && lastBroker) {
-        QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+    else if (aliveCount > 1 && lastBroker) {
+        QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1
+                 << " replicas, updating queue policies.");
         lastBroker = false;
         broker.getQueues().updateQueueClusterState(false);
     }
-    lastSize = size;
+    lastAliveCount = aliveCount;
 
     if (mgmtObject) {
-        mgmtObject->set_clusterSize(size); 
+        mgmtObject->set_clusterSize(urls.size()); 
         string urlstr;
         for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
             if (iter != urls.begin()) urlstr += ";";
@@ -1029,7 +1036,7 @@ std::ostream& operator<<(std::ostream& o
     assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
     o << "cluster(" << cluster.self << " " << STATE[cluster.state];
     if (cluster.error.isUnresolved()) o << "/error";
-    return o << ")";;
+    return o << ")";
 }
 
 MemberId Cluster::getId() const {
@@ -1071,8 +1078,8 @@ void Cluster::timerWakeup(const MemberId
 
 void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
     QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
-    if (state >= CATCHUP) // Pre catchup our timer isn't set up.
-        timer->deliverDrop(name);
+        if (state >= CATCHUP) // Pre catchup our timer isn't set up.
+            timer->deliverDrop(name);
 }
 
 bool Cluster::isElder() const {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=948143&r1=948142&r2=948143&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue May 25 18:05:54 2010
@@ -273,7 +273,7 @@ class Cluster : private Cpg::Handler, pu
     ClusterMap map;
     MemberSet elders;
     bool elder;
-    size_t lastSize;
+    size_t lastAliveCount;
     bool lastBroker;
     sys::Thread updateThread;
     boost::optional<ClusterMap> updatedMap;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org