You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/25 14:30:15 UTC

svn commit: r698945 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/ src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ xml/

Author: aconway
Date: Thu Sep 25 05:30:14 2008
New Revision: 698945

URL: http://svn.apache.org/viewvc?rev=698945&view=rev
Log:
Enabled management, added cluster shutdown command.
Removed dead Handler methods in Cluster.
Fixed SessionException handling in broker: some SessionExceptions were being thrown as "unknown exception".

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Thu Sep 25 05:30:14 2008
@@ -147,7 +147,7 @@
 
 void SessionState::senderConfirmed(const SessionPoint& confirmed) {
     if (confirmed > sender.sendPoint)
-        throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
+        throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed commands not yet sent."));
     QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
     ReplayList::iterator i = sender.replayList.begin();
     while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Thu Sep 25 05:30:14 2008
@@ -81,6 +81,15 @@
             getInHandler()->handle(f);
         }
     }
+    catch(const SessionException& e) {
+        QPID_LOG(error, "Execution exception: " << e.what());
+        framing::AMQP_AllProxy::Execution  execution(channel);
+        AMQMethodBody* m = f.getMethod();
+        SequenceNumber commandId;
+        if (getState()) commandId =  getState()->receiverGetCurrent();
+        execution.exception(e.code, commandId, m ? m->amqpClassId() : 0, m ? m->amqpMethodId() : 0, 0, e.what(), FieldTable());
+        sendDetach();
+    }
     catch(const ChannelException& e){
         QPID_LOG(error, "Channel exception: " << e.what());
         peer.detached(name, e.code);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Thu Sep 25 05:30:14 2008
@@ -43,8 +43,6 @@
                        public framing::FrameHandler::InOutHandler
 {
   public:
-    typedef framing::AMQP_AllProxy::Session Peer;
-
     SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);
     ~SessionHandler();
 
@@ -103,7 +101,7 @@
     void checkName(const std::string& name);
 
     framing::ChannelHandler channel;
-    Peer peer;
+    framing::AMQP_AllProxy::Session  peer;
     bool ignoring;
     bool sendReady, receiveReady;
     std::string name;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Sep 25 05:30:14 2008
@@ -212,31 +212,15 @@
 
 void SessionState::handleIn(AMQFrame& frame) {
     SequenceNumber commandId = receiverGetCurrent();
-    try {
-        //TODO: make command handling more uniform, regardless of whether
-        //commands carry content.
-        AMQMethodBody* m = frame.getMethod();
-        if (m == 0 || m->isContentBearing()) {
-            handleContent(frame, commandId);
-        } else if (frame.getBof() && frame.getEof()) {
-            handleCommand(frame.getMethod(), commandId);                
-        } else {
-            throw InternalErrorException("Cannot handle multi-frame command segments yet");
-        }
-    } catch(const SessionException& e) {
-        //TODO: better implementation of new exception handling mechanism
-
-        //0-10 final changes the types of exceptions, 'model layer'
-        //exceptions will all be session exceptions regardless of
-        //current channel/connection classification
-
-        AMQMethodBody* m = frame.getMethod();
-        if (m) {
-            getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
-        } else {
-            getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
-        }
-        handler->sendDetach();
+    //TODO: make command handling more uniform, regardless of whether
+    //commands carry content.
+    AMQMethodBody* m = frame.getMethod();
+    if (m == 0 || m->isContentBearing()) {
+        handleContent(frame, commandId);
+    } else if (frame.getBof() && frame.getEof()) {
+        handleCommand(frame.getMethod(), commandId);                
+    } else {
+        throw InternalErrorException("Cannot handle multi-frame command segments yet");
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 25 05:30:14 2008
@@ -26,6 +26,7 @@
 #include "qpid/framing/ClusterDumpRequestBody.h"
 #include "qpid/framing/ClusterUpdateBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/log/Statement.h"
@@ -51,7 +52,7 @@
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
-namespace _qmf = qmf::org::apache::qpid::cluster;
+namespace qmf = qmf::org::apache::qpid::cluster;
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
     broker(b),
@@ -66,6 +67,7 @@
                       boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
     connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+    mgmtObject(0),
     handler(&joiningHandler),
     joiningHandler(*this),
     memberHandler(*this),
@@ -73,30 +75,25 @@
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
-        _qmf::Package  packageInit(agent);
-        mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
+        qmf::Package  packageInit(agent);
+        mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str());
         agent->addObject (mgmtObject);
         mgmtObject->set_status("JOINING");
-		
+
+        // FIXME aconway 2008-09-24: 
         // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
     }
     QPID_LOG(notice, self << " joining cluster " << name.str());
-    broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
+    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
     cpgDispatchHandle.startWatch(poller);
     cpg.join(name);
 }
 
 Cluster::~Cluster() {}
 
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(lock);
-    handler->insert(c);
-}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
 
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(lock);
-    handler->catchUpClosed(c);
-}
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
 
 void Cluster::erase(ConnectionId id) {
     Mutex::ScopedLock l(lock);
@@ -239,10 +236,8 @@
 }
 
 void Cluster::disconnect(sys::DispatchHandle& ) {
-    // FIXME aconway 2008-09-11: this should be logged as critical,
-    // when we provide admin option to shut down cluster and let
-    // members leave cleanly.
-    stopClusterNode();
+    QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down");
+    broker.shutdown();
 }
 
 void Cluster::configChange(
@@ -265,27 +260,8 @@
     
     map.left(left, nLeft);
     handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
-
-    // FIXME aconway 2008-09-17: management update.
-    //update mgnt stats
-    updateMemberStats();
 }
 
