You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/02/05 19:17:58 UTC

svn commit: r907030 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/ src/qpid/management/ xml/

Author: aconway
Date: Fri Feb  5 18:17:57 2010
New Revision: 907030

URL: http://svn.apache.org/viewvc?rev=907030&view=rev
Log:
Synchronize management agent lists during cluster update.

- replicate management agent lists during cluster update.
- suppress management agent output during update.
- on join all members force full output at next periodic processing.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb  5 18:17:57 2010
@@ -590,6 +590,7 @@
         if (initMap.isUpdateNeeded())  { // Joining established cluster.
             broker.setRecovery(false); // Ditch my current store.
             broker.setClusterUpdatee(true);
+            if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
             state = JOINER;
             mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
             QPID_LOG(notice, *this << " joining cluster " << name);
@@ -672,7 +673,7 @@
     assertClusterSafe();
     std::ostringstream msg;
     msg << prefix;
-    if (connection) msg << " " << *connection;
+    if (connection) msg << " " << connection->getId();
     msg << " snapshot " << map.getFrameSeq() << ":";
     AppendQueue append(msg);
     broker.getQueues().eachQueue(append);
@@ -761,7 +762,10 @@
                  << " to " << updatee);
         deliverEventQueue.start(); // Not involved in update.
     }
-    if (updatee != self && url) debugSnapshot("join");
+    if (updatee != self && url) {
+        debugSnapshot("join");
+        if (mAgent) mAgent->clusterUpdate();
+    }
 }
 
 static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) {
@@ -830,9 +834,11 @@
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         broker.setClusterUpdatee(false);
+        if (mAgent) mAgent->suppress(false); // Enable management output.
         discarding = false;     // ok to set, we're stalled for update.
         QPID_LOG(notice, *this << " update complete, starting catch-up.");
         debugSnapshot("initial");
+        if (mAgent) mAgent->clusterUpdate();
         deliverEventQueue.start();
     }
     else if (updateRetracted) { // Update was retracted, request another update
@@ -992,10 +998,12 @@
 }
 
 void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+    QPID_LOG(debug, "Cluster timer wakeup " << map.getFrameSeq() << ": " << name)
     timer->deliverWakeup(name);
 }
 
 void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+    QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name)
     timer->deliverDrop(name);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb  5 18:17:57 2010
@@ -197,7 +197,6 @@
 
 void Connection::deliverDoOutput(uint32_t limit) {
     output.deliverDoOutput(limit);
-    cluster.debugSnapshot("deliver-do-output", this);
 }
 
 // Called in delivery thread, in cluster order.
@@ -532,5 +531,14 @@
     agent->setBootSequence(bootSequence);
 }
 
+void Connection::managementAgents(const std::string& data) {
+    management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
+    if (!agent)
+        throw Exception(QPID_MSG("Management agents update but no management agent."));
+    framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+    agent->importAgents(buf);
+    QPID_LOG(debug, cluster << " updated management agents");
+}
+
 }} // Namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb  5 18:17:57 2010
@@ -164,6 +164,7 @@
 
     void addQueueListener(const std::string& queue, uint32_t listener);
     void managementSchema(const std::string& data);
+    void managementAgents(const std::string& data);
     void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
 
     uint32_t getSsf() const { return connectionCtor.ssf; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb  5 18:17:57 2010
@@ -53,6 +53,7 @@
 #include "qpid/framing/TypeCode.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
+#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
@@ -128,15 +129,7 @@
              << " at " << updateeUrl);
     Broker& b = updaterBroker;
 
-    //
-    // Bash the state of the slave into conformance with ours.  The
-    // goal here is to get his state arranged so as to mimic our
-    // state, w/r/t object ID creation.  Currently, that means that we
-    // propagate our boot seq and object UID counter to him so that
-    // subsequently created objects on his side will track what's on
-    // our side.
-    //
-    updateManagementSetupState(b);
+    updateManagementSetupState();
 
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1));
@@ -154,16 +147,8 @@
 
     ClusterConnectionProxy(session).expiryId(expiry.getId());
 
-    // FIXME aconway 2010-01-08: we should enforce that all cluster members 
-    // have mgmt enabled or none of them do.
-
-    management::ManagementAgent* agent = updaterBroker.getManagementAgent();
-    if (agent) {
-        string schemaData;
-        agent->exportSchemas(schemaData);
-        ClusterConnectionProxy(session).managementSchema(schemaData);
-    }
-
+    updateManagementAgent();
+    
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
     AMQFrame frame(membership);
@@ -184,21 +169,41 @@
 }
 } // namespace
 
