You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/11/17 19:09:22 UTC

svn commit: r881423 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Author: aconway
Date: Tue Nov 17 18:09:21 2009
New Revision: 881423

URL: http://svn.apache.org/viewvc?rev=881423&view=rev
Log:
Integrated InitialStatusMap into cluster code.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h
      - copied, changed from r881420, qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
    qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Nov 17 18:09:21 2009
@@ -84,6 +84,8 @@
   qpid/cluster/Quorum.h				\
   qpid/cluster/InitialStatusMap.h		\
   qpid/cluster/InitialStatusMap.cpp		\
+  qpid/cluster/MemberSet.h			\
+  qpid/cluster/MemberSet.cpp			\
   qpid/cluster/types.h
 
 cluster_la_LIBADD=  -lcpg $(libcman) libqpidbroker.la libqpidclient.la

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Nov 17 18:09:21 2009
@@ -162,13 +162,14 @@
     ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
 
     void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
-    void initialStatus(bool active, bool persistent, const framing::FieldTable& props) {
-        cluster.initialStatus(member, active, persistent, props);
+    void initialStatus(bool active, bool persistent, const Uuid& clusterId,
+                       uint32_t version, const std::string& url) {
+        cluster.initialStatus(member, active, persistent, clusterId, version, url, l);
     }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& current) { cluster.configChange(member, current, l); }
-    void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) {
-        cluster.updateOffer(member, updatee, id, version, l);
+    void updateOffer(uint64_t updatee) {
+        cluster.updateOffer(member, updatee, l);
     }
     void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
     void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
@@ -190,6 +191,7 @@
     name(settings.name),
     myUrl(settings.url.empty() ? Url() : Url(settings.url)),
     self(cpg.self()),
+    clusterId(true),
     expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -206,6 +208,7 @@
     decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
     discarding(true),
     state(INIT),
+    initMap(self),
     lastSize(0),
     lastBroker(false),
     updateRetracted(false),
@@ -265,8 +268,8 @@
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
     QPID_LOG(info, *this << " new shadow connection " << c->getId());
-    // Safe to use connections here because we're pre-catchup, either
-    // discarding or stalled, so deliveredFrame is not processing any
+    // Safe to use connections here because we're pre-catchup, stalled
+    // and discarding, so deliveredFrame is not processing any
     // connection events.
     assert(discarding);         
     pair<ConnectionMap::iterator, bool> ib
@@ -522,7 +525,8 @@
     const cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
-    if (state == INIT) {        // First config change.
+    if (state == INIT) {
+        // FIXME aconway 2009-11-16: persistent restart
         // Recover only if we are first in cluster.
         broker.setRecovery(nCurrent == 1);
         initialized = true;
@@ -545,39 +549,55 @@
     broker.getQueueEvents().enable();
 }
 
-void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
-    bool memberChange = map.configChange(current);
-    QPID_LOG(debug, *this << " applied config change: " << map);
+void Cluster::initMapCompleted(Lock& l) {
+    if (state == INIT) {
+        elders = initMap.getElders();
+        if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+            broker.getLinks().setPassive(true);
+            broker.getQueueEvents().disable();
+        }
+        setClusterId(initMap.getClusterId(), l);
+        if (initMap.isUpdateNeeded())  { // Joining established cluster.
+            state = JOINER;
+            mcast.mcastControl(
+                ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+        }
+        else {                  // I can go ready.
+            QPID_LOG(notice, *this << " ready.");
+            discarding = false;
+            setReady(l);
+            map = ClusterMap(initMap.getMemberUrls());
+            memberUpdate(l);
+        }
+    }
+}
+
+void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
     if (state == LEFT) return;
-    
-    if (!map.isAlive(self)) {  // Final config change.
+
+    MemberSet config = decodeMemberSet(configStr);
+    elders = intersection(elders, config);
+    if (elders.empty() && INIT < state && state < CATCHUP) {
+        QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
         leave(l);
         return;
     }
+    bool memberChange = map.configChange(config);
 
-    if (state == INIT) {        // First configChange
-        if (map.aliveCount() == 1) {
-            setClusterId(true, l);
-            discarding = false;
-            setReady(l);
-            map = ClusterMap(self, myUrl, true);
-            memberUpdate(l);
-            QPID_LOG(notice, *this << " first in cluster");
-        }
-        else {                  // Joining established group.
-            state = JOINER;
-            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
-            elders = map.getAlive();
-            elders.erase(self);
-            broker.getLinks().setPassive(true);
-            broker.getQueueEvents().disable();
-        }
-    } 
-    else if (state >= CATCHUP && memberChange) {
+    // Update initial status for new members joining.
+    initMap.configChange(config);
+    if (initMap.isResendNeeded()) {
+        mcast.mcastControl(
+            // FIXME aconway 2009-11-17: persistent restart, set persistence bit.
+            ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId,
+                                     CLUSTER_VERSION, myUrl.str()), self);
+    }
+    if (initMap.transitionToComplete()) initMapCompleted(l);
+
+    if (state >= CATCHUP && memberChange) {
         memberUpdate(l);
-        elders = ClusterMap::intersection(elders, map.getAlive());
         if (elders.empty()) {
-            //assume we are oldest, reactive links if necessary
+            // We are the oldest, reactivate links if necessary
             broker.getLinks().setPassive(false);
         }
     }
@@ -587,8 +607,7 @@
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
         QPID_LOG(info, *this << " send update-offer to " << id);
-        mcast.mcastControl(
-            ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self);
+        mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self);
     }
 }
 
