You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/27 03:08:25 UTC

svn commit: r737971 - in /qpid/trunk/qpid/cpp: src/ src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/

Author: aconway
Date: Tue Jan 27 02:08:25 2009
New Revision: 737971

URL: http://svn.apache.org/viewvc?rev=737971&view=rev
Log:
Cluster rename: dump -> update, newbie -> joiner

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp   (contents, props changed)
      - copied, changed from r737067, qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h   (contents, props changed)
      - copied, changed from r737067, qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Jan 27 02:08:25 2009
@@ -52,8 +52,8 @@
   qpid/cluster/Cpg.cpp				\
   qpid/cluster/Cpg.h				\
   qpid/cluster/Dispatchable.h			\
-  qpid/cluster/DumpClient.cpp			\
-  qpid/cluster/DumpClient.h			\
+  qpid/cluster/UpdateClient.cpp			\
+  qpid/cluster/UpdateClient.h			\
   qpid/cluster/Event.cpp			\
   qpid/cluster/Event.h				\
   qpid/cluster/EventFrame.h			\

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Jan 27 02:08:25 2009
@@ -98,6 +98,8 @@
     }
 }
 
+static const std::string QPID_MANAGEMENT("qpid.management");
+
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent)
     : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), 
@@ -111,9 +113,9 @@
             mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
             mgmtExchange->set_arguments(args);
             if (!durable) {
-                if (name == "") {
+                if (name.empty()) {
                     agent->addObject (mgmtExchange, 0x1000000000000004LL);  // Special default exchange ID
-                } else if (name == "qpid.management") {
+                } else if (name == QPID_MANAGEMENT) {
                     agent->addObject (mgmtExchange, 0x1000000000000005LL);  // Special management exchange ID
                 } else {
                     ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
@@ -125,12 +127,12 @@
 
     sequence = _args.get(qpidMsgSequence);
     if (sequence) {
-        QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+        QPID_LOG(debug, "Configured exchange " <<  _name  << " with Msg sequencing");
         args.setInt64(std::string(qpidSequenceCounter), sequenceNo);
     }
 
     ive = _args.get(qpidIVE);
-    if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
+    if (ive) QPID_LOG(debug, "Configured exchange " <<  _name  << " with Initial Value");
 }
 
 Exchange::~Exchange ()

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jan 27 02:08:25 2009
@@ -18,7 +18,7 @@
 
 #include "Cluster.h"
 #include "Connection.h"
-#include "DumpClient.h"
+#include "UpdateClient.h"
 #include "FailoverExchange.h"
 #include "ClusterQueueHandler.h"
 
@@ -29,10 +29,10 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterDumpOfferBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -77,10 +77,10 @@
     Cluster::Lock& l;
     ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
 
-    void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); }
+    void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
-    void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
+    void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
     void shutdown() { cluster.shutdown(member, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -124,7 +124,7 @@
 }
 
 Cluster::~Cluster() {
-    if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
+    if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
 }
 
 void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
@@ -205,7 +205,6 @@
 
 void Cluster::deliver(const Event& e, Lock&) {
     if (state == LEFT) return;
-    QPID_LOG(trace, *this << " PUSH: " << e);
     QPID_LATENCY_INIT(e);
     deliverEventQueue.push(e);
 }
@@ -216,7 +215,7 @@
     Buffer buf(const_cast<char*>(e.getData()), e.getSize());
     boost::intrusive_ptr<Connection> connection;
     if (e.isConnection()) {
-        if (state == NEWBIE) {
+        if (state == JOINER) {
             QPID_LOG(trace, *this << " DROP: " << e);
             return;
         }
@@ -236,11 +235,11 @@
 
 void Cluster::deliveredFrame(const EventFrame& e) {
     QPID_LATENCY_RECORD("delivered frame queue", e.frame);
-    QPID_LOG(trace, *this << " DLVR: " << e.frame);
     if (e.connection)   {
         e.connection->deliveredFrame(e);
     }
     else {
+        QPID_LOG(trace, *this << " DLVR: " << e.frame);
         Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
         ClusterDispatcher dispatch(*this, e.member, l);
         if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
@@ -313,9 +312,9 @@
             memberUpdate(l);
         }
         else {                  // Joining established group.
-            state = NEWBIE;
+            state = JOINER;
             QPID_LOG(info, *this << " joining cluster: " << map);
-            mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
+            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
             ClusterMap::Set members = map.getAlive();
             members.erase(myId);
             myElders = members;
@@ -336,10 +335,10 @@
 
 
 void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
-    if (state == READY && map.isNewbie(id)) {
+    if (state == READY && map.isJoiner(id)) {
         state = OFFER;
-        QPID_LOG(info, *this << " send dump-offer to " << id);
-        mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId);
+        QPID_LOG(info, *this << " send update-offer to " << id);
+        mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
     }
 }
 
@@ -359,8 +358,8 @@
     delete this;
 }
 
-void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
-    map.dumpRequest(id, url);
+void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
+    map.updateRequest(id, url);
     tryMakeOffer(id, l);
 }
 
@@ -376,81 +375,81 @@
     }
 }
 
-void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
     if (state == LEFT) return;
