You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/03/30 22:57:13 UTC

svn commit: r929274 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/Cluster.cpp src/qpid/cluster/Cluster.h src/qpid/cluster/ErrorCheck.cpp src/qpid/cluster/StoreStatus.h xml/cluster.xml

Author: aconway
Date: Tue Mar 30 20:57:12 2010
New Revision: 929274

URL: http://svn.apache.org/viewvc?rev=929274&view=rev
Log:
Cluster logging improvements: log config changes in the deliver thread.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Mar 30 20:57:12 2010
@@ -217,8 +217,11 @@ struct ClusterDispatcher : public framin
     void ready(const std::string& url) {
         cluster.ready(member, url, l);
     }
-    void configChange(const std::string& current) {
-        cluster.configChange(member, current, l);
+    void configChange(const std::string& members,
+                      const std::string& left,
+                      const std::string& joined)
+    {
+        cluster.configChange(member, members, left, joined, l);
     }
     void updateOffer(uint64_t updatee) {
         cluster.updateOffer(member, updatee, l);
@@ -554,40 +557,28 @@ Cluster::ConnectionVector Cluster::getCo
                    boost::bind(&ConnectionMap::value_type::second, _1));
     return result;
 }
-  
-struct AddrList {
-    const cpg_address* addrs;
-    int count;
-    const char *prefix;
-    AddrList(const cpg_address* a, int n, const char* p="")
-        : addrs(a), count(n), prefix(p) {}
-};
-
-ostream& operator<<(ostream& o, const AddrList& a) {
-    if (!a.count) return o;
-    o << a.prefix;
-    for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p)
-        o << qpid::cluster::MemberId(*p) << " ";
-    return o;
-}
 
+// CPG config-change callback.
 void Cluster::configChange ( 
     cpg_handle_t /*handle*/,
     const cpg_name */*group*/,
-    const cpg_address *current, int nCurrent,
+    const cpg_address *members, int nMembers,
     const cpg_address *left, int nLeft,
     const cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(notice, *this << " membership change: "
-             << AddrList(current, nCurrent) << "("
-             << AddrList(joined, nJoined, "joined: ")
-             << AddrList(left, nLeft, "left: ")
-             << ")");
-    string addresses;
-    for (const cpg_address* p = current; p < current+nCurrent; ++p) 
-        addresses.append(MemberId(*p).str());
-    deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+    string membersStr, leftStr, joinedStr;
+    // Encode members and enqueue as an event so the config change can
+    // be executed in the correct thread.
+    for (const cpg_address* p = members; p < members+nMembers; ++p)
+        membersStr.append(MemberId(*p).str());
+    for (const cpg_address* p = left; p < left+nLeft; ++p)
+        leftStr.append(MemberId(*p).str());
+    for (const cpg_address* p = joined; p < joined+nJoined; ++p)
+        joinedStr.append(MemberId(*p).str());
+    deliverEvent(Event::control(ClusterConfigChangeBody(
+                                    ProtocolVersion(), membersStr, leftStr, joinedStr),
+                                self));
 }
 
 void Cluster::setReady(Lock&) {
@@ -654,22 +645,33 @@ void Cluster::initMapCompleted(Lock& l) 
     }
 }
 
-void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
+void Cluster::configChange(const MemberId&,
+                           const std::string& membersStr,
+                           const std::string& leftStr,
+                           const std::string& joinedStr,
+                           Lock& l)
+{
     if (state == LEFT) return;
+    MemberSet members = decodeMemberSet(membersStr);
+    MemberSet left = decodeMemberSet(leftStr);
+    MemberSet joined = decodeMemberSet(joinedStr);
+    QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
+             << members);
+    QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
+    QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);
 
-    MemberSet config = decodeMemberSet(configStr);
-    elders = intersection(elders, config);
+    // Update initital status for members joining or leaving.
+    elders = intersection(elders, members);
     if (elders.empty() && INIT < state && state < CATCHUP) {
         QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
         leave(l);
         return;
     }
-    bool memberChange = map.configChange(config);
-    QPID_LOG(debug, "Config sequence " << map.getConfigSeq());
+    bool memberChange = map.configChange(members);
     store.setConfigSeq(map.getConfigSeq());
 
     // Update initital status for members joining or leaving.
-    initMap.configChange(config);
+    initMap.configChange(members);
     if (initMap.isResendNeeded()) {
         mcast.mcastControl(
             ClusterInitialStatusBody(
@@ -965,8 +967,11 @@ void Cluster::memberUpdate(Lock& l) {
 
     if (store.hasStore()) {
         // Mark store clean if I am the only broker, dirty otherwise.
-        if (size == 1) store.clean(Uuid(true));
-        else store.dirty(clusterId);
+        if (size == 1 ) {
+            if (!store.isClean()) store.clean(Uuid(true));
+        } else {
+            if (!store.isDirty()) store.dirty(clusterId);
+        }
     }
 
     if (size == 1 && lastSize > 1 && state >= CATCHUP) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Mar 30 20:57:12 2010
@@ -163,7 +163,11 @@ class Cluster : private Cpg::Handler, pu
                        const std::string& firstConfig,
                        Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
-    void configChange(const MemberId&, const std::string& current, Lock& l);
+    void configChange(const MemberId&,
+                      const std::string& members,
+                      const std::string& left,
+                      const std::string& joined,
+                      Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
     void timerWakeup(const MemberId&, const std::string& name, Lock&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Tue Mar 30 20:57:12 2010
@@ -110,7 +110,7 @@ ErrorCheck::FrameQueue::iterator ErrorCh
         const ClusterConfigChangeBody* configChange =
             static_cast<const ClusterConfigChangeBody*>(method);
         if (configChange) {
-            MemberSet members(decodeMemberSet(configChange->getCurrent()));
+            MemberSet members(decodeMemberSet(configChange->getMembers()));
             QPID_LOG(debug, cluster << " apply config change to error "
                      << frameSeq << ": " << members);
             MemberSet intersect;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Tue Mar 30 20:57:12 2010
@@ -42,6 +42,9 @@ class StoreStatus
     StoreStatus(const std::string& dir);
 
     framing::cluster::StoreState getState() const { return state; }
+    bool isClean() { return state == framing::cluster::STORE_STATE_CLEAN_STORE; }
+    bool isDirty() { return state == framing::cluster::STORE_STATE_DIRTY_STORE; }
+
     const Uuid& getClusterId() const { return clusterId; }
     const Uuid& getShutdownId() const { return shutdownId; }
     framing::SequenceNumber getConfigSeq() const { return configSeq; }

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=929274&r1=929273&r2=929274&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Mar 30 20:57:12 2010
@@ -73,7 +73,9 @@
     </control>
 
     <control name="config-change" code="0x11" label="Raw cluster membership.">
-      <field name="current" type="vbin16"/> <!-- packed member-id array -->
+      <field name="members" type="vbin16"/> <!-- packed member-id array -->
+      <field name="joined" type="vbin16"/> <!-- packed member-id array -->
+      <field name="left" type="vbin16"/> <!-- packed member-id array -->
     </control>
 
     <control name="message-expired" code="0x12">



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org