@@ -610,10 +629,23 @@
     makeOffer(id, l);
 }
 
-void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/,
-                     const framing::FieldTable&) {
-    // FIXME aconway 2009-11-12: fill in.
+void Cluster::initialStatus(const MemberId& member, bool active, bool persistent,
+                            const framing::Uuid& id, uint32_t version,
+                            const std::string& url, Lock& l)
+{
+    if (version != CLUSTER_VERSION) {
+        QPID_LOG(critical, *this << " incompatible cluster versions " <<
+                 version << " != " << CLUSTER_VERSION);
+        leave(l);
+        return;
+    }
+    initMap.received(
+        member,
+        ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url)
+    );
+    if (initMap.transitionToComplete()) initMapCompleted(l);
 }
+
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
     if (map.ready(id, Url(url))) 
         memberUpdate(l);
@@ -623,17 +655,10 @@
     }
 }
 
-void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid,
-                          uint32_t version, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
     // NOTE: deliverEventQueue has been stopped at the update offer by
     // deliveredEvent in case an update is required.
     if (state == LEFT) return;
-    if (version != CLUSTER_VERSION) {
-        QPID_LOG(critical, *this << " incompatible cluster versions " <<
-                 version << " != " << CLUSTER_VERSION);
-        leave(l);
-        return;
-    }
     MemberId updatee(updateeInt);
     boost::optional<Url> url = map.updateOffer(updater, updatee);
     if (updater == self) {
@@ -649,7 +674,6 @@
     }
     else if (updatee == self && url) {
         assert(state == JOINER);
-        setClusterId(uuid, l);
         state = UPDATEE;
         QPID_LOG(notice, *this << " receiving update from " << updater);
         checkUpdateIn(l);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Nov 17 18:09:21 2009
@@ -19,6 +19,7 @@
  *
  */
 
+#include "InitialStatusMap.h"
 #include "ClusterMap.h"
 #include "ClusterSettings.h"
 #include "Cpg.h"
@@ -144,10 +145,11 @@
 
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
-    void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&,
-                     uint32_t version, Lock&);
+    void updateOffer(const MemberId& updater, uint64_t updatee, Lock&);
     void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