-    MemberId dumpee(dumpeeInt);
-    boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
-    if (dumper == myId) {
+    MemberId updatee(updateeInt);
+    boost::optional<Url> url = map.updateOffer(updater, updatee);
+    if (updater == myId) {
         assert(state == OFFER);
         if (url) {              // My offer was first.
-            dumpStart(dumpee, *url, l);
+            updateStart(updatee, *url, l);
         }
         else {                  // Another offer was first.
             state = READY;
             mcast.release();
-            QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
-            tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
+            QPID_LOG(info, *this << " cancelled update offer to " << updatee);
+            tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
         }
     }
-    else if (dumpee == myId && url) {
-        assert(state == NEWBIE);
+    else if (updatee == myId && url) {
+        assert(state == JOINER);
         setClusterId(uuid);
-        state = DUMPEE;
-        QPID_LOG(info, *this << " receiving dump from " << dumper);
+        state = UPDATEE;
+        QPID_LOG(info, *this << " receiving update from " << updater);
         deliverEventQueue.stop();
-        checkDumpIn(l);
+        checkUpdateIn(l);
     }
 }
 
-void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
     if (state == LEFT) return;
     assert(state == OFFER);
-    state = DUMPER;
-    QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
+    state = UPDATER;
+    QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
     deliverEventQueue.stop();
-    if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
-    dumpThread = Thread(
-        new DumpClient(myId, dumpee, url, broker, map, connections.values(),
-                       boost::bind(&Cluster::dumpOutDone, this),
-                       boost::bind(&Cluster::dumpOutError, this, _1)));
+    if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+    updateThread = Thread(
+        new UpdateClient(myId, updatee, url, broker, map, connections.values(),
+                       boost::bind(&Cluster::updateOutDone, this),
+                       boost::bind(&Cluster::updateOutError, this, _1)));
 }
 
-// Called in dump thread.
-void Cluster::dumpInDone(const ClusterMap& m) {
+// Called in update thread.
+void Cluster::updateInDone(const ClusterMap& m) {
     Lock l(lock);
-    dumpedMap = m;
-    checkDumpIn(l);
+    updatedMap = m;
+    checkUpdateIn(l);
 }
 
-void Cluster::checkDumpIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& ) {
     if (state == LEFT) return;
-    if (state == DUMPEE && dumpedMap) {
-        map = *dumpedMap;
+    if (state == UPDATEE && updatedMap) {
+        map = *updatedMap;
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
         state = CATCHUP;
-        QPID_LOG(info, *this << " received dump, starting catch-up");
+        QPID_LOG(info, *this << " received update, starting catch-up");
         deliverEventQueue.start();
     }
 }
 
-void Cluster::dumpOutDone() {
+void Cluster::updateOutDone() {
     Monitor::ScopedLock l(lock);
-    dumpOutDone(l);
+    updateOutDone(l);
 }
 
-void Cluster::dumpOutDone(Lock& l) {
-    assert(state == DUMPER);
+void Cluster::updateOutDone(Lock& l) {
+    assert(state == UPDATER);
     state = READY;
     mcast.release();
-    QPID_LOG(info, *this << " sent dump");
+    QPID_LOG(info, *this << " sent update");
     deliverEventQueue.start();
-    tryMakeOffer(map.firstNewbie(), l); // Try another offer
+    tryMakeOffer(map.firstJoiner(), l); // Try another offer
 }
 
-void Cluster::dumpOutError(const std::exception& e)  {
+void Cluster::updateOutError(const std::exception& e)  {
     Monitor::ScopedLock l(lock);
-    QPID_LOG(error, *this << " error sending dump: " << e.what());    
-    dumpOutDone(l);
+    QPID_LOG(error, *this << " error sending update: " << e.what());    
+    updateOutDone(l);
 }
 
 void Cluster ::shutdown(const MemberId& id, Lock& l) {
@@ -534,7 +533,7 @@
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
-    static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" };
+    static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
     return o << cluster.myId << "(" << STATE[cluster.state] << ")";
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jan 27 02:08:25 2009
@@ -59,7 +59,7 @@
 /**
  * Connection to the cluster
  *
- * Threading notes: 3 thread categories: connection, deliver, dump.
+ * Threading notes: 3 thread categories: connection, deliver, update.
  * 
  */
 class Cluster : private Cpg::Handler, public management::Manageable {
@@ -87,8 +87,8 @@
     // Leave the cluster - called in any thread.
     void leave();
 
-    // Dump completed - called in dump thread
-    void dumpInDone(const ClusterMap&);
+    // Update completed - called in update thread
+    void updateInDone(const ClusterMap&);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
@@ -124,8 +124,8 @@
     // Cluster controls implement XML methods from cluster.xml.
     // Called in deliver thread.
     // 
-    void dumpRequest(const MemberId&, const std::string&, Lock&);
-    void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
+    void updateRequest(const MemberId&, const std::string&, Lock&);
+    void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
@@ -133,7 +133,7 @@
     void deliveredFrame(const EventFrame&); 
 
     // Helper, called in deliver thread.
-    void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
+    void updateStart(const MemberId& updatee, const Url& url, Lock&);
 
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
@@ -163,12 +163,12 @@
     void memberUpdate(Lock&);
 
     // Called in connection IO threads .
-    void checkDumpIn(Lock&);
+    void checkUpdateIn(Lock&);
 
-    // Called in DumpClient thread.
-    void dumpOutDone();
-    void dumpOutError(const std::exception&);
-    void dumpOutDone(Lock&);
+    // Called in UpdateClient thread.
+    void updateOutDone();
+    void updateOutError(const std::exception&);
+    void updateOutDone(Lock&);
 
     void setClusterId(const framing::Uuid&);
 
@@ -201,23 +201,23 @@
 
     //    Local cluster state, cluster map
     enum {
-        INIT,                   ///< Initial state, no CPG messages received.
-        NEWBIE,                 ///< Sent dump request, waiting for dump offer.
-        DUMPEE,                 ///< Stalled receive queue at dump offer, waiting for dump to complete.
-        CATCHUP,                ///< Dump complete, unstalled but has not yet seen own "ready" event.
-        READY,                  ///< Fully operational 
-        OFFER,                  ///< Sent an offer, waiting for accept/reject.
-        DUMPER,                 ///< Offer accepted, sending a state dump.
-        LEFT                    ///< Final state, left the cluster.
+        INIT,    ///< Initial state, no CPG messages received.
+        JOINER,  ///< Sent update request, waiting for update offer.
+        UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
+        CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
+        READY,   ///< Fully operational 
+        OFFER,   ///< Sent an offer, waiting for accept/reject.
+        UPDATER, ///< Offer accepted, sending a state update.
+        LEFT     ///< Final state, left the cluster.
     } state;
     ClusterMap map;
     size_t lastSize;
     bool lastBroker;
     uint64_t sequence;
 
-    //     Dump related
-    sys::Thread dumpThread;
-    boost::optional<ClusterMap> dumpedMap;
+    //     Update related
+    sys::Thread updateThread;
+    boost::optional<ClusterMap> updatedMap;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Jan 27 02:08:25 2009
@@ -61,21 +61,21 @@
     if (isMember)
         members[id] = url;
     else
-        newbies[id] = url;
+        joiners[id] = url;
 }
 
-ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) {
-    std::for_each(newbiesFt.begin(), newbiesFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive)));
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+    std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
     std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
 }
 
 ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
     framing::ClusterConnectionMembershipBody b;
-    b.getNewbies().clear();
-    std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1));
+    b.getJoiners().clear();
+    std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1));
     for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
