You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/26 21:22:01 UTC

svn commit: r699456 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/client/ qpid/cluster/

Author: aconway
Date: Fri Sep 26 12:22:00 2008
New Revision: 699456

URL: http://svn.apache.org/viewvc?rev=699456&view=rev
Log:
cluster:
 - call updateMemberStats() exactly once for each change in cluster membership.
 - fix spurious replication of catch-up connection close events.
Removed unused client/MessageQueue.h

Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageQueue.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 26 12:22:00 2008
@@ -507,7 +507,6 @@
   qpid/client/LocalQueue.h \
   qpid/client/Message.h \
   qpid/client/MessageListener.h \
-  qpid/client/MessageQueue.h \
   qpid/client/Results.h \
   qpid/client/SessionBase_0_10.h \
   qpid/client/Session.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Sep 26 12:22:00 2008
@@ -248,7 +248,7 @@
     cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent) 
+    QPID_LOG(debug, "Process members: " << AddrList(current, nCurrent) 
              << AddrList(left, nLeft, "( ", ")"));
     
     if (find(left, left+nLeft, self) != left+nLeft) { 
@@ -258,7 +258,7 @@
         return;
     }
     
-    map.left(left, nLeft);
+    if (map.left(left, nLeft)) updateMemberStats();
     handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
 }
 
@@ -326,7 +326,7 @@
     mcastControl(ClusterShutdownBody(), 0);
 }
 
-void Cluster::updateMemberStats(void) {
+void Cluster::updateMemberStats() {
     if (mgmtObject) {
         mgmtObject->set_clusterSize(size()); 
         std::vector<Url> vectUrl = getUrls();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Fri Sep 26 12:22:00 2008
@@ -39,11 +39,14 @@
     return (members.empty()) ?  MemberId() : members.begin()->first;
 }
 
-void ClusterMap::left(const cpg_address* addrs, size_t size) {
-    size_t (Members::*erase)(const MemberId&) = &Members::erase;
-    std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1));
+bool ClusterMap::left(const cpg_address* addrs, size_t nLeft) {
+    bool changed=false;
+    for (const cpg_address* a = addrs; a < addrs+nLeft; ++a) 
+        changed = members.erase(*a) || changed;
     if (dumper && !isMember(dumper))
         dumper = MemberId();
+    QPID_LOG_IF(debug, changed, *this);
+    return changed;
 }
 
 framing::ClusterUpdateBody ClusterMap::toControl() const {
@@ -54,11 +57,17 @@
     return b;
 }
 
-void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
-    framing:: FieldTable::ValueMap::const_iterator i;
-    for (i = ftMembers.begin(); i != ftMembers.end(); ++i) 
-        members[i->first] = Url(i->second->get<std::string>());
+bool ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
     dumper = MemberId(dumper_);
+    bool changed = false;
+    framing:: FieldTable::ValueMap::const_iterator i;
+    for (i = ftMembers.begin(); i != ftMembers.end(); ++i) {
+        MemberId id(i->first);
+        Url url(i->second->get<std::string>());
+        changed = members.insert(Members::value_type(id, url)).second || changed;
+    }
+    QPID_LOG_IF(debug, changed, *this);
+    return changed;
 }
 
 std::vector<Url> ClusterMap::memberUrls() const {
@@ -68,14 +77,12 @@
     return result;        
 }
 
-std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) {
-    return o << mv.first << "=" << mv.second;
-}
-
 std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
-    std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n    ");
-    o << "dumper=" << m.dumper << ", members:\n    ";
-    std::copy(m.members.begin(), m.members.end(), im);
+    o << "Broker members:";
+    for (ClusterMap::Members::const_iterator i=m.members.begin(); i != m.members.end(); ++i) {
+        o << " " << i->first;
+        if (i->first == m.dumper) o << "(dumping)";
+    }
     return o;
 }
 
@@ -83,11 +90,17 @@
     return dumper==id || (!dumper && first() == id);
 }
 