-    void initialStatus(const MemberId&, bool active, bool persistent, const framing::FieldTable& props);
+    void initialStatus(const MemberId&, bool active, bool persistent,
+                       const framing::Uuid& id, uint32_t version,
+                       const std::string& url, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
@@ -165,6 +167,10 @@
     void setClusterId(const framing::Uuid&, Lock&);
     void erase(const ConnectionId&, Lock&);       
 
+    void initMapCompleted(Lock&);
+
+
+
     // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
@@ -241,7 +247,7 @@
 
     //    Local cluster state, cluster map
     enum {
-        INIT,    ///< Initial state, no CPG messages received.
+        INIT,    ///< Establishing initial cluster status.
         JOINER,  ///< Sent update request, waiting for update offer.
         UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
         CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
@@ -252,8 +258,9 @@
     } state;
 
     ConnectionMap connections;
+    InitialStatusMap initMap;
     ClusterMap map;
-    ClusterMap::Set elders;
+    MemberSet elders;
     size_t lastSize;
     bool lastBroker;
     sys::Thread updateThread;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Nov 17 18:09:21 2009
@@ -28,24 +28,20 @@
 #include <iterator>
 #include <ostream>
 
+using namespace std;
+using namespace boost;
+
 namespace qpid {
 using namespace framing;
 
 namespace cluster {
 
-ClusterMap::Set ClusterMap::decode(const std::string& s) {
-    Set set;
-    for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)  
-        set.insert(MemberId(std::string(i, i+8)));
-    return set;
-}
-
 namespace {
 
 void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
     MemberId id(vt.first);
     set.insert(id);
-    std::string url = vt.second->get<std::string>();
+    string url = vt.second->get<string>();
     if (!url.empty())
         map.insert(ClusterMap::Map::value_type(id, Url(url)));
 }
@@ -56,37 +52,34 @@
 
 void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
     ft.clear();
-    std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(ft), _1));
+    for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1));
 }
 
 }
 
 ClusterMap::ClusterMap() : frameSeq(0) {}
 
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
-    alive.insert(id);
-    if (isMember)
-        members[id] = url;
-    else
-        joiners[id] = url;
+ClusterMap::ClusterMap(const Map& map) : frameSeq(0) {
+    transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1));
+    members = map;
 }
 
 ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
   : frameSeq(frameSeq_)
 {
-    std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
-    std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
+    for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive)));
+    for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive)));
 }
 
 void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const {
     b.getJoiners().clear();
-    std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1));
+    for_each(joiners.begin(), joiners.end(), bind(&insertFieldTableFromMapValue, ref(b.getJoiners()), _1));
     for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
         if (!isMember(*i) && !isJoiner(*i))
-            b.getJoiners().setString(i->str(), std::string());
+            b.getJoiners().setString(i->str(), string());
     }
     b.getMembers().clear();
-    std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
+    for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1));
     b.setFrameSeq(frameSeq);
 }
 
@@ -99,21 +92,21 @@
     return joiners.empty() ? MemberId() : joiners.begin()->first;
 }
 
-std::vector<string> ClusterMap::memberIds() const {
-    std::vector<string> ids;
+vector<string> ClusterMap::memberIds() const {
+    vector<string> ids;
     for (Map::const_iterator iter = members.begin();
          iter != members.end(); iter++) {
-        std::stringstream stream;
+        stringstream stream;
         stream << iter->first;
         ids.push_back(stream.str());
     }
     return ids;
 }
 
-std::vector<Url> ClusterMap::memberUrls() const {
-    std::vector<Url> urls(members.size());
-    std::transform(members.begin(), members.end(), urls.begin(),
-                   boost::bind(&Map::value_type::second, _1));
+vector<Url> ClusterMap::memberUrls() const {
+    vector<Url> urls(members.size());
+    transform(members.begin(), members.end(), urls.begin(),
+                   bind(&Map::value_type::second, _1));
     return urls;
 }
 
@@ -121,18 +114,18 @@
 
 ClusterMap::Set ClusterMap::getMembers() const {
     Set s;
-    std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
-                   boost::bind(&Map::value_type::first, _1));
+    transform(members.begin(), members.end(), inserter(s, s.begin()),
+                   bind(&Map::value_type::first, _1));
     return s;
 }
 
-std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
-    std::ostream_iterator<MemberId> oi(o);
-    std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1));
+ostream& operator<<(ostream& o, const ClusterMap::Map& m) {
+    ostream_iterator<MemberId> oi(o);
+    transform(m.begin(), m.end(), oi, bind(&ClusterMap::Map::value_type::first, _1));
     return o;
 }
 