-        if (!isMember(*i) && !isNewbie(*i))
-            b.getNewbies().setString(i->str(), std::string());
+        if (!isMember(*i) && !isJoiner(*i))
+            b.getJoiners().setString(i->str(), std::string());
     }
     b.getMembers().clear();
     std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
@@ -91,7 +91,7 @@
     bool memberChange=false;
     for (a = left; a != left+nLeft; ++a) {
         memberChange = memberChange || members.erase(*a);
-        newbies.erase(*a);
+        joiners.erase(*a);
     }
     alive.clear();
     std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
@@ -103,8 +103,8 @@
     return i == map.end() ? Url() : i->second;
 }
      
-MemberId ClusterMap::firstNewbie() const {
-    return newbies.empty() ? MemberId() : newbies.begin()->first;
+MemberId ClusterMap::firstJoiner() const {
+    return joiners.empty() ? MemberId() : joiners.begin()->first;
 }
 
 std::vector<string> ClusterMap::memberIds() const {
@@ -139,16 +139,16 @@
     for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) {
         o << *i;
         if (m.isMember(*i)) o << "(member)";
-        else if (m.isNewbie(*i)) o << "(newbie)";
+        else if (m.isJoiner(*i)) o << "(joiner)";
         else o << "(unknown)";
         o << " ";
     }
     return o;
 }
 
-bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) {
+bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) {
     if (isAlive(id)) {
-        newbies[id] = Url(url);
+        joiners[id] = Url(url);
         return true;
     }
     return false;
@@ -170,16 +170,16 @@
     alive = update;
     for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) {
         memberChange = memberChange || members.erase(*i);
-        newbies.erase(*i);
+        joiners.erase(*i);
     }
     return memberChange;
 }
 
-boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) {
-    Map::iterator i = newbies.find(to);
-    if (isAlive(from) && i != newbies.end()) {
+boost::optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) {
+    Map::iterator i = joiners.find(to);
+    if (isAlive(from) && i != joiners.end()) {
         Url url= i->second;
-        newbies.erase(i);       // No longer a potential dumpee.
+        joiners.erase(i);       // No longer a potential updatee.
         return url;
     }
     return boost::optional<Url>();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Tue Jan 27 02:08:25 2009
@@ -39,7 +39,7 @@
 namespace cluster {
 
 /**
- * Map of established cluster members and newbies waiting for a brain dump.
+ * Map of established cluster members and joiners waiting for an update.
  */
 class ClusterMap {
   public:
@@ -60,15 +60,15 @@
 
     bool configChange(const std::string& addresses);
 
-    bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); }
+    bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
     bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
     bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); }
     
