You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/18 15:55:31 UTC

svn commit: r696657 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/cluster/ qpid/sys/ tests/

Author: aconway
Date: Thu Sep 18 06:55:30 2008
New Revision: 696657

URL: http://svn.apache.org/viewvc?rev=696657&view=rev
Log:
Refactor Cluster logic into separate handlers for Joining & Member modes.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Sep 18 06:55:30 2008
@@ -28,7 +28,13 @@
   qpid/cluster/DumpClient.h \
   qpid/cluster/DumpClient.cpp \
   qpid/cluster/ClusterMap.h \
-  qpid/cluster/ClusterMap.cpp
+  qpid/cluster/ClusterMap.cpp \
+  qpid/cluster/ClusterHandler.h \
+  qpid/cluster/ClusterHandler.cpp \
+  qpid/cluster/JoiningHandler.h \
+  qpid/cluster/JoiningHandler.cpp \
+  qpid/cluster/MemberHandler.h \
+  qpid/cluster/MemberHandler.cpp
 
 cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 18 06:55:30 2008
@@ -23,8 +23,6 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterDumpRequestBody.h"
 #include "qpid/framing/ClusterUpdateBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
@@ -55,17 +53,6 @@
 using qpid::management::Args;
 namespace _qmf = qmf::org::apache::qpid::cluster;
 
-struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
-    Cluster& cluster;
-    MemberId member;
-    ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
-    bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
-
-    void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
-    void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
-    void ready(const std::string& url) { cluster.ready(member, url); }
-};
-
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
     broker(b),
     poller(b.getPoller()),
@@ -79,16 +66,18 @@
                       boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
     connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
-    state(START)
+    handler(&joiningHandler),
+    joiningHandler(*this),
+    memberHandler(*this)
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
         _qmf::Package  packageInit(agent);
         mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
         agent->addObject (mgmtObject);
-		mgmtObject->set_status("JOINING");
+        mgmtObject->set_status("JOINING");
 		
-		// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
+        // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
     }
     QPID_LOG(notice, self << " joining cluster " << name.str());
     broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
@@ -108,9 +97,6 @@
     connections.erase(id);
 }
 
-// FIXME aconway 2008-09-10: call leave from cluster admin command.
-// Any other type of exit is caught in disconnect().
-// 
 void Cluster::leave() {
     QPID_LOG(notice, self << " leaving cluster " << name.str());
     cpg.leave(name);
@@ -147,6 +133,7 @@
 } 
 
 // FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
+// Check locking from Handler functions.
 boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
     Mutex::ScopedLock l(lock);
     if (id.getMember() == self)
@@ -179,24 +166,16 @@
             AMQFrame frame;
             while (frame.decode(buf)) {
                 QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
-                if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+                if (!handler->invoke(e.getConnectionId().getMember(), frame))
                     throw Exception(QPID_MSG("Invalid cluster control"));
             }
         }
-        else {
-            // Process connection controls & data via the connectionEventQueue
-            // unless we are in the DISCARD state, in which case ignore.
-            if (state != DISCARD) { 
-                e.setConnection(getConnection(e.getConnectionId()));
-                connectionEventQueue.push(e);
-            }
-        }
+        else 
+            handler->deliver(e);
     }
     catch (const std::exception& e) {
-        // FIXME aconway 2008-01-30: exception handling.
         QPID_LOG(critical, "Error in cluster deliver: " << e.what());
-        assert(0);
-        throw;
+        leave();
     }
 }
 