-std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
+ostream& operator<<(ostream& o, const ClusterMap& m) {
     for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) {
         o << *i;
         if (m.isMember(*i)) o << "(member)";
@@ -143,7 +136,7 @@
     return o;
 }
 
-bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) {
+bool ClusterMap::updateRequest(const MemberId& id, const string& url) {
     if (isAlive(id)) {
         joiners[id] = Url(url);
         return true;
@@ -155,13 +148,12 @@
     return isAlive(id) &&  members.insert(Map::value_type(id,url)).second;
 }
 
-bool ClusterMap::configChange(const std::string& addresses) {
+bool ClusterMap::configChange(const Set& update) {
     bool memberChange = false;
-    Set update = decode(addresses);
     Set removed;
-    std::set_difference(alive.begin(), alive.end(),
+    set_difference(alive.begin(), alive.end(),
                         update.begin(), update.end(),
-                        std::inserter(removed, removed.begin()));
+                        inserter(removed, removed.begin()));
     alive = update;
     for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) {
         memberChange = memberChange || members.erase(*i);
@@ -170,23 +162,14 @@
     return memberChange;
 }
 
-boost::optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) {
+optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) {
     Map::iterator i = joiners.find(to);
     if (isAlive(from) && i != joiners.end()) {
         Url url= i->second;
         joiners.erase(i);       // No longer a potential updatee.
         return url;
     }
-    return boost::optional<Url>();
+    return optional<Url>();
 }
 
-ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b)
-{
-    Set intersection;
-    std::set_intersection(a.begin(), a.end(),
-                          b.begin(), b.end(),
-                          std::inserter(intersection, intersection.begin()));
-    return intersection;
-
-}
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Tue Nov 17 18:09:21 2009
@@ -22,7 +22,7 @@
  *
  */
 
-#include "qpid/cluster/types.h"
+#include "MemberSet.h"
 #include "qpid/Url.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
 #include "qpid/framing/SequenceNumber.h"
@@ -47,16 +47,14 @@
     typedef std::map<MemberId, Url> Map;
     typedef std::set<MemberId> Set;
 
-    static Set decode(const std::string&);
-        
     ClusterMap();
-    ClusterMap(const MemberId& id, const Url& url, bool isReady);
+    ClusterMap(const Map& map);
     ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
 
     /** Update from config change.
      *@return true if member set changed.
      */
-    bool configChange(const std::string& addresses);
+    bool configChange(const Set& members);
 
     bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
     bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
@@ -85,11 +83,6 @@
     /**@return true If this is a new member */ 
     bool ready(const MemberId& id, const Url&);
 
-    /**
-     * Utility method to return intersection of two member sets
-     */
-    static Set intersection(const Set& a, const Set& b);
-
     framing::SequenceNumber getFrameSeq() { return frameSeq; }
     framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Nov 17 18:09:21 2009
@@ -105,7 +105,7 @@
 
     QPID_LOG(notice, "Initializing CPG");
     cpg_error_t err = cpg_initialize(&handle, &callbacks);
-    int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection.
+    int retries = 6; // FIXME aconway 2009-08-06: make this configurable.
     while (err == CPG_ERR_TRY_AGAIN && --retries) {
         QPID_LOG(notice, "Re-trying CPG initialization.");
         sys::sleep(5);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Tue Nov 17 18:09:21 2009
@@ -39,11 +39,6 @@
     : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
 {}
 
-ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
-    copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
-    return o;
-}
-
 void ErrorCheck::error(
     Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms,
     const std::string& msg)
@@ -115,7 +110,7 @@
         const ClusterConfigChangeBody* configChange =
             static_cast<const ClusterConfigChangeBody*>(method);
         if (configChange) {
-            MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+            MemberSet members(decodeMemberSet(configChange->getCurrent()));
             QPID_LOG(debug, cluster << " apply config change to error "
                      << frameSeq << ": " << members);
             MemberSet intersect;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Tue Nov 17 18:09:21 2009
@@ -22,7 +22,7 @@
  *
  */
 
-#include "qpid/cluster/types.h"
+#include "qpid/cluster/MemberSet.h"
 #include "qpid/cluster/Multicaster.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/SequenceNumber.h"
@@ -34,7 +34,6 @@
 namespace cluster {
 
 class EventFrame;
-class ClusterMap;
 class Cluster;
 class Multicaster;
 class Connection;
@@ -48,7 +47,6 @@
 class ErrorCheck
 {
   public:
-    typedef std::set<MemberId> MemberSet;
     typedef framing::cluster::ErrorType ErrorType;
     typedef framing::SequenceNumber SequenceNumber;
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Tue Nov 17 18:09:21 2009
@@ -29,13 +29,12 @@
 namespace cluster {
 
 InitialStatusMap::InitialStatusMap(const MemberId& self_)
-    : self(self_), complete(), updateNeeded(), resendNeeded()
-{
-    map[self] = optional<Status>();
-}
+    : self(self_), completed(), resendNeeded()
+{}
 
 void InitialStatusMap::configChange(const MemberSet& members) {
     resendNeeded = false;
+    bool wasComplete = isComplete();
     if (firstConfig.empty()) firstConfig = members;
     MemberSet::const_iterator i = members.begin();
     Map::iterator j = map.begin();
@@ -66,10 +65,13 @@
         for (Map::iterator i = map.begin(); i != map.end(); ++i)
             i->second = optional<Status>();
     }
+    completed = isComplete() && !wasComplete; // Set completed on the transition.
 }
 
 void InitialStatusMap::received(const MemberId& m, const Status& s){
+    bool wasComplete = isComplete();
     map[m] = s;
+    completed = isComplete() && !wasComplete; // Set completed on the transition.
 }
 
 bool InitialStatusMap::notInitialized(const Map::value_type& v) {
@@ -81,7 +83,11 @@
 }
 
 bool InitialStatusMap::isComplete() {
-    return find_if(map.begin(), map.end(), &notInitialized) == map.end();
+    return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
+}
+
+bool InitialStatusMap::transitionToComplete() {
+    return completed;
 }
 
 bool InitialStatusMap::isResendNeeded() {
@@ -107,4 +113,26 @@
     return elders;
 }
 
+// Get cluster ID from an active member or the youngest newcomer.
+framing::Uuid InitialStatusMap::getClusterId() {
+    assert(isComplete());
+    assert(!map.empty());
+    Map::iterator i = find_if(map.begin(), map.end(), &isActive);
+    if (i != map.end())
+        return i->second->getClusterId(); // An active member
+    else
+        return map.begin()->second->getClusterId();
+}
+
+std::map<MemberId, Url> InitialStatusMap::getMemberUrls() {
+    assert(isComplete());
+    assert(!isUpdateNeeded());
+    std::map<MemberId, Url> urlMap;
+    for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+        assert(i->second);
+        urlMap.insert(std::make_pair(i->first, i->second->getUrl()));
+    }
+    return urlMap;
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Tue Nov 17 18:09:21 2009
@@ -22,7 +22,7 @@
  *
  */
 
-#include "types.h"
+#include "MemberSet.h"
 #include <qpid/framing/ClusterInitialStatusBody.h>
 #include <boost/optional.hpp>
 
@@ -48,10 +48,19 @@
 
     /**@return true if the map is complete. */
     bool isComplete();
-    /**@pre isComplete. @return this node's elders */
+    /**@return true if the map was completed by the last config change or received. */
+    bool transitionToComplete();
+    /**@pre isComplete(). @return this node's elders */
     MemberSet getElders();
-    /**@pre isComplete. @return True if we need an update. */
+    /**@pre isComplete(). @return True if we need an update. */
     bool isUpdateNeeded();
+    /**@pre isComplete(). @return Cluster-wide cluster ID. */
+    framing::Uuid getClusterId();
+
+    /**@pre isComplete() && !isUpdateNeeded().
+     *@return member->URL map for all members.
+     */
+    std::map<MemberId, Url> getMemberUrls();
 
   private:
     typedef std::map<MemberId, boost::optional<Status> > Map;
@@ -61,7 +70,7 @@
     Map map;
     MemberSet firstConfig;
     MemberId self;
-    bool complete, updateNeeded, resendNeeded;
+    bool completed, resendNeeded;
 };
 }} // namespace qpid::cluster
 

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp?rev=881423&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.cpp Tue Nov 17 18:09:21 2009
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MemberSet.h"
+#include <algorithm>
+#include <cassert>
+#include <iterator>
+#include <ostream>
+#include <string>
+
+namespace qpid {
+namespace cluster {
+
+namespace {
+// Size in bytes of one encoded MemberId inside a member-set string.
+const std::string::size_type ENCODED_ID_SIZE = 8;
+}
+
+/**
+ * Decode a string of concatenated 8-byte member IDs into a MemberSet.
+ *
+ * @param s encoded member IDs laid end to end; the length is expected
+ *          to be a multiple of 8.
+ * @return the set of decoded IDs.
+ *
+ * A length that is not a multiple of 8 fails an assertion in debug
+ * builds. When assertions are compiled out the incomplete trailing
+ * bytes are ignored rather than read past the end of the string.
+ */
+MemberSet decodeMemberSet(const std::string& s) {
+    assert(s.size() % ENCODED_ID_SIZE == 0);
+    MemberSet set;
+    // Walk by offset, not iterator: stepping an iterator beyond end() is
+    // undefined behaviour even when it is never dereferenced.
+    for (std::string::size_type pos = 0; pos + ENCODED_ID_SIZE <= s.size();
+         pos += ENCODED_ID_SIZE)
+        set.insert(MemberId(s.substr(pos, ENCODED_ID_SIZE)));
+    return set;
+}
+
+/** @return the members that are present in both a and b. */
+MemberSet intersection(const MemberSet& a, const MemberSet& b)
+{
+    MemberSet common;
+    std::set_intersection(a.begin(), a.end(),
+                          b.begin(), b.end(),
+                          std::inserter(common, common.begin()));
+    return common;
+}
+
+/** Write every member ID in ms to o, each followed by one space. */
+std::ostream& operator<<(std::ostream& o, const MemberSet& ms) {
+    std::copy(ms.begin(), ms.end(), std::ostream_iterator<MemberId>(o, " "));
+    return o;
+}
+
+}} // namespace qpid::cluster

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h (from r881420, qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h&p1=qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp&r1=881420&r2=881423&rev=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MemberSet.h Tue Nov 17 18:09:21 2009
@@ -1,3 +1,6 @@
+#ifndef QPID_CLUSTER_MEMBERSET_H
+#define QPID_CLUSTER_MEMBERSET_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -7,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,27 +21,23 @@
  * under the License.
  *
  */
-#include "qpid/framing/AMQContentBody.h"
-#include <iostream>
 
-qpid::framing::AMQContentBody::AMQContentBody(){
-}
+#include "types.h"
+#include <set>
+#include <iosfwd>
+
+namespace qpid {
+namespace cluster {
+
+typedef std::set<MemberId> MemberSet;
+
+MemberSet decodeMemberSet(const std::string&);
+
+MemberSet intersection(const MemberSet& a, const MemberSet& b);
+
+std::ostream& operator<<(std::ostream& o, const MemberSet& ms);
+
 
-qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){
-}
+}} // namespace qpid::cluster
 
-uint32_t qpid::framing::AMQContentBody::encodedSize() const{
-    return data.size();
-}
-void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{
-    buffer.putRawData(data);
-}
-void qpid::framing::AMQContentBody::decode(Buffer& buffer, uint32_t _size){
-    buffer.getRawData(data, _size);
-}
-
-void qpid::framing::AMQContentBody::print(std::ostream& out) const
-{
-    out << "content (" << encodedSize() << " bytes)";
-    out << " " << data.substr(0,16) << "...";
-}
+#endif  /*!QPID_CLUSTER_MEMBERSET_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Nov 17 18:09:21 2009
@@ -29,8 +29,6 @@
 #include <utility>
 #include <iosfwd>
 #include <string>
-#include <set>
-
 
 extern "C" {
 #if defined (HAVE_OPENAIS_CPG_H)
@@ -76,8 +74,6 @@
     uint64_t getNumber() const { return second; }
 };
 
-typedef std::set<MemberId> MemberSet;
-
 std::ostream& operator<<(std::ostream&, const ConnectionId&);
 
 std::ostream& operator<<(std::ostream&, EventType);

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Tue Nov 17 18:09:21 2009
@@ -40,5 +40,7 @@
 void qpid::framing::AMQContentBody::print(std::ostream& out) const
 {
     out << "content (" << encodedSize() << " bytes)";
-    out << " " << data.substr(0,16) << "...";
+    const size_t max = 32;
+    out << " " << data.substr(0, max);
+    if (data.size() > max) out << "...";
 }

Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Nov 17 18:09:21 2009
@@ -20,6 +20,7 @@
 #include "unit_test.h"
 #include "test_tools.h"
 #include "qpid/cluster/InitialStatusMap.h"
+#include "qpid/framing/Uuid.h"
 #include <boost/assign.hpp>
 
 using namespace std;
@@ -34,58 +35,72 @@
 
 typedef InitialStatusMap::Status Status;
 
-Status activeStatus() { return Status(ProtocolVersion(), true, false, FieldTable()); }
-Status newcomerStatus() { return Status(ProtocolVersion(), false, false, FieldTable()); }
+Status activeStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), true, false, id, 0, ""); }
+Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), false, false, id, 0, ""); }
 
 QPID_AUTO_TEST_CASE(testFirstInCluster) {
     // Single member is first in cluster.
     InitialStatusMap map(MemberId(0));
+    Uuid id(true);
     BOOST_CHECK(!map.isComplete());
     MemberSet members = list_of(MemberId(0));
     map.configChange(members);
     BOOST_CHECK(!map.isComplete());
-    map.received(MemberId(0), newcomerStatus());
+    map.received(MemberId(0), newcomerStatus(id));
     BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(map.transitionToComplete());
     BOOST_CHECK(map.getElders().empty());
     BOOST_CHECK(!map.isUpdateNeeded());
+    BOOST_CHECK_EQUAL(id, map.getClusterId());
 }
 
 QPID_AUTO_TEST_CASE(testJoinExistingCluster) {
     // Single member 0 joins existing cluster 1,2
     InitialStatusMap map(MemberId(0));
+    Uuid id(true);
     MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
     map.configChange(members);
     BOOST_CHECK(map.isResendNeeded());
     BOOST_CHECK(!map.isComplete());
     map.received(MemberId(0), newcomerStatus());
-    map.received(MemberId(1), activeStatus());
+    map.received(MemberId(1), activeStatus(id));
     BOOST_CHECK(!map.isComplete());
-    map.received(MemberId(2), activeStatus());
+    map.received(MemberId(2), activeStatus(id));
     BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(map.transitionToComplete());
     BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1)(2));
     BOOST_CHECK(map.isUpdateNeeded());