-void ClusterMap::ready(const MemberId& id, const Url& url) {
-    members[id] = url;
-    if (id == dumper)
+bool ClusterMap::ready(const MemberId& id, const Url& url) {
+    bool changed = members.insert(Members::value_type(id,url)).second;
+    if (id == dumper) {
         dumper = MemberId();
-    QPID_LOG(info, id << " joined cluster: " << *this);
+        QPID_LOG(info, id << " finished dump.");
+    }
+    else {
+        QPID_LOG(info, id << " joined cluster, url=" << url);
+    }
+    QPID_LOG_IF(debug, changed, *this);
+    return changed;
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Fri Sep 26 12:22:00 2008
@@ -50,17 +50,21 @@
     /** First member of the cluster in ID order, gets to perform one-off tasks. */
     MemberId first() const;
 
-    /** Update for members leaving. */
-    void left(const cpg_address* addrs, size_t size);
+    /** Update for members leaving.
+     *@return true if the cluster membership changed.
+     */
+    bool left(const cpg_address* addrs, size_t size);
 
     /** Convert map contents to a cluster update body. */
     framing::ClusterUpdateBody toControl() const;
 
     /** Add a new member or dump complete if id == dumper. */
-    void ready(const MemberId& id, const Url& url);
+    bool ready(const MemberId& id, const Url& url);
 
-    /** Apply update delivered from clsuter. */
-    void update(const framing::FieldTable& members, uint64_t dumper);
+    /** Apply update delivered from cluster.
+     *@return true if cluster membership changed.
+     **/
+    bool update(const framing::FieldTable& members, uint64_t dumper);
 
     bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
 
@@ -72,7 +76,6 @@
   private:
 
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
-  friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv);
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Sep 26 12:22:00 2008
@@ -106,23 +106,26 @@
         QPID_LOG(debug, "Connection closed " << *this);
 
         if (catchUp) {
-            catchUp = false;
             cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
-            if (!isShadow()) connection.closed();
+            if (isShadow()) 
+                catchUp = false;
+            else
+                connection.closed();
         }
-
-        // Local network connection has closed.  We need to keep the
-        // connection around but replace the output handler with a
-        // no-op handler as the network output handler will be
-        // deleted.
-        output.setOutputHandler(discardHandler);
-
-        if (isLocal()) {
-            // This was a local replicated connection. Multicast a deliver
-            // closed and process any outstanding frames from the cluster
-            // until self-delivery of deliver-close.
-            cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
-            ++mcastSeq;
+        else {
+            // Local network connection has closed.  We need to keep the
+            // connection around but replace the output handler with a
+            // no-op handler as the network output handler will be
+            // deleted.
+            output.setOutputHandler(discardHandler);
+
+            if (isLocal()) {
+                // This was a local replicated connection. Multicast a deliver
+                // closed and process any outstanding frames from the cluster
+                // until self-delivery of deliver-close.
+                cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
+                ++mcastSeq;
+            }
         }
     }
     catch (const std::exception& e) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Fri Sep 26 12:22:00 2008
@@ -55,10 +55,8 @@
 
 void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
     Mutex::ScopedLock l(cluster.lock);
-    cluster.map.update(members, dumper);
-    QPID_LOG(debug, "Cluster update: " << cluster.map);
+    if (cluster.map.update(members, dumper)) cluster.updateMemberStats();
     checkDumpRequest();
-    cluster.updateMemberStats();
 }
 
 void JoiningHandler::checkDumpRequest() {
@@ -99,9 +97,10 @@
     }
 }
 
-void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+void JoiningHandler::ready(const MemberId& id, const std::string& urlStr) {
     Mutex::ScopedLock l(cluster.lock);
-    cluster.map.ready(id, Url(url));
+    if (cluster.map.ready(id, Url(urlStr)))
+        cluster.updateMemberStats();
     checkDumpRequest();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=699456&r1=699455&r2=699456&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Fri Sep 26 12:22:00 2008
@@ -52,10 +52,8 @@
     cluster.connectionEventQueue.push(e);
 }
 
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {
-    Mutex::ScopedLock l(cluster.lock);
-    cluster.updateMemberStats();
-}
+// Updates are for new joiners.
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
 
 void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
     Mutex::ScopedLock l(cluster.lock);
@@ -75,8 +73,9 @@
                             boost::bind(&MemberHandler::dumpError, this, _1)));
 }
 
-void MemberHandler::ready(const MemberId& id, const std::string& url) {
-    cluster.map.ready(id, Url(url));
+void MemberHandler::ready(const MemberId& id, const std::string& urlStr) {
+    if (cluster.map.ready(id, Url(urlStr)))
+        cluster.updateMemberStats();
 }