@@ -208,17 +187,19 @@
     else {              // control
         AMQFrame frame;
         while (frame.decode(buf))
-            e.getConnection()->deliver(frame);
+            e.getConnection()->received(frame);
     }
 }
 
 struct AddrList {
     const cpg_address* addrs;
     int count;
-    AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+    const char* prefix;
+    AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {}
 };
 
 ostream& operator<<(ostream& o, const AddrList& a) {
+    if (a.count && a.prefix) o << a.prefix;
     for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
         const char* reasonString;
         switch (p->reason) {
@@ -252,82 +233,41 @@
     cpg_name */*group*/,
     cpg_address *current, int nCurrent,
     cpg_address *left, int nLeft,
-    cpg_address */*joined*/, int nJoined)
+    cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
-    // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
-    QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
-    QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+    QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
+             << AddrList(left, nLeft, "Left: "));
     
-    map.left(left, nLeft);
     if (find(left, left+nLeft, self) != left+nLeft) { 
         // I have left the group, this is the final config change.
         QPID_LOG(notice, self << " left cluster " << name.str());
         broker.shutdown();
         return;
     }
+    
+    map.left(left, nLeft);
+    handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
 
-    if (state == START) {
-        if (nCurrent == 1 && *current == self) { // First in cluster.
-            // First in cluster
-            QPID_LOG(notice, self << " first in cluster.");
-            map.add(self, url);
-            ready();
-        }
-	    updateMemberStats();
-        return;                 
-    }
-
-    if (state == DISCARD && !map.dumper) // try another dump request.
-        mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-
-    if (nJoined && map.sendUpdate(self))  // New members need update
-        mcastControl(map.toControl(), 0);
-
+    // FIXME aconway 2008-09-17: management update.
     //update mgnt stats
-	updateMemberStats();
+    updateMemberStats();
 }
 
-void Cluster::update(const FieldTable& members, uint64_t dumper) {
+void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
     Mutex::ScopedLock l(lock);
-    map.update(members, dumper);
-    QPID_LOG(debug, "Cluster update: " << map);
-    if (state == START) state = DISCARD; // Got first update.
-    if (state == DISCARD && !map.dumper)
-        mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+    handler->update(id, members, dumper); 
 }
 
 void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
     Mutex::ScopedLock l(lock);
-    if (map.dumper) return;     // Dump already in progress, ignore.
-    map.dumper = map.first();
-    if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
-        QPID_LOG(info, self << " receiving state dump from " << map.dumper);
-        // FIXME aconway 2008-09-15: RECEIVE DUMP
-        // state = CATCHUP; 
-        // stall();
-        // When received
-        mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
-        ready();
-    }
-    else if (map.dumper == self && state == READY) { // My turn to send the dump
-        QPID_LOG(info, self << " sending state dump to " << dumpee);
-        // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
-        // state = DUMPING;
-        // stall();
-        (void)urlStr;       
-        // When dump complete:
-        assert(map.dumper == self);
-        ClusterUpdateBody b = map.toControl();
-        b.setDumper(0);
-        mcastControl(b, 0);
-        // NB: Don't modify my own map till self-delivery.
-    }
+    handler->dumpRequest(dumpee, urlStr);
 }
 
 void Cluster::ready(const MemberId& member, const std::string& url) {
     Mutex::ScopedLock l(lock);
-    map.add(member, Url(url));
+    handler->ready(member, url);
+    // FIXME aconway 2008-09-17: management update.
 }
 
 broker::Broker& Cluster::getBroker(){ return broker; }
@@ -341,18 +281,18 @@
     // FIXME aconway 2008-09-11: Flow control, we should slow down or
     // stop reading from local connections while stalled to avoid an
     // unbounded queue.
-	if (mgmtObject!=0)
-	    mgmtObject->set_status("STALLED");
+    //     if (mgmtObject!=0)
+    //         mgmtObject->set_status("STALLED");
 }
 
 void Cluster::ready() {
     // Called with lock held
-    QPID_LOG(info, self << " ready with URL " << url);
-    state = READY;
+    QPID_LOG(info, self << " ready at URL " << url);
+    mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+    handler = &memberHandler;   // Member mode.
     connectionEventQueue.start(poller);
-    // FIXME aconway 2008-09-15: stall/unstall map?
-	if (mgmtObject!=0)
-	    mgmtObject->set_status("ACTIVE");
+    //     if (mgmtObject!=0)
+    //         mgmtObject->set_status("ACTIVE");
 }
 
 // Called from Broker::~Broker when broker is shut down.  At this