+    BOOST_CHECK_EQUAL(map.getClusterId(), id);
+
+    // Check that transitionToComplete is reset.
+    map.configChange(list_of<MemberId>(0)(1));
+    BOOST_CHECK(!map.transitionToComplete());
 }
 
 QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
     // Multiple members 0,1,2 join at same time.
     InitialStatusMap map(MemberId(1)); // self is 1
+    Uuid id(true);
     MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
     map.configChange(members);
     BOOST_CHECK(map.isResendNeeded());
 
     // All new members
-    map.received(MemberId(0), newcomerStatus());
+    map.received(MemberId(0), newcomerStatus(id));
     map.received(MemberId(1), newcomerStatus());
     map.received(MemberId(2), newcomerStatus());
     BOOST_CHECK(!map.isResendNeeded());
     BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(map.transitionToComplete());
     BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(2)));
     BOOST_CHECK(!map.isUpdateNeeded());
+    BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
 QPID_AUTO_TEST_CASE(testMultipleJoinExisting) {
     // Multiple members 1,2,3 join existing cluster containing 0.
     InitialStatusMap map(MemberId(2)); // self is 2
+    Uuid id(true);
     MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3));
     map.configChange(members);
     BOOST_CHECK(map.isResendNeeded());
@@ -93,28 +108,34 @@
     map.received(MemberId(1), newcomerStatus());
     map.received(MemberId(2), newcomerStatus());
     map.received(MemberId(3), newcomerStatus());