-    Url getNewbieUrl(const MemberId& id) { return getUrl(newbies, id); }
+    Url getJoinerUrl(const MemberId& id) { return getUrl(joiners, id); }
     Url getMemberUrl(const MemberId& id) { return getUrl(members, id); }
 
-    /** First newbie in the cluster in ID order, target for offers */
-    MemberId firstNewbie() const;
+    /** First joiner in the cluster in ID order, target for offers */
+    MemberId firstJoiner() const;
 
     /** Convert map contents to a cluster control body. */
     framing::ClusterConnectionMembershipBody asMethodBody() const;
@@ -79,9 +79,9 @@
     std::vector<Url> memberUrls() const;
     Set getAlive() const;
 
-    bool dumpRequest(const MemberId& id, const std::string& url);       
+    bool updateRequest(const MemberId& id, const std::string& url);       
     /** Return non-empty Url if accepted */
-    boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to);
+    boost::optional<Url> updateOffer(const MemberId& from, const MemberId& to);
 
     /**@return true If this is a new member */ 
     bool ready(const MemberId& id, const Url&);
@@ -93,7 +93,7 @@
   private:
     Url getUrl(const Map& map, const  MemberId& id);
     
-    Map newbies, members;
+    Map joiners, members;
     Set alive;
 
   friend std::ostream& operator<<(std::ostream&, const Map&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Jan 27 02:08:25 2009
@@ -21,7 +21,7 @@
 
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/ConnectionCodec.h"
-#include "qpid/cluster/DumpClient.h"
+#include "qpid/cluster/UpdateClient.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/Plugin.h"
@@ -87,43 +87,43 @@
     }
 };
 
-struct DumpClientIdAllocator : management::IdAllocator
+struct UpdateClientIdAllocator : management::IdAllocator
 {
     qpid::sys::AtomicValue<uint64_t> sequence;
 
-    DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+    UpdateClientIdAllocator() : sequence(0x4000000000000000LL) {}
 
     uint64_t getIdFor(management::Manageable* m)
     {
-        if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) {
+        if (isUpdateQueue(m) || isUpdateExchange(m) || isUpdateSession(m) || isUpdateBinding(m)) {
             return ++sequence;
         } else {
             return 0;
         }
     }
 
-    bool isDumpQueue(management::Manageable* manageable)
+    bool isUpdateQueue(management::Manageable* manageable)
     {
         qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable);
-        return queue && queue->getName() == DumpClient::DUMP;
+        return queue && queue->getName() == UpdateClient::UPDATE;
     }
 
-    bool isDumpExchange(management::Manageable* manageable)
+    bool isUpdateExchange(management::Manageable* manageable)
     {
         qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable);
-        return exchange && exchange->getName() == DumpClient::DUMP;
+        return exchange && exchange->getName() == UpdateClient::UPDATE;
     }
 
-    bool isDumpSession(management::Manageable* manageable)
+    bool isUpdateSession(management::Manageable* manageable)
     {
         broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable);
-        return session && session->getId().getName() == DumpClient::DUMP;
+        return session && session->getId().getName() == UpdateClient::UPDATE;
     }
 
-    bool isDumpBinding(management::Manageable* manageable)
+    bool isUpdateBinding(management::Manageable* manageable)
     {
         broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable);
-        return binding && binding->queue->getName() == DumpClient::DUMP;
+        return binding && binding->queue->getName() == UpdateClient::UPDATE;
     }
 };
 
@@ -155,7 +155,7 @@
         broker->getExchanges().registerExchange(cluster->getFailoverExchange());
         ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
         if (mgmt) {
-            std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+            std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
             mgmt->setAllocator(allocator);
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jan 27 02:08:25 2009
@@ -19,7 +19,7 @@
  *
  */
 #include "Connection.h"
-#include "DumpClient.h"
+#include "UpdateClient.h"
 #include "Cluster.h"
 
 #include "qpid/broker/SessionState.h"
@@ -45,8 +45,8 @@
 
 // TODO aconway 2008-11-03:
 // 
-// Disproportionate amount of code here is dedicated to receiving a
-// brain-dump when joining a cluster and building initial
+// Disproportionate amount of code here is dedicated to receiving an
+// update when joining a cluster and building initial
 // state. Should be separated out into its own classes.
 //
 
@@ -104,7 +104,7 @@
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
             connection.received(f);
     }