@@ -367,52 +307,53 @@
     delete this;
 }
 
-ManagementObject* Cluster::GetManagementObject(void) const
-{
-   return (ManagementObject*) mgmtObject;
+ManagementObject* Cluster::GetManagementObject(void) const {
+    return (ManagementObject*) mgmtObject;
 }
 
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&)
-{
-  Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
-  QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
-  switch (methodId)
-  {
-  case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
-      stopClusterNode();
-      break;
-  case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
-      stopFullCluster();
-      break;
-  }
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+    QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+    switch (methodId)
+    {
+      case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
+        stopClusterNode();
+        break;
+      case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
+        stopFullCluster();
+        break;
+    }
 
-  return status;
+    return status;
 }    
 
 void Cluster::stopClusterNode(void)
 {
+    // FIXME aconway 2008-09-18: 
     QPID_LOG(notice, self << " disconnected from cluster " << name.str());
     broker.shutdown();
 }
 
 void Cluster::stopFullCluster(void)
 {
+    // FIXME aconway 2008-09-17: TODO
 }
 
 void Cluster::updateMemberStats(void)
 {
     //update mgnt stats
-	if (mgmtObject!=0){
-	    mgmtObject->set_clusterSize(size()); 
-		std::vector<Url> vectUrl = getUrls();
-        string urlstr;
-        for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
-		    if (iter != vectUrl.begin()) urlstr += ";";
-			urlstr += iter->str();
-		}
-		mgmtObject->set_members(urlstr);
-	}
+    // FIXME aconway 2008-09-18: 
+//     if (mgmtObject!=0){
+//         mgmtObject->set_clusterSize(size()); 
+//         std::vector<Url> vectUrl = getUrls();
+//         string urlstr;
+//         for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+//             if (iter != vectUrl.begin()) urlstr += ";";
+//             urlstr += iter->str();
+//         }
+//         mgmtObject->set_members(urlstr);
+//     }
 
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 18 06:55:30 2008
@@ -23,11 +23,12 @@
 #include "Event.h"
 #include "NoOpConnectionOutputHandler.h"
 #include "ClusterMap.h"
+#include "JoiningHandler.h"
+#include "MemberHandler.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Monitor.h"
-#include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/Url.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/cluster/Cluster.h"
@@ -78,7 +79,7 @@
     void leave();
 
     // Cluster controls.
-    void update(const framing::FieldTable& members, uint64_t dumping);
+    void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
     void dumpRequest(const MemberId&, const std::string& url);
     void ready(const MemberId&, const std::string& url);
 
@@ -127,8 +128,6 @@
     /** Callback if CPG fd is disconnected. */
     void disconnect(sys::DispatchHandle&);
 
-    void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method);
-
     boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
 
     virtual qpid::management::ManagementObject* GetManagementObject(void) const;
@@ -151,6 +150,14 @@
     EventQueue connectionEventQueue;
     State state;
     qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+