-void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
-    Mutex::ScopedLock l(lock);
-    handler->update(id, members, dumper); 
-}
-
-void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
-    Mutex::ScopedLock l(lock);
-    handler->dumpRequest(dumpee, urlStr);
-}
-
-void Cluster::ready(const MemberId& member, const std::string& url) {
-    Mutex::ScopedLock l(lock);
-    handler->ready(member, url);
-    // FIXME aconway 2008-09-17: management update.
-}
 
 broker::Broker& Cluster::getBroker(){ return broker; }
 
@@ -295,12 +271,11 @@
     // Stop processing connection events. We still process config changes
     // and cluster controls in deliver()
     connectionEventQueue.stop();
+    if (mgmtObject!=0) mgmtObject->set_status("STALLED");
 
     // FIXME aconway 2008-09-11: Flow control, we should slow down or
     // stop reading from local connections while stalled to avoid an
     // unbounded queue.
-    //     if (mgmtObject!=0)
-    //         mgmtObject->set_status("STALLED");
 }
 
 void Cluster::ready() {
@@ -314,8 +289,7 @@
     QPID_LOG(debug, self << " un-stalling");
     handler = &memberHandler;   // Member mode.
     connectionEventQueue.start(poller);
-    //     if (mgmtObject!=0)
-    //         mgmtObject->set_status("ACTIVE");
+    if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
 }
 
 // Called from Broker::~Broker when broker is shut down.  At this
@@ -323,61 +297,46 @@
 // invoked. We must ensure that CPG has also shut down so no CPG
 // callbacks will be invoked.
 // 
-void Cluster::shutdown() {
+void Cluster::brokerShutdown() {
     QPID_LOG(notice, self << " shutting down.");
     try { cpg.shutdown(); }
     catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
     delete this;
 }
 
-ManagementObject* Cluster::GetManagementObject(void) const {
-    return (ManagementObject*) mgmtObject;
-}
+ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; }
 
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
-    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
     QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
-    switch (methodId)
-    {
-      case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
-        stopClusterNode();
-        break;
-      case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
-        stopFullCluster();
-        break;
+    switch (methodId) {
+      case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
+      case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+      default: return Manageable::STATUS_UNKNOWN_METHOD;
     }
-
-    return status;
+    return Manageable::STATUS_OK;
 }    
 
-void Cluster::stopClusterNode(void)
-{
-    // FIXME aconway 2008-09-18: mgmt
-    QPID_LOG(notice, self << " disconnected from cluster " << name.str());
-    broker.shutdown();
-}
-
-void Cluster::stopFullCluster(void)
-{
-    // FIXME aconway 2008-09-17: TODO
-}
-
-void Cluster::updateMemberStats(void)
-{
-    //update mgnt stats
-    // FIXME aconway 2008-09-18: 
-//     if (mgmtObject!=0){
-//         mgmtObject->set_clusterSize(size()); 
-//         std::vector<Url> vectUrl = getUrls();
-//         string urlstr;
-//         for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
-//             if (iter != vectUrl.begin()) urlstr += ";";
-//             urlstr += iter->str();
-//         }
-//         mgmtObject->set_members(urlstr);
-//     }
-
+void Cluster::stopClusterNode(void) {
+    QPID_LOG(notice, self << " stopped by admin");
+    leave();
+}
+
+void Cluster::stopFullCluster(void) {
+    QPID_LOG(notice, self << " sending shutdown to cluster.");
+    mcastControl(ClusterShutdownBody(), 0);
+}
+
+void Cluster::updateMemberStats(void) {
+    if (mgmtObject) {
+        mgmtObject->set_clusterSize(size()); 
+        std::vector<Url> vectUrl = getUrls();
+        string urlstr;
+        for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+            if (iter != vectUrl.begin()) urlstr += "\n";
+            urlstr += iter->str();
+        }
+        mgmtObject->set_members(urlstr);
+    }
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 25 05:30:14 2008
@@ -83,11 +83,6 @@
     /** Leave the cluster */
     void leave();
 
-    // Cluster controls.
-    void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
-    void dumpRequest(const MemberId&, const std::string& url);
-    void ready(const MemberId&, const std::string& url);
-
     MemberId getSelf() const { return self; }
     MemberId getId() const { return self; }
 
@@ -95,7 +90,7 @@
     void stall();
     void unstall();
 
-    void shutdown();
+    void brokerShutdown();
 
     broker::Broker& getBroker();
 
@@ -172,6 +167,7 @@
 
     size_t mcastId;
 
+  friend class ClusterHandler;
   friend class JoiningHandler;
   friend class MemberHandler;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp Thu Sep 25 05:30:14 2008
@@ -19,11 +19,13 @@
  *
  */
 
-#include "qpid/framing/AllInvoker.h"
-
+#include "Cluster.h"
 #include "ClusterHandler.h"
+
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
 
 
 
@@ -38,6 +40,7 @@
     void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
     void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
     void ready(const std::string& url) { handler.ready(member, url); }
+    void shutdown() { handler.shutdown(member); }
 };
 
 ClusterHandler::~ClusterHandler() {}