-    else {             // Shadow or dumped ex catch-up connection.
+    else {             // Shadow or updated ex catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
             if (isShadow()) {
                 QPID_LOG(debug, cluster << " inserting connection " << *this);
@@ -155,7 +155,7 @@
 
 // Delivered from cluster.
 void Connection::deliveredFrame(const EventFrame& f) {
-    QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame);
+    QPID_LOG(trace, cluster << " DLVR: " << *this << ": " << f.frame);
     assert(!catchUp);
     currentChannel = f.frame.getChannel(); 
     if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
@@ -174,8 +174,8 @@
             QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
             cluster.leave();
         }
-        else if (isDumped()) {
-            QPID_LOG(debug, cluster << " closed dump connection " << *this);
+        else if (isUpdated()) {
+            QPID_LOG(debug, cluster << " closed update connection " << *this);
             connection.closed();
         }
         else if (isLocal()) {
@@ -268,7 +268,7 @@
         received,
         unknownCompleted,
         receivedIncomplete);
-    QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
+    QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
 }
     
 void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -277,10 +277,10 @@
     self = shadow;
 }
 
-void Connection::membership(const FieldTable& newbies, const FieldTable& members) {
-    QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this);
-    cluster.dumpInDone(ClusterMap(newbies, members));
-    self.second = 0;        // Mark this as completed dump connection.
+void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+    QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
+    cluster.updateInDone(ClusterMap(joiners, members));
+    self.second = 0;        // Mark this as completed update connection.
 }
 
 bool Connection::isLocal() const {
@@ -291,7 +291,7 @@
     return self.first != cluster.getId();
 }
 
-bool Connection::isDumped() const {
+bool Connection::isUpdated() const {
     return self.first == cluster.getId() && self.second == 0;
 }
 
@@ -302,9 +302,9 @@
     return queue;
 }
 
-broker::QueuedMessage Connection::getDumpMessage() {
-    broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
-    if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+broker::QueuedMessage Connection::getUpdateMessage() {
+    broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+    if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
     return m;
 }
 
@@ -323,12 +323,12 @@
     broker::QueuedMessage m;
     broker::Queue::shared_ptr queue = findQueue(qname);
     if (!ended) {               // Has a message
-        if (acquired)           // Message is on the dump queue
-            m = getDumpMessage();
+        if (acquired)           // Message is on the update queue
+            m = getUpdateMessage();
         else                    // Message at original position in original queue
             m = queue->find(position);
         if (!m.payload)
-            throw Exception(QPID_MSG("deliveryRecord no dump message"));
+            throw Exception(QPID_MSG("deliveryRecord no update message"));
     }
 
     broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -349,7 +349,7 @@
     const char* type="unknown";
     if (c.isLocal()) type = "local";
     else if (c.isShadow()) type = "shadow";
-    else if (c.isDumped()) type = "dumped";
+    else if (c.isUpdated()) type = "updated";
     return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
 }
 
@@ -361,15 +361,15 @@
 }
 
 void Connection::txDequeue(const std::string& queue) {
-    txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+    txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload)));
 }
 
 void Connection::txEnqueue(const std::string& queue) {
-    txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+    txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
 }
 
 void Connection::txPublish(const framing::Array& queues, bool delivered) {
-    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
     for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) 
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jan 27 02:08:25 2009
@@ -81,8 +81,8 @@
     /** True if the connection is in "catch-up" mode: building initial broker state. */
     bool isCatchUp() const { return catchUp; }
 
-    /** True if the connection is a completed shared dump connection */
-    bool isDumped() const;
+    /** True if the connection is a completed shared update connection */
+    bool isUpdated() const;
 
     Cluster& getCluster() { return cluster; }
 
@@ -108,7 +108,7 @@
     
     // ==== Used in catch-up mode to build initial state.
     // 
-    // State dump methods.
+    // State update methods.
     void sessionState(const framing::SequenceNumber& replayStart,
                       const framing::SequenceNumber& sendCommandPoint,
                       const framing::SequenceSet& sentIncomplete,
@@ -156,7 +156,7 @@
     boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
     broker::SessionState& sessionState();
     broker::SemanticState& semanticState();
-    broker::QueuedMessage getDumpMessage();
+    broker::QueuedMessage getUpdateMessage();
 
     static NoOpConnectionOutputHandler discardHandler;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Jan 27 02:08:25 2009
@@ -66,7 +66,6 @@
 ConnectionCodec::~ConnectionCodec() {}
 
 size_t ConnectionCodec::decode(const char* buffer, size_t size) {
-    QPID_LOG(trace, "RECVB [" << localId << "]: " << size << " bytes");
     return interceptor->decode(buffer, size);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Jan 27 02:08:25 2009
@@ -23,6 +23,7 @@
 #include "Cpg.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/LatencyMetric.h"
+#include "qpid/framing/AMQBody.h"
 
 namespace qpid {
 namespace cluster {
@@ -40,12 +41,14 @@
 }
 
 void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
+    QPID_LOG(trace, "MCAST " << id << ": " << body);
     mcast(Event::control(body, id));
 }
 
 void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
     Event e(DATA, id, size);
     memcpy(e.getData(), data, size);
+    QPID_LOG(trace, "MCAST " << e);
     mcast(e);
 }
 
@@ -54,7 +57,6 @@
         sys::Mutex::ScopedLock l(lock);
         if (e.getType() == DATA && e.isConnection() && holding) {
             holdingQueue.push_back(e); 
-            QPID_LOG(trace, " MCAST held: " << e );
             return;
         }
     }
@@ -85,7 +87,6 @@
                 }
                 break; 
             }
-            QPID_LOG(trace, " MCAST " << *i); 
             ++i;
         }
         values.erase(values.begin(), i); // Erase sent events.

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (from r737067, qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp&r1=737067&r2=737971&rev=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Jan 27 02:08:25 2009
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-#include "DumpClient.h"
+#include "UpdateClient.h"
 #include "Cluster.h"
 #include "ClusterMap.h"
 #include "Connection.h"
@@ -83,49 +83,49 @@
     sb.get()->send(body);
 }
 
-// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
+// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
 
-DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url,
+UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
                        broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
                        const boost::function<void()>& ok,
                        const boost::function<void(const std::exception&)>& fail)
-    : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons), 
+    : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), 
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail)
 {
     connection.open(url);
-    session = connection.newSession("dump_shared");
+    session = connection.newSession("update_shared");
 }
 
-DumpClient::~DumpClient() {}
+UpdateClient::~UpdateClient() {}
 
 // Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-static const char DUMP_CHARS[] = "\000qpid-dump";
-const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS)); 
+static const char UPDATE_CHARS[] = "\000qpid-update";
+const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); 
 
-void DumpClient::dump() {
-    QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
-    Broker& b = dumperBroker;
-    b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
-
-    // Dump exchange is used to route messages to the proper queue without modifying routing key.
-    session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
-    b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
-    // Dump queue is used to transfer acquired messages that are no longer on their original queue.
-    session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
+void UpdateClient::update() {
+    QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl);
+    Broker& b = updaterBroker;
+    b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
+
+    // Update exchange is used to route messages to the proper queue without modifying routing key.
+    session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true);
+    b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
+    // Update queue is used to transfer acquired messages that are no longer on their original queue.
+    session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
     session.close();
 
-    std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
+    std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
     AMQFrame frame(map.asMethodBody());
     client::ConnectionAccess::getImpl(connection)->handle(frame);
     connection.close();
-    QPID_LOG(debug,  dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl);
+    QPID_LOG(debug,  updaterId << " updated state to " << updateeId << " at " << updateeUrl);
 }
 
-void DumpClient::run() {
+void UpdateClient::run() {
     try {
-        dump();
+        update();
         done();
     } catch (const std::exception& e) {
         failed(e);
@@ -143,16 +143,16 @@
 }
 } // namespace
 
-void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
-    QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
+void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
+    QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName());
     ClusterConnectionProxy proxy(session);
     proxy.exchange(encode(*ex));
 }
 
-/** Bind a queue to the dump exchange and dump messges to it
+/** Bind a queue to the update exchange and update messages to it
  * setting the message possition as needed.
  */
-class MessageDumper {
+class MessageUpdater {
     std::string queue;
     bool haveLastPos;
     framing::SequenceNumber lastPos;
@@ -160,15 +160,15 @@
 
   public:
 
-    MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
-        session.exchangeBind(queue, DumpClient::DUMP);
+    MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+        session.exchangeBind(queue, UpdateClient::UPDATE);
     }
 
-    ~MessageDumper() {
-        session.exchangeUnbind(queue, DumpClient::DUMP);
+    ~MessageUpdater() {
+        session.exchangeUnbind(queue, UpdateClient::UPDATE);
     }
 
-    void dumpQueuedMessage(const broker::QueuedMessage& message) {
+    void updateQueuedMessage(const broker::QueuedMessage& message) {
         if (!haveLastPos || message.position - lastPos != 1)  {
             ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
             haveLastPos = true;
@@ -176,52 +176,52 @@
         lastPos = message.position;
         SessionBase_0_10Access sb(session);
         framing::MessageTransferBody transfer(
-            framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+            framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
         sb.get()->send(transfer, message.payload->getFrames());
     }
 
-    void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) {
-        dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
+    void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
+        updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
     }
 };
 
 
-void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
-    QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
+void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
+    QPID_LOG(debug, updaterId << " updateing queue " << q->getName());
     ClusterConnectionProxy proxy(session);
     proxy.queue(encode(*q));
-    MessageDumper dumper(q->getName(), session);
-    q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
-    q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
+    MessageUpdater updater(q->getName(), session);
+    q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
+    q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
 }
 
 
-void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
+void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) {
     session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
 }
 
-void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
-    QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
+void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
+    QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection);
     shadowConnection = catchUpConnection();
 
-    broker::Connection& bc = dumpConnection->getBrokerConnection();
+    broker::Connection& bc = updateConnection->getBrokerConnection();
     // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
-    shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
-    bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
+    shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+    bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
-        dumpConnection->getId().getMember(),
-        reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer()));
+        updateConnection->getId().getMember(),
+        reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()));
     shadowConnection.close();
-    QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection);
+    QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
 }
 