+    // Handlers for different states.
+    ClusterHandler* handler;
+    JoiningHandler joiningHandler;
+    MemberHandler memberHandler;
+
+  friend class JoiningHandler;
+  friend class MemberHandler;
 };
 
 }} // namespace qpid::cluster

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp Thu Sep 18 06:55:30 2008
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/AllInvoker.h"
+
+#include "ClusterHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+
+
+
+namespace qpid {
+namespace cluster {
+
+struct Operations : public framing::AMQP_AllOperations::ClusterHandler {
+    qpid::cluster::ClusterHandler& handler;
+    MemberId member;
+    Operations(qpid::cluster::ClusterHandler& c, const MemberId& id) : handler(c), member(id) {}
+
+    void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
+    void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
+    void ready(const std::string& url) { handler.ready(member, url); }
+};
+
+ClusterHandler::~ClusterHandler() {}
+
+ClusterHandler::ClusterHandler(Cluster& c) : cluster (c) {}
+
+bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) {
+    Operations ops(*this, id);
+    return framing::invoke(ops, *frame.getBody()).wasHandled(); 
+}
+
+}} // namespace qpid::cluster
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Thu Sep 18 06:55:30 2008
@@ -0,0 +1,65 @@
+#ifndef QPID_CLUSTER_CLUSTERHANDLER_H
+#define QPID_CLUSTER_CLUSTERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cpg.h"
+#include "types.h"
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+class Cluster;
+class Event;
+
+/**
+ * Interface for handling cluster events.
+ * Implementations provide different behavior for different states of a member.
+ */
+class ClusterHandler
+{
+  public:
+    ClusterHandler(Cluster& c);
+    virtual ~ClusterHandler();
+
+    virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
+    virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
+    virtual void ready(const MemberId&, const std::string& url) = 0;
+
+    virtual void deliver(Event& e) = 0; // Deliver a connection event.
+
+    virtual void configChange(cpg_address *current, int nCurrent,
+                              cpg_address *left, int nLeft,
+                              cpg_address *joined, int nJoined) = 0;
+
+    bool invoke(const MemberId&, framing::AMQFrame& f);
+
+  protected:
+    Cluster& cluster;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CLUSTERHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Thu Sep 18 06:55:30 2008
@@ -53,8 +53,8 @@
     return b;
 }
 
-void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) {
-    FieldTable::ValueMap::const_iterator i;
+void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
+    framing:: FieldTable::ValueMap::const_iterator i;
     for (i = ftMembers.begin(); i != ftMembers.end(); ++i) 
         members[i->first] = Url(i->second->get<std::string>());
     dumper = MemberId(dumper_);
@@ -82,8 +82,10 @@
     return dumper==id || (!dumper && first() == id);
 }
 