-//
-// Propagate the management setup state block, currently consisting of
-// object number counter and boot sequence counter, to the slave.
-//
-void UpdateClient::updateManagementSetupState(Broker & b)
+
+// Propagate the management state
+void UpdateClient::updateManagementSetupState()
 {
     management::ManagementAgent* agent = updaterBroker.getManagementAgent();
-    if (agent) {
-        qmf::org::apache::qpid::broker::ManagementSetupState mss(b.getManagementAgent(), 0);
-        mss.set_objectNum(b.getManagementAgent()->getNextObjectId());
-        mss.set_bootSequence(b.getManagementAgent()->getBootSequence());
-        QPID_LOG(debug, updaterId << " updating management-setup-state " << mss.get_objectNum() 
-                 << " " << mss.get_bootSequence() << "\n");
-        ClusterConnectionProxy(session).managementSetupState(mss.get_objectNum(), mss.get_bootSequence());
-    }
+    if (!agent) return;
+
+    //
+    // Bash the state of the slave into conformance with ours.  The
+    // goal here is to get his state arranged so as to mimic our
+    // state, w/r/t object ID creation.  Currently, that means that we
+    // propagate our boot seq and object UID counter to him so that
+    // subsequently created objects on his side will track what's on
+    // our side.
+    //
+    qmf::org::apache::qpid::broker::ManagementSetupState mss(agent, 0);
+    mss.set_objectNum(agent->getNextObjectId());
+    mss.set_bootSequence(agent->getBootSequence());
+    QPID_LOG(debug, updaterId << " updating management-setup-state "
+             << mss.get_objectNum()
+             << " " << mss.get_bootSequence() << "\n");
+    ClusterConnectionProxy(session).managementSetupState(
+        mss.get_objectNum(), mss.get_bootSequence());
+}
+
+void UpdateClient::updateManagementAgent()
+{
+    management::ManagementAgent* agent = updaterBroker.getManagementAgent();
+    if (!agent) return;
+    // Send management schemas and agents.
+    string data;
+    agent->exportSchemas(data);
+    ClusterConnectionProxy(session).managementSchema(data);
+    agent->exportAgents(data);
+    ClusterConnectionProxy(session).managementAgents(data);
 }
 
 void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Fri Feb  5 18:17:57 2010
@@ -29,7 +29,6 @@
 #include "qpid/client/AsyncSession.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/sys/Runnable.h"
-#include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
 #include <boost/shared_ptr.hpp>
 
 
@@ -98,7 +97,8 @@
     void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
     void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
     void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c);
-    void updateManagementSetupState(broker::Broker & b);
+    void updateManagementSetupState();
+    void updateManagementAgent();
 
     Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
     MemberId updaterId;

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Feb  5 18:17:57 2010
@@ -32,6 +32,7 @@
 #include <list>
 #include <iostream>
 #include <fstream>
+#include <sstream>
 
 using boost::intrusive_ptr;
 using qpid::framing::Uuid;
@@ -53,7 +54,8 @@
 
 ManagementAgent::ManagementAgent () :
     threadPoolSize(1), interval(10), broker(0), timer(0),
-    startTime(uint64_t(Duration(now())))
+    startTime(uint64_t(Duration(now()))),
+    suppressed(false)
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -87,7 +89,7 @@
 }
 
 void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
-                                 qpid::broker::Broker* _broker, int _threads)
+                                qpid::broker::Broker* _broker, int _threads)
 {
     dataDir        = _dataDir;
     interval       = _interval;
@@ -151,16 +153,16 @@
 }
 
 void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
-                                    qpid::broker::Exchange::shared_ptr _dexchange)
+                                   qpid::broker::Exchange::shared_ptr _dexchange)
 {
     mExchange = _mexchange;
     dExchange = _dexchange;
 }
 
 void ManagementAgent::registerClass (const string&  packageName,
-                                      const string&  className,
-                                      uint8_t* md5Sum,
-                                      ManagementObject::writeSchemaCall_t schemaCall)
+                                     const string&  className,
+                                     uint8_t* md5Sum,
+                                     ManagementObject::writeSchemaCall_t schemaCall)
 {
     Mutex::ScopedLock lock(userLock);
     PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -168,9 +170,9 @@
 }
 
 void ManagementAgent::registerEvent (const string&  packageName,
-                                      const string&  eventName,
-                                      uint8_t* md5Sum,
-                                      ManagementObject::writeSchemaCall_t schemaCall)
+                                     const string&  eventName,
+                                     uint8_t* md5Sum,
+                                     ManagementObject::writeSchemaCall_t schemaCall)
 {
     Mutex::ScopedLock lock(userLock);
     PackageMap::iterator pIter = findOrAddPackageLH(packageName);
@@ -240,7 +242,7 @@
 ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
     : TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC),
                  "ManagementAgent::periodicProcessing"),