-void DumpClient::dumpSession(broker::SessionHandler& sh) {
-    QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection()  << "[" << sh.getChannel() << "] = "
+void UpdateClient::updateSession(broker::SessionHandler& sh) {
+    QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection()  << "[" << sh.getChannel() << "] = "
              << sh.getSession()->getId());
     broker::SessionState* ss = sh.getSession();
     if (!ss) return;            // no session.
 
-    // Create a client session to dump session state. 
+    // Create a client session to update session state. 
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
     boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
     client::SessionBase_0_10Access(shadowSession).set(simpl);
@@ -229,15 +229,15 @@
 
     // Re-create session state on remote connection.
 
-    // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
-    QPID_LOG(debug, dumperId << " dumping consumers.");
-    ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+    // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
+    QPID_LOG(debug, updaterId << " updateing consumers.");
+    ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
 
-    QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
+    QPID_LOG(debug, updaterId << " updateing unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
-    std::for_each(drs.begin(), drs.end(),  boost::bind(&DumpClient::dumpUnacked, this, _1));
+    std::for_each(drs.begin(), drs.end(),  boost::bind(&UpdateClient::updateUnacked, this, _1));
 
-    dumpTxState(ss->getSemanticState());           // Tx transaction state.
+    updateTxState(ss->getSemanticState());           // Tx transaction state.
 
     //  Adjust for command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -263,11 +263,11 @@
 
     // FIXME aconway 2008-09-23: update session replay list.
 
-    QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
+    QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
 }
 
-void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
-    QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
+void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
+    QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId());
     using namespace message;
     shadowSession.messageSubscribe(
         arg::queue       = ci->getQueue()->getName(),
@@ -289,15 +289,15 @@
         ci->isNotifyEnabled()
     );
     client::SessionBase_0_10Access(shadowSession).get()->send(state);
-    QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
+    QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
 }
     
-void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
     if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
         // If the message is acquired then it is no longer on the
-        // dumpees queue, put it on the dump queue for dumpee to pick up.
+        // updatee's queue, put it on the update queue for updatee to pick up.
         //
-        MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
     }
     ClusterConnectionProxy(shadowSession).deliveryRecord(
         dr.getQueue()->getName(),
@@ -314,22 +314,22 @@
     );
 }
 
-class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
+class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
   public:
-    TxOpDumper(DumpClient& dc, client::AsyncSession s)
-        : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
+    TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
+        : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
 
     void operator()(const broker::DtxAck& ) {
         throw InternalErrorException("DTX transactions not currently supported by cluster.");
     }
     
     void operator()(const broker::RecoveredDequeue& rdeq) {
-        dumpMessage(rdeq.getMessage());
+        updateMessage(rdeq.getMessage());
         proxy.txEnqueue(rdeq.getQueue()->getName());
     }
 
     void operator()(const broker::RecoveredEnqueue& renq) {
-        dumpMessage(renq.getMessage());
+        updateMessage(renq.getMessage());
         proxy.txEnqueue(renq.getQueue()->getName());
     }
 
@@ -338,7 +338,7 @@
     }
 
     void operator()(const broker::TxPublish& txPub) {
-        dumpMessage(txPub.getMessage());
+        updateMessage(txPub.getMessage());
         typedef std::list<Queue::shared_ptr> QueueList;
         const QueueList& qlist = txPub.getQueues();
         Array qarray(TYPE_CODE_STR8);
@@ -348,20 +348,20 @@
     }
 
   private:
-    DumpClient& parent;
+    UpdateClient& parent;
     client::AsyncSession session;
     ClusterConnectionProxy proxy;
 };
     
-void DumpClient::dumpTxState(broker::SemanticState& s) {
-    QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+void UpdateClient::updateTxState(broker::SemanticState& s) {
+    QPID_LOG(debug, updaterId << " updateing TX transaction state.");
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
     broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
     if (txBuffer) {
         proxy.txStart();
-        TxOpDumper dumper(*this, shadowSession);
-        txBuffer->accept(dumper);
+        TxOpUpdater updater(*this, shadowSession);
+        txBuffer->accept(updater);
         proxy.txEnd();
     }
 }

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (from r737067, qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h&r1=737067&r2=737971&rev=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Jan 27 02:08:25 2009
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_DUMPCLIENT_H
-#define QPID_CLUSTER_DUMPCLIENT_H
+#ifndef QPID_CLUSTER_UPDATECLIENT_H
+#define QPID_CLUSTER_UPDATECLIENT_H
 
 /*
  *
@@ -56,38 +56,38 @@
 class ClusterMap;
 
 /**
- * A client that dumps the contents of a local broker to a remote one using AMQP.
+ * A client that updates the contents of a local broker to a remote one using AMQP.
  */
-class DumpClient : public sys::Runnable {
+class UpdateClient : public sys::Runnable {
   public:
-    static const std::string DUMP; // Name for special dump queue and exchange.
+    static const std::string UPDATE; // Name for special update queue and exchange.
     
-    DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&,
+    UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
                broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
                const boost::function<void()>& done,
                const boost::function<void(const std::exception&)>& fail);
 
-    ~DumpClient();
-    void dump();
+    ~UpdateClient();
+    void update();
     void run();                 // Will delete this when finished.
 
-    void dumpUnacked(const broker::DeliveryRecord&);
+    void updateUnacked(const broker::DeliveryRecord&);
 
   private:
-    void dumpQueue(const boost::shared_ptr<broker::Queue>&);
-    void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
-    void dumpMessage(const broker::QueuedMessage&);
-    void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
-    void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
-    void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
-    void dumpSession(broker::SessionHandler& s);
-    void dumpTxState(broker::SemanticState& s);
-    void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
-
-    MemberId dumperId;
-    MemberId dumpeeId;
-    Url dumpeeUrl;
-    broker::Broker& dumperBroker;
+    void updateQueue(const boost::shared_ptr<broker::Queue>&);
+    void updateExchange(const boost::shared_ptr<broker::Exchange>&);
+    void updateMessage(const broker::QueuedMessage&);
+    void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
+    void updateBinding(const std::string& queue, const broker::QueueBinding& binding);
+    void updateConnection(const boost::intrusive_ptr<Connection>& connection);
+    void updateSession(broker::SessionHandler& s);
+    void updateTxState(broker::SemanticState& s);
+    void updateConsumer(const broker::SemanticState::ConsumerImpl*);
+
+    MemberId updaterId;
+    MemberId updateeId;
+    Url updateeUrl;
+    broker::Broker& updaterBroker;
     ClusterMap map;
     std::vector<boost::intrusive_ptr<Connection> > connections;
     client::Connection connection, shadowConnection;
@@ -98,4 +98,4 @@
 
 }} // namespace qpid::cluster
 
-#endif  /*!QPID_CLUSTER_DUMPCLIENT_H*/
+#endif  /*!QPID_CLUSTER_UPDATECLIENT_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jan 27 02:08:25 2009
@@ -27,7 +27,7 @@
 #include "qpid/client/FailoverListener.h"
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/DumpClient.h"
+#include "qpid/cluster/UpdateClient.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -352,8 +352,8 @@
     BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
 }
 
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
-    // Verify that we dump transaction state correctly to new members.
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
+    // Verify that we update transaction state correctly to new members.
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
 