-void ClusterMap::add(const MemberId& id, const Url& url) {
+void ClusterMap::ready(const MemberId& id, const Url& url) {
     members[id] = url;
+    if (id == dumper)
+        dumper = MemberId();
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Thu Sep 18 06:55:30 2008
@@ -34,8 +34,6 @@
 namespace qpid {
 namespace cluster {
 
-// FIXME aconway 2008-09-15: rename cluster status?
-
 /**
  * Map of established cluster members and brain-dumps in progress.
  * A dumper is an established member that is sending catch-up data.
@@ -58,8 +56,8 @@
     /** Convert map contents to a cluster update body. */
     framing::ClusterUpdateBody toControl() const;
 
-    /** Add a new member. */
-    void add(const MemberId& id, const Url& url);
+    /** Add a new member; also marks the dump complete (clears dumper) if id == dumper. */
+    void ready(const MemberId& id, const Url& url);
 
     /** Apply update delivered from clsuter. */
     void update(const framing::FieldTable& members, uint64_t dumper);
@@ -70,6 +68,7 @@
     std::vector<Url> memberUrls() const;
     size_t size() const { return members.size(); }
     
+    bool empty() const { return members.empty(); }
   private:
 
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Sep 18 06:55:30 2008
@@ -21,11 +21,9 @@
 #include "Connection.h"
 #include "Cluster.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/Invoker.h"
-#include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/log/Statement.h"
-
+#include "qpid/framing/AllInvoker.h"
 #include <boost/current_function.hpp>
 
 namespace qpid {
@@ -47,11 +45,6 @@
 
 Connection::~Connection() {}
 
-void Connection::received(framing::AMQFrame& ) {
-    // FIXME aconway 2008-09-02: not called, codec sends straight to deliver
-    assert(0);
-}
-
 bool Connection::doOutput() { return output.doOutput(); }
 
 // Delivery of doOutput allows us to run the real connection doOutput()
@@ -62,7 +55,7 @@
 }
 
 // Handle frames delivered from cluster.
-void Connection::deliver(framing::AMQFrame& f) {
+void Connection::received(framing::AMQFrame& f) {
     QPID_LOG(trace, "DLVR [" << self << "]: " << f);
     // Handle connection controls, deliver other frames to connection.
     if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -95,14 +88,13 @@
 size_t Connection::decode(const char* buffer, size_t size) { 
     ++mcastSeq;
     cluster.mcastBuffer(buffer, size, self);
-    // FIXME aconway 2008-09-01: deserialize?
     return size;
 }
 
 void Connection::deliverBuffer(Buffer& buf) {
     ++deliverSeq;
     while (decoder.decode(buf))
-        deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
+        received(decoder.frame);
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Sep 18 06:55:30 2008
@@ -63,7 +63,6 @@
     Cluster& getCluster() { return cluster; }
 
     // self-delivery of multicast data.
-    void deliver(framing::AMQFrame& f);
     void deliverClose();
     void deliverDoOutput(uint32_t requested);
     void deliverBuffer(framing::Buffer&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Sep 18 06:55:30 2008
@@ -36,9 +36,7 @@
     return 0;
 }
 
-// FIXME aconway 2008-08-27: outbound connections need to be made
-// with proper qpid::client code for failover, get rid of this
-// broker-side hack.
+// Used for outgoing Link connections; we don't care about those, so just delegate to next.
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
     return next->create(out, id);
@@ -60,7 +58,6 @@
     return interceptor->decode(buffer, size);
 }
 
-// FIXME aconway 2008-09-02: delegate to interceptor?
 size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
 bool ConnectionCodec::canEncode() { return codec.canEncode(); }
 void ConnectionCodec::closed() { codec.closed(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Sep 18 06:55:30 2008
@@ -101,9 +101,7 @@
     return flowState == CPG_FLOW_CONTROL_ENABLED;
 }
 
-// TODO aconway 2008-08-07: better handling of flow control.
-// Wait for flow control to be disabled.
-// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove...
+// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
 void Cpg::waitForFlowControl() {
     int delayNs=1000;           // one millisecond
     int tries=8;                // double the delay on each try.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Sep 18 06:55:30 2008
@@ -44,36 +44,39 @@
 
 using namespace client;
 
-DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f)
-  : donor(b), failed(f)
+DumpClient::DumpClient(const Url& url, Broker& b,
+                       const boost::function<void()>& ok,
+                       const boost::function<void(const std::exception&)>& fail)
+    : donor(b), done(ok), failed(fail)
 {
+    // FIXME aconway 2008-09-16: Identify as DumpClient connection.
     connection.open(url);
     session = connection.newSession();
 }
 
-DumpClient::~DumpClient() {
-    session.close();
-    connection.close();
-}
+DumpClient::~DumpClient() {}
 
 // Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
 static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
 static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); 
 
 void DumpClient::dump() {
-    // FIXME aconway 2008-09-08: send cluster map frame first.
     donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
     // Catch-up exchange is used to route messages to the proper queue without modifying routing key.
     session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
     donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
     session.sync();
+    session.close();
+    // FIXME aconway 2008-09-17: send dump complete indication.
+    connection.close();
 }
 
 void DumpClient::run() {
     try {
         dump();
-    } catch (const Exception& e) {
-        failed(e.what());
+        done();
+    } catch (const std::exception& e) {
+        failed(e);
     }
     delete this;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Thu Sep 18 06:55:30 2008
@@ -54,7 +54,10 @@
  */
 class DumpClient : public sys::Runnable {
   public:
-    DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail);
+    DumpClient(const Url& url, broker::Broker& donor,
+               const boost::function<void()>& done,
+               const boost::function<void(const std::exception&)>& fail);
+
     ~DumpClient();
     void dump();
     void run();                 // Will delete this when finished.
@@ -69,7 +72,8 @@
     client::Connection connection;
     client::AsyncSession session;
     broker::Broker& donor;
-    boost::function<void(const char*)> failed;
+    boost::function<void()> done;
+    boost::function<void(const std::exception& e)> failed;
 };
 
 }} // namespace qpid::cluster

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Thu Sep 18 06:55:30 2008
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "JoiningHandler.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace sys;
+using namespace framing;
+
+JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {}
+
+void JoiningHandler::configChange(
+    cpg_address *current, int nCurrent,
+    cpg_address */*left*/, int nLeft,
+    cpg_address */*joined*/, int /*nJoined*/)
+{
+    if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
+        QPID_LOG(notice, cluster.self << " first in cluster.");
+        cluster.map.ready(cluster.self, cluster.url);
+        cluster.ready();
+    }
+}
+
+void JoiningHandler::deliver(Event& e) {
+    // Discard connection events unless we are stalled and getting a dump.
+    if (state == STALLED) {
+        e.setConnection(cluster.getConnection(e.getConnectionId()));
+        cluster.connectionEventQueue.push(e);
+    }
+}
+
+void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+    cluster.map.update(members, dumper);
+    QPID_LOG(debug, "Cluster update: " << cluster.map);
+    checkDumpRequest();
+}
+
+void JoiningHandler::checkDumpRequest() {
+    if (state == START && !cluster.map.dumper) {
+        cluster.broker.getPort(); // ensure the broker is listening.
+        state = DUMP_REQUESTED;
+        cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0);
+    }
+}
+
+void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+    if (cluster.map.dumper) {   // Already a dump in progress.
+        if (dumpee == cluster.self && state == DUMP_REQUESTED)
+            state = START;      // Need to make another request.
+    }
+    else {                      // Start a new dump
+        cluster.map.dumper = cluster.map.first();
+        if (dumpee == cluster.self) { // My turn
+
+            state = DUMP_COMPLETE;        // FIXME aconway 2008-09-18: bypass dump
+
+            QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper);
+            switch (state) {
+              case START:
+              case STALLED:
+                assert(0); break;
+
+              case DUMP_REQUESTED: 
+                state = STALLED;
+                cluster.stall();
+                break;
+
+                // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state.
+              case DUMP_COMPLETE:
+                cluster.ready();
+                break;
+            }
+        }
+    }
+}
+
+void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+    cluster.map.ready(id, Url(url));
+    checkDumpRequest();
+}
+
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h Thu Sep 18 06:55:30 2008
@@ -0,0 +1,56 @@
+#ifndef QPID_CLUSTER_JOININGHANDLER_H
+#define QPID_CLUSTER_JOININGHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster handler for the "joining" phase, before the process is a
+ * full cluster member.
+ */
+class JoiningHandler : public ClusterHandler
+{
+  public:
+    JoiningHandler(Cluster& c);
+    
+    void configChange(struct cpg_address */*members*/, int /*nMembers*/,
+                      struct cpg_address */*left*/, int /*nLeft*/,
+                      struct cpg_address */*joined*/, int /*nJoined*/
+    );
+
+    void deliver(Event& e);
+    
+    void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
+    void dumpRequest(const MemberId&, const std::string& url);
+    void ready(const MemberId&, const std::string& url);
+
+  private:
+    enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
+    void checkDumpRequest();
+
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_JOININGHANDLER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Thu Sep 18 06:55:30 2008
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MemberHandler.h"
+#include "Cluster.h"
+#include "DumpClient.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterUpdateBody.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace sys;
+using namespace framing;
+
+MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
+
+void MemberHandler::configChange(
+    cpg_address */*current*/, int /*nCurrent*/,
+    cpg_address */*left*/, int /*nLeft*/,
+    cpg_address */*joined*/, int nJoined)
+{
+    if (nJoined && cluster.map.sendUpdate(cluster.self))  // New members need update
+        cluster.mcastControl(cluster.map.toControl(), 0);
+}
+
+void MemberHandler::deliver(Event& e) {
+    e.setConnection(cluster.getConnection(e.getConnectionId())); 
+    cluster.connectionEventQueue.push(e);
+}
+
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
+
+void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
+    if (cluster.map.dumper) return; // dump in progress, ignore request.
+
+    cluster.map.dumper = cluster.map.first();
+    if (cluster.map.dumper != cluster.self) return;
+    
+    QPID_LOG(info, cluster.self << " sending state dump to " << dumpee);
+    assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
+    cluster.stall();
+
+    cluster.ready();            // FIXME aconway 2008-09-18: Bypass dump
+    (void)urlStr;
+//     dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+//                             boost::bind(&MemberHandler::dumpDone, this),
+//                             boost::bind(&MemberHandler::dumpError, this, _1)));
+}
+
+void MemberHandler::ready(const MemberId& id, const std::string& url) {
+    cluster.map.ready(id, Url(url));
+}
+
+
+void MemberHandler::dumpDone() {
+    dumpThread.join();          // Clean up.
+    cluster.ready();
+}
+
+void MemberHandler::dumpError(const std::exception& e) {
+    QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what());
+    dumpDone();
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Thu Sep 18 06:55:30 2008
@@ -0,0 +1,60 @@
+#ifndef QPID_CLUSTER_MEMBERHANDLER_H
+#define QPID_CLUSTER_MEMBERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterHandler.h"
+#include "qpid/sys/Thread.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster handler for the "member" phase, once the process is a
+ * full cluster member.
+ */
+class MemberHandler : public ClusterHandler
+{
+  public:
+    MemberHandler(Cluster& c);
+    
+    void configChange(
+        struct cpg_address */*members*/, int /*nMembers*/,
+        struct cpg_address */*left*/, int /*nLeft*/,
+        struct cpg_address */*joined*/, int /*nJoined*/
+    );
+
+    void deliver(Event& e);
+    
+    void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
+    void dumpRequest(const MemberId&, const std::string& url);
+    void ready(const MemberId&, const std::string& url);
+
+    void dumpDone();
+    void dumpError(const std::exception&);
+
+  public:
+    sys::Thread dumpThread;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_MEMBERHANDLER_H*/
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Sep 18 06:55:30 2008
@@ -88,7 +88,6 @@
 // Send a doOutput request if one is not already in flight.
 void OutputInterceptor::sendDoOutput() {
     // Call with lock held.
-    // FIXME aconway 2008-08-28: used to  have || parent.getClosed())
     if (!parent.isLocal()) return;
 
     doingOutput = true;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Thu Sep 18 06:55:30 2008
@@ -71,6 +71,9 @@
 
     /** Stop polling and wait for the current callback, if any, to complete. */
     void stop();
+
+    /** Returns true if the queue is currently stopped. */
+    bool isStopped() const;
     
   private:
     typedef sys::Monitor::ScopedLock ScopedLock;
@@ -78,7 +81,7 @@
 
     void dispatch(sys::DispatchHandle&);
     
-    sys::Monitor lock;
+    mutable sys::Monitor lock;
     Callback callback;
     PollableCondition condition;
     sys::DispatchHandle handle;
@@ -130,6 +133,11 @@
     while (dispatching) lock.wait();
 }
 
+template <class T> bool PollableQueue<T>::isStopped() const {
+   ScopedLock l(lock);
+   return stopped;
+}
+
 }} // namespace qpid::sys
 
 #endif  /*!QPID_SYS_POLLABLEQUEUE_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp Thu Sep 18 06:55:30 2008
@@ -61,7 +61,7 @@
         c.connection.close();
     }
     Url url(Url::getIpAddressesUrl(receiver.getPort()));
-    qpid::cluster::DumpClient dump(url, *donor.broker, 0);
+    qpid::cluster::DumpClient dump(url, *donor.broker, 0, 0);
     dump.dump();
     {
         Client r(receiver.getPort());