-                 agent(_agent) {}
+      agent(_agent) {}
 
 ManagementAgent::Periodic::~Periodic () {}
 
@@ -271,6 +273,14 @@
     }
 }
 
+void ManagementAgent::clusterUpdate() {
+    // Called on all cluster memebesr when a new member joins a cluster.
+    // Set clientWasAdded so that on the next periodicProcessing we will do 
+    // a full update on all cluster members.
+    clientWasAdded = true;
+    debugSnapshot("update");
+}
+
 void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
 {
     buf.putOctet ('A');
@@ -293,12 +303,15 @@
 }
 
 void ManagementAgent::sendBuffer(Buffer&  buf,
-                                  uint32_t length,
-                                  qpid::broker::Exchange::shared_ptr exchange,
-                                  string   routingKey)
+                                 uint32_t length,
+                                 qpid::broker::Exchange::shared_ptr exchange,
+                                 string   routingKey)
 {
-    if (exchange.get() == 0)
+    if (suppressed) {
+        QPID_LOG(trace, "Suppressed management message to " << routingKey);
         return;
+    }
+    if (exchange.get() == 0) return;
 
     intrusive_ptr<Message> msg(new Message());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
@@ -341,7 +354,7 @@
 #define BUFSIZE   65536
 #define HEADROOM  4096
     QPID_LOG(trace, "Management agent periodic processing")
-    Mutex::ScopedLock lock (userLock);
+        Mutex::ScopedLock lock (userLock);
     char                msgChars[BUFSIZE];
     uint32_t            contentSize;
     string              routingKey;
@@ -452,6 +465,7 @@
         sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
     }
+    debugSnapshot("periodic");
 }
 
 void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
@@ -481,7 +495,7 @@
 }
 
 void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
-                                            uint32_t code, string text)
+                                           uint32_t code, string text)
 {
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
@@ -497,8 +511,8 @@
 }
 
 bool ManagementAgent::dispatchCommand (Deliverable&      deliverable,
-                                        const string&     routingKey,
-                                        const FieldTable* /*args*/)
+                                       const string&     routingKey,
+                                       const FieldTable* /*args*/)
 {
     Mutex::ScopedLock lock (userLock);
     Message&  msg = ((DeliverableMessage&) deliverable).getMessage ();
@@ -533,7 +547,7 @@
 }
 
 void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
-                                              uint32_t sequence, const ConnectionToken* connToken)
+                                             uint32_t sequence, const ConnectionToken* connToken)
 {
     string   methodName;
     string   packageName;
@@ -562,7 +576,7 @@
         outBuffer.reset();
         sendBuffer(outBuffer, outLen, dExchange, replyToKey);
         QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
-        return;
+            return;
     }
 
     if (acl != 0) {
@@ -578,7 +592,7 @@
             outBuffer.reset();
             sendBuffer(outBuffer, outLen, dExchange, replyToKey);
             QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
-            return;
+                return;
         }
     }
 
@@ -917,7 +931,6 @@
     agent->mgmtObject->set_brokerBank   (brokerBank);
     agent->mgmtObject->set_agentBank    (assignedBank);
     addObject (agent->mgmtObject, 0, true);
-
     remoteAgents[connectionRef] = agent;
 
     QPID_LOG(trace, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
@@ -1062,7 +1075,7 @@
             outBuffer.reset();
             sendBuffer(outBuffer, outLen, dExchange, replyToKey);
             QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
-        }
+                }
 
         return false;
     }
@@ -1135,7 +1148,7 @@
     sendBuffer (outBuffer, outLen, mExchange, "schema.package");
     QPID_LOG(trace, "SEND PackageInd package=" << name << " to=schema.package")
 
-    return result.first;
+        return result.first;
 }
 
 void ManagementAgent::addClassLH(uint8_t               kind,
@@ -1366,3 +1379,69 @@
     }
 }
 