@@ -49,5 +52,11 @@
     return framing::invoke(ops, *frame.getBody()).wasHandled(); 
 }
 
+void ClusterHandler::shutdown(const MemberId& id) {
+    QPID_LOG(notice, cluster.self << " received shutdown from " << id);
+    cluster.leave();
+}
+
+
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Thu Sep 25 05:30:14 2008
@@ -51,6 +51,7 @@
     virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
     virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
     virtual void ready(const MemberId&, const std::string& url) = 0;
+    virtual void shutdown(const MemberId&);
 
     virtual void deliver(Event& e) = 0; // Deliver a connection event.
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Thu Sep 25 05:30:14 2008
@@ -37,6 +37,7 @@
     cpg_address */*left*/, int nLeft,
     cpg_address */*joined*/, int /*nJoined*/)
 {
+    // FIXME aconway 2008-09-24: Called with lock held - volatile
     if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
         QPID_LOG(notice, cluster.self << " first in cluster.");
         cluster.map.ready(cluster.self, cluster.url);
@@ -53,9 +54,11 @@
 }
 
 void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+    Mutex::ScopedLock l(cluster.lock);
     cluster.map.update(members, dumper);
     QPID_LOG(debug, "Cluster update: " << cluster.map);
     checkDumpRequest();
+    cluster.updateMemberStats();
 }
 
 void JoiningHandler::checkDumpRequest() {
@@ -67,6 +70,7 @@
 }
 
 void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+    Mutex::ScopedLock l(cluster.lock);
     if (cluster.map.dumper) {   // Already a dump in progress.
         if (dumpee == cluster.self && state == DUMP_REQUESTED)
             state = START;      // Need to make another request.
@@ -96,11 +100,13 @@
 }
 
 void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+    Mutex::ScopedLock l(cluster.lock);
     cluster.map.ready(id, Url(url));
     checkDumpRequest();
 }
 
 void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(cluster.lock);
     if (c->isCatchUp()) {
         ++catchUpConnections;
         QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
@@ -109,6 +115,7 @@
 }
 
 void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(cluster.lock);
     QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
     if (c->isShadow()) 
         cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
@@ -118,7 +125,7 @@
 
 void JoiningHandler::dumpComplete() {
     // FIXME aconway 2008-09-18: need to detect incomplete dump.
-    // 
+    // Called with lock  - volatile?
     if (state == STALLED) {
         QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
         cluster.ready();
@@ -130,4 +137,5 @@
     }
 }
 
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Thu Sep 25 05:30:14 2008
@@ -43,6 +43,7 @@
     cpg_address */*left*/, int /*nLeft*/,
     cpg_address */*joined*/, int nJoined)
 {
+    // FIXME aconway 2008-09-24: Called with lock held - volatile
     if (nJoined && cluster.map.sendUpdate(cluster.self))  // New members need update
         cluster.mcastControl(cluster.map.toControl(), 0);
 }
@@ -51,9 +52,13 @@
     cluster.connectionEventQueue.push(e);
 }
 
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {
+    Mutex::ScopedLock l(cluster.lock);
+    cluster.updateMemberStats();
+}
 
 void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
+    Mutex::ScopedLock l(cluster.lock);
     if (cluster.map.dumper) return; // dump in progress, ignore request.
 
     cluster.map.dumper = cluster.map.first();
@@ -76,17 +81,18 @@
 
 
 void MemberHandler::dumpSent() {
-    QPID_LOG(debug, "Finished sending state dump.");
     Mutex::ScopedLock l(cluster.lock);
+    QPID_LOG(debug, "Finished sending state dump.");
     cluster.ready();
 }
 
 void MemberHandler::dumpError(const std::exception& e) {
-    QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what());
+    QPID_LOG(error, cluster.self << " error sending state dump: " << e.what());
     dumpSent();
 }
 
 void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(cluster.lock);
     if (c->isCatchUp())         // Not allowed in member mode
         c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
     else
@@ -94,6 +100,7 @@
 }
 
 void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(cluster.lock);
     QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
     assert(0);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Thu Sep 25 05:30:14 2008
@@ -48,7 +48,7 @@
     void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
     void dumpRequest(const MemberId&, const std::string& url);
     void ready(const MemberId&, const std::string& url);
-
+    
     void dumpSent();
     void dumpError(const std::exception&);
 

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Sep 25 05:30:14 2008
@@ -39,6 +39,8 @@
     <control name="ready" code="0x3" label="New member is ready.">
       <field name="url" type="str16" label="Url for brain dump."/>
     </control>
+
+    <control name="shutdown" code="0x4" label="Shut down cluster"/>
   </class>
 
   <!-- TODO aconway 2008-09-10: support for un-attached connections. -->