@@ -386,8 +386,8 @@
     BOOST_CHECK_EQUAL(m.getData(), "3");
 }
 
-QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
-    // Verify that we dump a partially recieved message to a new member.
+QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
+    // Verify that we update a partially recieved message to a new member.
     ClusterFixture cluster(1);    
     Client c0(cluster[0], "c0");
     c0.session.queueDeclare("q");
@@ -452,7 +452,7 @@
     BOOST_CHECK_EQUAL(kb0, kb2);
 }
 
-QPID_AUTO_TEST_CASE(DumpConsumers) {
+QPID_AUTO_TEST_CASE(UpdateConsumers) {
     ClusterFixture cluster(1, 1);  
 
     Client c0(cluster[0], "c0"); 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=737971&r1=737970&r2=737971&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jan 27 02:08:25 2009
@@ -27,12 +27,12 @@
   <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
-    <control name="dump-request" code="0x1" label="URL for a member.">
+    <control name="update-request" code="0x1" label="URL for a member.">
       <field name="url" type="str16"/>
     </control>
 
-    <control name = "dump-offer" code="0x2" label="Member offering to be dumper for dumpee.">
-      <field name="dumpee" type="uint64"/>
+    <control name = "update-offer" code="0x2" label="Member offering to be updater for updatee.">
+      <field name="updatee" type="uint64"/>
       <field name="cluster-id" type="uuid"/>
     </control>
 
@@ -60,13 +60,13 @@
       <field name="bytes" type="uint32"/>
     </control>
 
-    <!-- Brain-dump controls. Sent to a new broker in joining mode.
-	 A connection is dumped as followed:
+    <!-- Update controls. Sent to a new broker in joining mode.
+	 A connection is updated as follows:
 	 - open as a normal connection.
 	 - attach sessions, create consumers, set flow with normal AMQP cokmmands.
 	 - send /reset additional session state with controls below.
-	 - send shadow-ready to mark end of shadow dump.
-	 - send dump-complete when entire dump is complete.
+	 - send shadow-ready to mark end of shadow update.
+	 - send update-complete when entire update is complete.
     -->
     
     <!-- Consumer state that cannot be set by standard AMQP controls. -->
@@ -103,8 +103,8 @@
     <control name="tx-end" code="0x17"/>
     <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
     
-    <!-- Complete a session state dump. -->
-    <control name="session-state" code="0x1F" label="Set session state during a brain dump.">
+    <!-- Complete a session state update. -->
+    <control name="session-state" code="0x1F" label="Set session state during a brain update.">
       <!-- Target session deduced from channel number.  -->
       <field name="replay-start" type="sequence-no"/>	       <!-- Replay frames will start from this point.-->
       <field name="command-point" type="sequence-no"/>	       <!-- Id of next command sent -->
@@ -116,15 +116,15 @@
       <field name="received-incomplete" type="sequence-set"/>  <!-- Received and incomplete -->
     </control>
 
-    <!-- Complete a shadow connection dump. -->
-    <control name="shadow-ready" code="0x20" label="End of shadow connection dump.">
+    <!-- Complete a shadow connection update. -->
+    <control name="shadow-ready" code="0x20" label="End of shadow connection update.">
       <field name="member-id" type="uint64"/>
       <field name="connection-id" type="uint64"/>
     </control>
 
-    <!-- Complete a cluster state dump. -->
+    <!-- Complete a cluster state update. -->
     <control name="membership" code="0x21" label="Cluster membership details.">
-      <field name="newbies" type="map"/> <!-- member-id -> URL -->
+      <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
     </control>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org