-    map.received(MemberId(0), activeStatus());
+    map.received(MemberId(0), activeStatus(id));
     BOOST_CHECK(!map.isResendNeeded());
     BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(map.transitionToComplete());
     BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(0))(MemberId(3)));
     BOOST_CHECK(map.isUpdateNeeded());
+    BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
 QPID_AUTO_TEST_CASE(testMembersLeave) {
     // Test that map completes if members leave rather than send status.
     InitialStatusMap map(MemberId(0));
+    Uuid id(true);
     map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2)));
     map.received(MemberId(0), newcomerStatus());
-    map.received(MemberId(1), activeStatus());
+    map.received(MemberId(1), activeStatus(id));
     BOOST_CHECK(!map.isComplete());
     map.configChange(list_of(MemberId(0))(MemberId(1))); // 2 left
     BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(map.transitionToComplete());
     BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(1)));
+    BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
 QPID_AUTO_TEST_CASE(testInteveningConfig) {
     // Multiple config changes arrives before we complete the map.
     InitialStatusMap map(MemberId(0));
+    Uuid id(true);
 
     map.configChange(list_of<MemberId>(0)(1));
     BOOST_CHECK(map.isResendNeeded());
@@ -125,7 +146,7 @@
     map.configChange(list_of<MemberId>(0)(1)(2));
     BOOST_CHECK(!map.isComplete());
     BOOST_CHECK(map.isResendNeeded());
-    map.received(1, activeStatus());
+    map.received(1, activeStatus(id));
     map.received(2, newcomerStatus());
     // We should not be complete as we haven't received 0 since new member joined
     BOOST_CHECK(!map.isComplete());
@@ -133,7 +154,9 @@
 
     map.received(0, newcomerStatus());
     BOOST_CHECK(map.isComplete());
+    BOOST_CHECK(map.transitionToComplete());
     BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1));
