You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/02/27 20:34:47 UTC

svn commit: r748651 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h

Author: aconway
Date: Fri Feb 27 19:34:47 2009
New Revision: 748651

URL: http://svn.apache.org/viewvc?rev=748651&view=rev
Log:
cluster: apply membership updates while in CATCHUP mode.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=748651&r1=748650&r2=748651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb 27 19:34:47 2009
@@ -234,9 +234,9 @@
             // Check for deliver close here so we can erase the
             // connection decoder safely in this thread.
             if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
-                decoder.erase(e.getConnectionId());
+                    decoder.erase(e.getConnectionId());
             deliverFrameQueue.push(EventFrame(e, frame));
-    }
+        }
     }
     else if (e.getType() == DATA)
         decoder.decode(e, e.getData());
@@ -345,7 +345,7 @@
             broker.getLinks().setPassive(true);
         }
     }
-    else if (state >= READY && memberChange) {
+    else if (state >= CATCHUP && memberChange) {
         memberUpdate(l);
         elders = ClusterMap::intersection(elders, map.getAlive());
         if (elders.empty()) {
@@ -357,7 +357,7 @@
 
 bool Cluster::isLeader() const { return elders.empty(); }
 
-void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
+void Cluster::makeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
         QPID_LOG(info, *this << " send update-offer to " << id);
@@ -382,7 +382,7 @@
 
 void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
     map.updateRequest(id, url);
-    tryMakeOffer(id, l);
+    makeOffer(id, l);
 }
 
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -406,7 +406,7 @@
         else {                  // Another offer was first.
             setReady(l);
             QPID_LOG(info, *this << " cancelled update offer to " << updatee);
-            tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
+            makeOffer(map.firstJoiner(), l); // Maybe make another offer.
         }
     }
     else if (updatee == myId && url) {
@@ -446,7 +446,6 @@
 }
 
 void Cluster::checkUpdateIn(Lock& ) {
-    if (state == LEFT) return;
     if (state == UPDATEE && updatedMap) {
         map = *updatedMap;
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
@@ -467,7 +466,7 @@
     state = READY;
     mcast.release();
     deliverFrameQueue.start();
-    tryMakeOffer(map.firstJoiner(), l); // Try another offer
+    makeOffer(map.firstJoiner(), l); // Try another offer
 }
 
 void Cluster::updateOutError(const std::exception& e)  {
@@ -522,7 +521,7 @@
     size_t size = urls.size();
     failoverExchange->setUrls(urls);
 
-    if (size == 1 && lastSize > 1 && state >= READY) { 
+    if (size == 1 && lastSize > 1 && state >= CATCHUP) { 
         QPID_LOG(info, *this << " last broker standing, update queue policies");
         lastBroker = true;
         broker.getQueues().updateQueueClusterState(true);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=748651&r1=748650&r2=748651&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Feb 27 19:34:47 2009
@@ -119,7 +119,7 @@
     std::vector<Url> getUrls(Lock&) const;
 
     // Make an offer if we can - called in deliver thread.
-    void tryMakeOffer(const MemberId&, Lock&);
+    void makeOffer(const MemberId&, Lock&);
 
     // Called in main thread in ~Broker.
     void brokerShutdown();
@@ -133,6 +133,8 @@
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void shutdown(const MemberId&, Lock&);
+
+    // Handlers for pollable queues.
     void deliveredEvent(const Event&); 
     void deliveredFrame(const EventFrame&); 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org