+void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const {
+    outBuf.checkAvailable(encodedSize());
+    outBuf.putLong(brokerBank);
+    outBuf.putLong(agentBank);
+    outBuf.putShortString(routingKey);
+    connectionRef.encode(outBuf);
+    mgmtObject->writeProperties(outBuf);
+}
+
+void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) {
+    brokerBank = inBuf.getLong();
+    agentBank = inBuf.getLong();
+    inBuf.getShortString(routingKey);
+    connectionRef.decode(inBuf);
+    mgmtObject = new _qmf::Agent(&agent, this);
+    mgmtObject->readProperties(inBuf);
+    agent.addObject(mgmtObject, 0, true);
+}
+
+uint32_t ManagementAgent::RemoteAgent::encodedSize() const {
+    return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long
+        + routingKey.size() + sizeof(uint8_t) // ShortString
+        + connectionRef.encodedSize()
+        + mgmtObject->writePropertiesSize();
+}
+
+void ManagementAgent::exportAgents(std::string& out) {
+    out.clear();
+    for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
+         i != remoteAgents.end();
+         ++i)
+    {
+        ObjectId id = i->first;
+        RemoteAgent* agent = i->second;
+        size_t encodedSize = id.encodedSize() + agent->encodedSize();
+        size_t end = out.size();
+        out.resize(end + encodedSize);
+        framing::Buffer outBuf(&out[end], encodedSize);
+        id.encode(outBuf);
+        agent->encode(outBuf);
+    }
+}
+
+void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
+    while (inBuf.available()) {
+        ObjectId id;
+        inBuf.checkAvailable(id.encodedSize());
+        id.decode(inBuf);
+        std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));
+        agent->decode(inBuf);
+        addObject (agent->mgmtObject, 0, false);
+        remoteAgents[agent->connectionRef] = agent.release();
+    }
+}
+
+void ManagementAgent::debugSnapshot(const char* type) {
+    std::ostringstream msg;
+    msg << type << " snapshot, agents:";
+    for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
+         i != remoteAgents.end(); ++i)
+        msg << " " << i->second->routingKey;
+    msg << " packages: " << packages.size();
+    msg << " objects: " << managementObjects.size();
+    msg << " new objects: " << newManagementObjects.size();
+    QPID_LOG(trace, msg.str());
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Feb  5 18:17:57 2010
@@ -71,6 +71,9 @@
     /** Called after plugins are initialized. */
     void pluginsInitialized();
 
+    /** Called by cluster to suppress management output during update. */
+    void suppress(bool s) { suppressed = s; }
+
     void setInterval     (uint16_t _interval) { interval = _interval; }
     void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
                           qpid::broker::Exchange::shared_ptr directExchange);
@@ -90,6 +93,8 @@
                                        severity_t severity = SEV_DEFAULT);
     QPID_BROKER_EXTERN void clientAdded     (const std::string& routingKey);
 
+    QPID_BROKER_EXTERN void clusterUpdate();
+
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args);
@@ -105,9 +110,15 @@
     /** Serialize my schemas as a binary blob into schemaOut */
     void exportSchemas(std::string& schemaOut);
 
+    /** Serialize my remote-agent map as a binary blob into agentsOut */
+    void exportAgents(std::string& agentsOut);
+
     /** Decode a serialized schemas and add to my schema cache */
     void importSchemas(framing::Buffer& inBuf);
 
+    /** Decode a serialized agent map */
+    void importAgents(framing::Buffer& inBuf);
+
     // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
     uint64_t getNextObjectId(void) { return nextObjectId; }
     void setNextObjectId(uint64_t o) { nextObjectId = o; }
@@ -136,9 +147,13 @@
         std::string       routingKey;
         ObjectId          connectionRef;
         qmf::org::apache::qpid::broker::Agent*    mgmtObject;
-        RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+        RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
         ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+
         virtual ~RemoteAgent ();
+        void encode(framing::Buffer& buffer) const;
+        void decode(framing::Buffer& buffer);
+        uint32_t encodedSize() const;
     };
 
     // TODO: Eventually replace string with entire reply-to structure.  reply-to
@@ -205,9 +220,6 @@
     ManagementObjectMap          managementObjects;
     ManagementObjectMap          newManagementObjects;
 
-    static ManagementAgent*      agent;
-    static bool                  enabled;
-
     framing::Uuid                uuid;
     sys::Mutex                   addLock;
     sys::Mutex                   userLock;
@@ -224,6 +236,7 @@
     uint32_t                     nextRequestSequence;
     bool                         clientWasAdded;
     const uint64_t               startTime;
+    bool                         suppressed;
 
     std::auto_ptr<IdAllocator> allocator;
 
@@ -282,6 +295,7 @@
     size_t validateSchema(framing::Buffer&, uint8_t kind);
     size_t validateTableSchema(framing::Buffer&);
     size_t validateEventSchema(framing::Buffer&);
+    void debugSnapshot(const char*);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907030&r1=907029&r2=907030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb  5 18:17:57 2010
@@ -248,6 +248,9 @@
       <field name="bootSequence" type="uint16"/>
     </control>
 
-
+    <!-- Replicate management agent's remote-agent map -->
+    <control name="management-agents" code="0x37">
+      <field name="data" type="vbin32"/>
+    </control>
   </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org