+    BOOST_CHECK_EQUAL(map.getClusterId(), id);
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Nov 17 18:09:21 2009
@@ -29,6 +29,11 @@
 class ClusterTests(BrokerTest):
     """Cluster tests with support for testing with a store plugin."""
 
+    def duration(self):
+        """Return the run length that callers add to time.time().
+
+        When the DURATION define is set it is converted to a float and
+        multiplied by 60 (presumably DURATION is given in minutes - confirm
+        against the test runner); otherwise the result is 3.
+        """
+        setting = self.config.defines.get("DURATION")
+        if not setting:
+            return 3
+        return float(setting) * 60
+
     def test_message_replication(self):
         """Test basic cluster message replication."""
         # Start a cluster, send some messages to member 0.
@@ -66,11 +71,10 @@
         sender = NumberedSender(cluster[2])
         sender.start()
 
-        # Kill original brokers, start new ones.
-        endtime = time.time() + (int(self.config.defines.get("DURATION") or 3))
+        # Kill original brokers, start new ones for the duration.
+        endtime = time.time() + self.duration()
         i = 0
         while time.time() < endtime:
-            print time.time(), endtime
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=881423&r1=881422&r2=881423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Nov 17 18:09:21 2009
@@ -40,8 +40,6 @@
     <!-- Sender offers an update to a new joiner. -->
     <control name = "update-offer" code="0x2">
       <field name="updatee" type="uint64"/>
-      <field name="cluster-id" type="uuid"/>
-      <field name="version" type="uint32"/>
     </control>
 
     <!-- Sender retracts an offer to a new joiner. -->
@@ -53,7 +51,9 @@
     <control name="initial-status" code="0x4">
       <field name="active" type="bit"/>
       <field name="persistent" type="bit"/>
-      <field name="properties" type="map"/>>
+      <field name="cluster-id" type="uuid"/>
+      <field name="version" type="uint32"/>
+      <field name="url" type="str16"/>
     </control>
 
     <!-- New member or updater is ready as an active member. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org