You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ns...@apache.org on 2008/03/25 21:30:04 UTC

svn commit: r640970 - in /incubator/qpid/trunk/qpid: cpp/managementgen/templates/ cpp/src/qpid/broker/ cpp/src/qpid/management/ python/mgmt-cli/ python/qpid/ specs/

Author: nsantos
Date: Tue Mar 25 13:30:01 2008
New Revision: 640970

URL: http://svn.apache.org/viewvc?rev=640970&view=rev
Log:
QPID-877: applied patch from Ted Ross

Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml
    incubator/qpid/trunk/qpid/specs/management-types.xml

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.cpp Tue Mar 25 13:30:01 2008
@@ -106,11 +106,12 @@
 /*MGEN:Class.WriteConfig*/
 }
 
-void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf)
+void /*MGEN:Class.NameCap*/::writeInstrumentation (Buffer& buf, bool skipHeaders)
 {
     instChanged = false;
 
-    writeTimestamps (buf);
+    if (!skipHeaders)
+        writeTimestamps (buf);
 /*MGEN:Class.WriteInst*/
 
     // Maintenance of hi-lo statistics

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Tue Mar 25 13:30:01 2008
@@ -25,8 +25,9 @@
 
 #include "qpid/sys/Mutex.h"
 #include "qpid/management/ManagementObject.h"
+#include "qpid/framing/Uuid.h"
 
-namespace qpid { 
+namespace qpid {
 namespace management {
 
 class /*MGEN:Class.NameCap*/ : public ManagementObject
@@ -45,7 +46,8 @@
     // Private Methods
     static void writeSchema   (qpid::framing::Buffer& buf);
     void writeConfig          (qpid::framing::Buffer& buf);
-    void writeInstrumentation (qpid::framing::Buffer& buf);
+    void writeInstrumentation (qpid::framing::Buffer& buf,
+                               bool skipHeaders = false);
     void doMethod             (std::string            methodName,
                                qpid::framing::Buffer& inBuf,
                                qpid::framing::Buffer& outBuf);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue Mar 25 13:30:01 2008
@@ -127,7 +127,8 @@
     dtxManager.setStore (store);
 
     if(conf.enableMgmt){
-        ManagementAgent::enableManagement ();
+        ManagementAgent::enableManagement (dataDir.isEnabled () ? dataDir.getPath () : string (),
+                                           conf.mgmtPubInterval);
         managementAgent = ManagementAgent::getAgent ();
         managementAgent->setInterval (conf.mgmtPubInterval);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Mar 25 13:30:01 2008
@@ -25,6 +25,8 @@
 #include <qpid/broker/Message.h>
 #include <qpid/broker/MessageDelivery.h>
 #include <list>
+#include <iostream>
+#include <fstream>
 
 using boost::intrusive_ptr;
 using namespace qpid::framing;
@@ -36,25 +38,62 @@
 ManagementAgent::shared_ptr ManagementAgent::agent;
 bool                        ManagementAgent::enabled = 0;
 
-ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
+    dataDir (_dataDir), interval (_interval)
 {
     timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
     nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
     nextRemotePrefix = 101;
+
+    // Get from file or generate and save to file.
+    if (dataDir.empty ())
+    {
+        uuid.generate ();
+        QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
+                  << uuid);
+    }
+    else
+    {
+        string   filename (dataDir + "/brokerId");
+        ifstream inFile   (filename.c_str ());
+
+        if (inFile.good ())
+        {
+            inFile >> uuid;
+            inFile.close ();
+            QPID_LOG (debug, "ManagementAgent restored broker ID: " << uuid);
+        }
+        else
+        {
+            uuid.generate ();
+            QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
+
+            ofstream outFile (filename.c_str ());
+            if (outFile.good ())
+            {
+                outFile << uuid << endl;
+                outFile.close ();
+                QPID_LOG (debug, "ManagementAgent saved broker ID");
+            }
+            else
+            {
+                QPID_LOG (warning, "ManagementAgent unable to save broker ID");
+            }
+        }
+    }
 }
 
 ManagementAgent::~ManagementAgent () {}
 
-void ManagementAgent::enableManagement (void)
+void ManagementAgent::enableManagement (string dataDir, uint16_t interval)
 {
     enabled = 1;
+    if (agent.get () == 0)
+        agent = shared_ptr (new ManagementAgent (dataDir, interval));
 }
 
 ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
 {
-    if (enabled && agent.get () == 0)
-        agent = shared_ptr (new ManagementAgent (10));
-
     return agent;
 }
 
@@ -122,27 +161,25 @@
     }
 }
 
-void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint8_t cls)
+void ManagementAgent::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
 {
     buf.putOctet ('A');
     buf.putOctet ('M');
-    buf.putOctet ('0');
     buf.putOctet ('1');
     buf.putOctet (opcode);
-    buf.putOctet (cls);
+    buf.putLong  (seq);
 }
 
-bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint8_t *cls)
+bool ManagementAgent::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
 {
     uint8_t h1 = buf.getOctet ();
     uint8_t h2 = buf.getOctet ();
     uint8_t h3 = buf.getOctet ();
-    uint8_t h4 = buf.getOctet ();
 
     *opcode = buf.getOctet ();
-    *cls    = buf.getOctet ();
+    *seq    = buf.getLong  ();
 
-    return h1 == 'A' && h2 == 'M' && h3 == '0' && h4 == '1';
+    return h1 == 'A' && h2 == 'M' && h3 == '1';
 }
 
 void ManagementAgent::SendBuffer (Buffer&  buf,
@@ -199,24 +236,24 @@
         if (object->getConfigChanged () || object->isDeleted ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer, 'C', 'C');
+            EncodeHeader (msgBuffer, 'c');
             object->writeConfig (msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "mgmt.config." + object->getClassName ();
+            routingKey = "mgmt." + uuid.str() + ".config." + object->getClassName ();
             SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
         
         if (object->getInstChanged ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer, 'C', 'I');
+            EncodeHeader (msgBuffer, 'i');
             object->writeInstrumentation (msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
-            routingKey = "mgmt.inst." + object->getClassName ();
+            routingKey = "mgmt." + uuid.str () + ".inst." + object->getClassName ();
             SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
 
@@ -233,6 +270,20 @@
     deleteList.clear ();
 }
 
+void ManagementAgent::sendCommandComplete (string replyToKey, uint32_t sequence,
+                                           uint32_t code, string text)
+{
+    Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+    uint32_t outLen;
+
+    EncodeHeader (outBuffer, 'z', sequence);
+    outBuffer.putLong (code);
+    outBuffer.putShortString (text);
+    outLen = MA_BUFFER_SIZE - outBuffer.available ();
+    outBuffer.reset ();
+    SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+}
+
 void ManagementAgent::dispatchCommand (Deliverable&      deliverable,
                                        const string&     routingKey,
                                        const FieldTable* /*args*/)
@@ -295,13 +346,13 @@
 
     Buffer   inBuffer  (inputBuffer,  MA_BUFFER_SIZE);
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
-    uint8_t  opcode, unused;
+    uint32_t outLen, sequence;
+    uint8_t  opcode;
 
     msg.encodeContent (inBuffer);
     inBuffer.reset ();
 
-    if (!CheckHeader (inBuffer, &opcode, &unused))
+    if (!CheckHeader (inBuffer, &opcode, &sequence))
     {
         QPID_LOG (debug, "    Invalid content header");
         return;
@@ -313,8 +364,7 @@
         return;
     }
 
-    uint32_t   methodId = inBuffer.getLong     ();
-    uint64_t   objId    = inBuffer.getLongLong ();
+    uint64_t   objId = inBuffer.getLongLong ();
     string     replyToKey;
 
     const framing::MessageProperties* p =
@@ -330,8 +380,7 @@
         return;
     }
 
-    EncodeHeader (outBuffer, 'm');
-    outBuffer.putLong (methodId);
+    EncodeHeader (outBuffer, 'm', sequence);
 
     ManagementObjectMap::iterator iter = managementObjects.find (objId);
     if (iter == managementObjects.end ())
@@ -349,22 +398,20 @@
     SendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
-void ManagementAgent::handleHello (Buffer&, string replyToKey)
+void ManagementAgent::handleBrokerRequest (Buffer&, string replyToKey, uint32_t sequence)
 {
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    uint8_t* dat = (uint8_t*) "Broker ID";
-    EncodeHeader (outBuffer, 'I');
-    outBuffer.putShort (9);
-    outBuffer.putRawData (dat, 9);
+    EncodeHeader (outBuffer, 'b', sequence);
+    uuid.encode  (outBuffer);
 
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     SendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
-void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey)
+void ManagementAgent::handlePackageQuery (Buffer&, string replyToKey, uint32_t sequence)
 {
     for (PackageMap::iterator pIter = packages.begin ();
          pIter != packages.end ();
@@ -373,15 +420,17 @@
         Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
         uint32_t outLen;
 
-        EncodeHeader (outBuffer, 'p');
+        EncodeHeader (outBuffer, 'p', sequence);
         EncodePackageIndication (outBuffer, pIter);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
         SendBuffer (outBuffer, outLen, dExchange, replyToKey);
     }
+
+    sendCommandComplete (replyToKey, sequence);
 }
 
-void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/)
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
 {
     std::string packageName;
 
@@ -389,7 +438,7 @@
     FindOrAddPackage (packageName);
 }
 
-void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleClassQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
 {
     std::string packageName;
 
@@ -405,16 +454,18 @@
             Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
-            EncodeHeader (outBuffer, 'q');
+            EncodeHeader (outBuffer, 'q', sequence);
             EncodeClassIndication (outBuffer, pIter, cIter);
             outLen = MA_BUFFER_SIZE - outBuffer.available ();
             outBuffer.reset ();
             SendBuffer (outBuffer, outLen, dExchange, replyToKey);
         }
     }
+
+    sendCommandComplete (replyToKey, sequence);
 }
 
-void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleSchemaQuery (Buffer& inBuffer, string replyToKey, uint32_t sequence)
 {
     string         packageName;
     SchemaClassKey key;
@@ -436,7 +487,7 @@
 
             if (classInfo.writeSchemaCall != 0)
             {
-                EncodeHeader (outBuffer, 's');
+                EncodeHeader (outBuffer, 's', sequence);
                 classInfo.writeSchemaCall (outBuffer);
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
@@ -459,7 +510,7 @@
     return nextRemotePrefix++;
 }
 
-void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey)
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
 {
     string   label;
     uint32_t requestedPrefix;
@@ -472,17 +523,55 @@
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    EncodeHeader (outBuffer, 'a');
+    EncodeHeader (outBuffer, 'a', sequence);
     outBuffer.putLong (assignedPrefix);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
     SendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
+void ManagementAgent::handleGetRequest (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+    FieldTable           ft;
+    FieldTable::ValuePtr value;
+
+    ft.decode (inBuffer);
+    value = ft.get ("_class");
+    if (value->empty () || !value->convertsTo<string> ())
+    {
+        // TODO: Send completion with an error code
+        return;
+    }
+
+    string className (value->get<string> ());
+
+    for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+         iter != managementObjects.end ();
+         iter++)
+    {
+        ManagementObject::shared_ptr object = iter->second;
+        if (object->getClassName () == className)
+        {
+            Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+            uint32_t outLen;
+
+            EncodeHeader (outBuffer, 'g', sequence);
+            object->writeConfig (outBuffer);
+            object->writeInstrumentation (outBuffer, true);
+            outLen = MA_BUFFER_SIZE - outBuffer.available ();
+            outBuffer.reset ();
+            SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+        }
+    }
+
+    sendCommandComplete (replyToKey, sequence);
+}
+
 void ManagementAgent::dispatchAgentCommand (Message& msg)
 {
     Buffer   inBuffer (inputBuffer,  MA_BUFFER_SIZE);
-    uint8_t  opcode, unused;
+    uint8_t  opcode;
+    uint32_t sequence;
     string   replyToKey;
 
     const framing::MessageProperties* p =
@@ -498,15 +587,16 @@
     msg.encodeContent (inBuffer);
     inBuffer.reset ();
 
-    if (!CheckHeader (inBuffer, &opcode, &unused))
+    if (!CheckHeader (inBuffer, &opcode, &sequence))
         return;
 
-    if      (opcode == 'H') handleHello         (inBuffer, replyToKey);
-    else if (opcode == 'P') handlePackageQuery  (inBuffer, replyToKey);
-    else if (opcode == 'p') handlePackageInd    (inBuffer, replyToKey);
-    else if (opcode == 'Q') handleClassQuery    (inBuffer, replyToKey);
-    else if (opcode == 'S') handleSchemaQuery   (inBuffer, replyToKey);
-    else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey);
+    if      (opcode == 'B') handleBrokerRequest (inBuffer, replyToKey, sequence);
+    else if (opcode == 'P') handlePackageQuery  (inBuffer, replyToKey, sequence);
+    else if (opcode == 'p') handlePackageInd    (inBuffer, replyToKey, sequence);
+    else if (opcode == 'Q') handleClassQuery    (inBuffer, replyToKey, sequence);
+    else if (opcode == 'S') handleSchemaQuery   (inBuffer, replyToKey, sequence);
+    else if (opcode == 'A') handleAttachRequest (inBuffer, replyToKey, sequence);
+    else if (opcode == 'G') handleGetRequest    (inBuffer, replyToKey, sequence);
 }
 
 ManagementAgent::PackageMap::iterator ManagementAgent::FindOrAddPackage (std::string name)
@@ -528,7 +618,7 @@
     EncodePackageIndication (outBuffer, result.first);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, mExchange, "mgmt.schema.package");
+    SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
 
     return result.first;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Tue Mar 25 13:30:01 2008
@@ -25,19 +25,20 @@
 #include "qpid/Options.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/Timer.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
 #include "ManagementObject.h"
 #include <qpid/framing/AMQFrame.h>
 #include <boost/shared_ptr.hpp>
 
-namespace qpid { 
+namespace qpid {
 namespace management {
 
 class ManagementAgent
 {
   private:
 
-    ManagementAgent (uint16_t interval);
+    ManagementAgent (std::string dataDir, uint16_t interval);
 
   public:
 
@@ -45,7 +46,7 @@
 
     typedef boost::shared_ptr<ManagementAgent> shared_ptr;
 
-    static void       enableManagement (void);
+    static void       enableManagement (std::string dataDir, uint16_t interval);
     static shared_ptr getAgent (void);
     static void       shutdown (void);
 
@@ -130,10 +131,12 @@
     static shared_ptr            agent;
     static bool                  enabled;
 
+    qpid::framing::Uuid          uuid;
     qpid::sys::RWlock            userLock;
     broker::Timer                timer;
     broker::Exchange::shared_ptr mExchange;
     broker::Exchange::shared_ptr dExchange;
+    std::string                  dataDir;
     uint16_t                     interval;
     uint64_t                     nextObjectId;
     uint32_t                     nextRemotePrefix;
@@ -143,8 +146,8 @@
     char outputBuffer[MA_BUFFER_SIZE];
 
     void PeriodicProcessing (void);
-    void EncodeHeader       (qpid::framing::Buffer& buf, uint8_t  opcode, uint8_t  cls = 0);
-    bool CheckHeader        (qpid::framing::Buffer& buf, uint8_t *opcode, uint8_t *cls);
+    void EncodeHeader       (qpid::framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
+    bool CheckHeader        (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
     void SendBuffer         (qpid::framing::Buffer&       buf,
                              uint32_t                     length,
                              broker::Exchange::shared_ptr exchange,
@@ -164,16 +167,17 @@
                                 PackageMap::iterator   pIter,
                                 ClassMap::iterator     cIter);
     uint32_t assignPrefix (uint32_t requestedPrefix);
-    void handleHello         (qpid::framing::Buffer& inBuffer, std::string replyToKey);
-    void handlePackageQuery  (qpid::framing::Buffer& inBuffer, std::string replyToKey);
-    void handlePackageInd    (qpid::framing::Buffer& inBuffer, std::string replyToKey);
-    void handleClassQuery    (qpid::framing::Buffer& inBuffer, std::string replyToKey);
-    void handleSchemaQuery   (qpid::framing::Buffer& inBuffer, std::string replyToKey);
-    void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey);
+    void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+                              uint32_t code = 0, std::string text = std::string("OK"));
+    void handleBrokerRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handlePackageQuery  (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handlePackageInd    (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleClassQuery    (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleSchemaQuery   (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleAttachRequest (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleGetRequest    (qpid::framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
 };
 
 }}
             
-
-
 #endif  /*!_ManagementAgent_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Tue Mar 25 13:30:01 2008
@@ -57,6 +57,7 @@
     static const uint8_t TYPE_BOOL      = 11;
     static const uint8_t TYPE_FLOAT     = 12;
     static const uint8_t TYPE_DOUBLE    = 13;
+    static const uint8_t TYPE_UUID      = 14;
 
     static const uint8_t ACCESS_RC = 1;
     static const uint8_t ACCESS_RW = 2;
@@ -85,7 +86,8 @@
     virtual writeSchemaCall_t getWriteSchemaCall (void) = 0;
     virtual bool firstInstance        (void) = 0;
     virtual void writeConfig          (qpid::framing::Buffer& buf) = 0;
-    virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
+    virtual void writeInstrumentation (qpid::framing::Buffer& buf,
+                                       bool skipHeaders = false) = 0;
     virtual void doMethod             (std::string            methodName,
                                        qpid::framing::Buffer& inBuf,
                                        qpid::framing::Buffer& outBuf) = 0;

Modified: incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py Tue Mar 25 13:30:01 2008
@@ -111,6 +111,10 @@
     finally:
       self.lock.release ()
 
+  def ctrlHandler (self, context, op, data):
+    if op == self.mclient.CTRL_BROKER_INFO:
+      pass
+
   def configHandler (self, context, className, list, timestamps):
     self.dataHandler (0, className, list, timestamps);
 
@@ -149,7 +153,7 @@
     self.client.start ({"LOGIN": username, "PASSWORD": password})
     self.channel = self.client.channel (1)
 
-    self.mclient = managementClient (self.spec, None, self.configHandler,
+    self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
                                      self.instHandler, self.methodReply)
     self.mclient.schemaListener (self.schemaHandler)
     self.mch = managementChannel (self.channel, self.mclient.topicCb, self.mclient.replyCb)
@@ -194,6 +198,8 @@
               return "False"
             else:
               return "True"
+          elif typecode == 14:
+            return str (UUID (bytes=value))
     return "*type-error*"
 
   def getObjIndex (self, className, config):

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Tue Mar 25 13:30:01 2008
@@ -69,12 +69,14 @@
     opens a session and performs all of the declarations and bindings needed
     to participate in the management protocol. """
     response         = ch.session_open (detached_lifetime=300)
+    self.sessionId   = response.session_id
     self.topicName   = "mgmt-"  + base64.urlsafe_b64encode (response.session_id)
     self.replyName   = "reply-" + base64.urlsafe_b64encode (response.session_id)
     self.qpidChannel = ch
     self.tcb         = topicCb
     self.rcb         = replyCb
     self.context     = cbContext
+    self.reqsOutstanding = 0
 
     ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
     ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
@@ -114,6 +116,10 @@
   network.  It implements the management protocol and manages the management
   schemas as advertised by the various management agents in the network. """
 
+  CTRL_BROKER_INFO   = 1
+  CTRL_SCHEMA_LOADED = 2
+  CTRL_USER          = 3
+
   #========================================================
   # User API - interacts with the class's user
   #========================================================
@@ -144,7 +150,7 @@
     """ Register a new channel. """
     self.channels.append (channel)
     codec = Codec (StringIO (), self.spec)
-    self.setHeader (codec, ord ('H'))
+    self.setHeader (codec, ord ('B'))
     msg = Content  (codec.stream.getvalue ())
     msg["content_type"] = "application/octet-stream"
     msg["routing_key"]  = "agent"
@@ -161,6 +167,22 @@
     """ Invoke a method on a managed object. """
     self.method (channel, userSequence, objId, className, methodName, args)
 
+  def getObjects (self, channel, userSequence, className):
+    """ Request immediate content from broker """
+    codec = Codec (StringIO (), self.spec)
+    self.setHeader (codec, ord ('G'), userSequence)
+    ft = {}
+    ft["_class"] = className
+    codec.encode_table (ft)
+    msg = Content (codec.stream.getvalue ())
+    msg["content_type"] = "application/octet-stream"
+    msg["routing_key"]  = "agent"
+    msg["reply_to"]     = self.spec.struct ("reply_to")
+    msg["reply_to"]["exchange_name"] = "amq.direct"
+    msg["reply_to"]["routing_key"]   = channel.replyName
+    channel.send ("qpid.management", msg)
+
+
   #========================================================
   # Channel API - interacts with registered channel objects
   #========================================================
@@ -182,9 +204,11 @@
       return
 
     if   hdr[0] == 'm':
-      self.handleMethodReply (ch, codec)
-    elif hdr[0] == 'I':
-      self.handleInit (ch, codec)
+      self.handleMethodReply (ch, codec, hdr[1])
+    elif hdr[0] == 'z':
+      self.handleCommandComplete (ch, codec, hdr[1])
+    elif hdr[0] == 'b':
+      self.handleBrokerResponse (ch, codec)
     elif hdr[0] == 'p':
       self.handlePackageInd (ch, codec)
     elif hdr[0] == 'q':
@@ -196,14 +220,13 @@
   #========================================================
   # Internal Functions
   #========================================================
-  def setHeader (self, codec, opcode, cls = 0):
+  def setHeader (self, codec, opcode, seq = 0):
     """ Compose the header of a management message. """
     codec.encode_octet (ord ('A'))
     codec.encode_octet (ord ('M'))
-    codec.encode_octet (ord ('0'))
     codec.encode_octet (ord ('1'))
     codec.encode_octet (opcode)
-    codec.encode_octet (cls)
+    codec.encode_long  (seq)
 
   def checkHeader (self, codec):
     """ Check the header of a management message and extract the opcode and
@@ -215,14 +238,11 @@
     if octet != 'M':
       return None
     octet = chr (codec.decode_octet ())
-    if octet != '0':
-      return None
-    octet = chr (codec.decode_octet ())
     if octet != '1':
       return None
     opcode = chr (codec.decode_octet ())
-    cls    = chr (codec.decode_octet ())
-    return (opcode, cls)
+    seq    = codec.decode_long ()
+    return (opcode, seq)
 
   def encodeValue (self, codec, value, typecode):
     """ Encode, into the codec, a value based on its typecode. """
@@ -252,6 +272,8 @@
       codec.encode_float    (float (value))
     elif typecode == 13: # DOUBLE
       codec.encode_double   (double (value))
+    elif typecode == 14: # UUID
+      codec.encode_uuid     (value)
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
@@ -283,14 +305,24 @@
       data = codec.decode_float ()
     elif typecode == 13: # DOUBLE
       data = codec.decode_double ()
+    elif typecode == 14: # UUID
+      data = codec.decode_uuid ()
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
     return data
 
-  def handleMethodReply (self, ch, codec):
-    sequence = codec.decode_long ()
-    status   = codec.decode_long ()
-    sText    = codec.decode_shortstr ()
+  def incOutstanding (self, ch):
+    ch.reqsOutstanding = ch.reqsOutstanding + 1
+
+  def decOutstanding (self, ch):
+    ch.reqsOutstanding = ch.reqsOutstanding - 1
+    if ch.reqsOutstanding == 0:
+      if self.ctrlCb != None:
+        self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None)
+
+  def handleMethodReply (self, ch, codec, sequence):
+    status = codec.decode_long ()
+    sText  = codec.decode_shortstr ()
 
     data = self.seqMgr.release (sequence)
     if data == None:
@@ -317,15 +349,27 @@
     if self.methodCb != None:
       self.methodCb (ch.context, userSequence, status, sText, args)
 
-  def handleInit (self, ch, codec):
-    len = codec.decode_short ()
-    data = codec.decode_raw (len)
+  def handleCommandComplete (self, ch, codec, seq):
+    code = codec.decode_long ()
+    text = codec.decode_shortstr ()
+    data = (seq, code, text)
+    context = self.seqMgr.release (seq)
+    if context == "outstanding":
+      self.decOutstanding (ch)
+    elif self.ctrlCb != None:
+      self.ctrlCb (ch.context, self.CTRL_USER, data)
+
+  def handleBrokerResponse (self, ch, codec):
     if self.ctrlCb != None:
-      self.ctrlCb (ch.context, len, data)
+      uuid = codec.decode_uuid ()
+      data = (uuid, ch.sessionId)
+      self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, data)
 
     # Send a package request
     sendCodec = Codec (StringIO (), self.spec)
-    self.setHeader (sendCodec, ord ('P'))
+    seq = self.seqMgr.reserve ("outstanding")
+    self.setHeader (sendCodec, ord ('P'), seq)
+    self.incOutstanding (ch)
     smsg = Content  (sendCodec.stream.getvalue ())
     smsg["content_type"] = "application/octet-stream"
     smsg["routing_key"]  = "agent"
@@ -341,7 +385,9 @@
 
       # Send a class request
       sendCodec = Codec (StringIO (), self.spec)
-      self.setHeader (sendCodec, ord ('Q'))
+      seq = self.seqMgr.reserve ("outstanding")
+      self.setHeader (sendCodec, ord ('Q'), seq)
+      self.incOutstanding (ch)
       sendCodec.encode_shortstr (pname)
       smsg = Content  (sendCodec.stream.getvalue ())
       smsg["content_type"] = "application/octet-stream"
@@ -362,6 +408,7 @@
       # Send a schema request
       sendCodec = Codec (StringIO (), self.spec)
       self.setHeader (sendCodec, ord ('S'))
+      self.incOutstanding (ch)
       sendCodec.encode_shortstr (pname)
       sendCodec.encode_shortstr (cname)
       sendCodec.encode_bin128   (hash)
@@ -373,8 +420,9 @@
       smsg["reply_to"]["routing_key"]   = ch.replyName
       ch.send ("qpid.management", smsg)
 
-  def parseSchema (self, ch, cls, codec):
+  def parseSchema (self, ch, codec):
     """ Parse a received schema-description message. """
+    self.decOutstanding (ch)
     packageName = codec.decode_shortstr ()
     className   = codec.decode_shortstr ()
     hash        = codec.decode_bin128 ()
@@ -495,7 +543,7 @@
 
   def parseContent (self, ch, cls, codec):
     """ Parse a received content message. """
-    if cls == 'C' and self.configCb == None:
+    if (cls == 'C' or cls == 'B') and self.configCb == None:
       return
     if cls == 'I' and self.instCb == None:
       return
@@ -516,23 +564,39 @@
     timestamps.append (codec.decode_longlong ())  # Delete Time
 
     schemaClass = self.schema[classKey]
-    for element in schemaClass[cls][:]:
-      tc   = element[1]
-      name = element[0]
-      data = self.decodeValue (codec, tc)
-      row.append ((name, data))
+    if cls == 'C' or cls == 'B':
+      for element in schemaClass['C'][:]:
+        tc   = element[1]
+        name = element[0]
+        data = self.decodeValue (codec, tc)
+        row.append ((name, data))
+
+    if cls == 'I' or cls == 'B':
+      if cls == 'B':
+        start = 1
+      else:
+        start = 0
+      for element in schemaClass['I'][start:]:
+        tc   = element[1]
+        name = element[0]
+        data = self.decodeValue (codec, tc)
+        row.append ((name, data))
 
-    if   cls == 'C':
+    if   cls == 'C' or cls == 'B':
       self.configCb (ch.context, classKey, row, timestamps)
     elif cls == 'I':
       self.instCb   (ch.context, classKey, row, timestamps)
 
-  def parse (self, ch, codec, opcode, cls):
+  def parse (self, ch, codec, opcode, seq):
     """ Parse a message received from the topic queue. """
     if opcode   == 's':
-      self.parseSchema  (ch, cls, codec)
-    elif opcode == 'C':
-      self.parseContent (ch, cls, codec)
+      self.parseSchema  (ch, codec)
+    elif opcode == 'c':
+      self.parseContent (ch, 'C', codec)
+    elif opcode == 'i':
+      self.parseContent (ch, 'I', codec)
+    elif opcode == 'g':
+      self.parseContent (ch, 'B', codec)
     else:
       raise ValueError ("Unknown opcode: %c" % opcode);
 
@@ -540,8 +604,7 @@
     """ Invoke a method on an object """
     codec = Codec (StringIO (), self.spec)
     sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
-    self.setHeader (codec, ord ('M'))
-    codec.encode_long     (sequence)    # Method sequence id
+    self.setHeader (codec, ord ('M'), sequence)
     codec.encode_longlong (objId)       # ID of object
 
     # Encode args according to schema

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue Mar 25 13:30:01 2008
@@ -93,6 +93,15 @@
 
   <!--
   ===============================================================
+  Management Agent
+  ===============================================================
+  -->
+  <class name="agent">
+    <configElement name="id" type="uuid" access="RO" index="y" desc="Agent ID"/>
+  </class>
+
+  <!--
+  ===============================================================
   Virtual Host
   ===============================================================
   -->

Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=640970&r1=640969&r2=640970&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Tue Mar 25 13:30:01 2008
@@ -19,18 +19,19 @@
   under the License.
 -->
 
-<type name="objId"     base="REF"       cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
-<type name="uint8"     base="U8"        cpp="uint8_t"     encode="@.putOctet (#)"       decode="# = @.getOctet ()"     accessor="direct" init="0"/>
-<type name="uint16"    base="U16"       cpp="uint16_t"    encode="@.putShort (#)"       decode="# = @.getShort ()"     accessor="direct" init="0"/>
-<type name="uint32"    base="U32"       cpp="uint32_t"    encode="@.putLong (#)"        decode="# = @.getLong ()"      accessor="direct" init="0"/>
-<type name="uint64"    base="U64"       cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
-<type name="bool"      base="BOOL"      cpp="uint8_t"     encode="@.putOctet (#?1:0)"   decode="# = @.getOctet ()==1"  accessor="direct" init="0"/>
-<type name="sstr"      base="SSTR"      cpp="std::string" encode="@.putShortString (#)" decode="@.getShortString (#)"  accessor="direct" init='""'/>
-<type name="lstr"      base="LSTR"      cpp="std::string" encode="@.putLongString (#)"  decode="@.getLongString (#)"   accessor="direct" init='""'/>
-<type name="absTime"   base="ABSTIME"   cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
-<type name="deltaTime" base="DELTATIME" cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
-<type name="float"     base="FLOAT"     cpp="float"       encode="@.putFloat (#)"       decode="# = @.getFloat ()"     accessor="direct" init="0."/>
-<type name="double"    base="DOUBLE"    cpp="double"      encode="@.putDouble (#)"      decode="# = @.getDouble ()"    accessor="direct" init="0."/>
+<type name="objId"     base="REF"       cpp="uint64_t"      encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="uint8"     base="U8"        cpp="uint8_t"       encode="@.putOctet (#)"       decode="# = @.getOctet ()"     accessor="direct" init="0"/>
+<type name="uint16"    base="U16"       cpp="uint16_t"      encode="@.putShort (#)"       decode="# = @.getShort ()"     accessor="direct" init="0"/>
+<type name="uint32"    base="U32"       cpp="uint32_t"      encode="@.putLong (#)"        decode="# = @.getLong ()"      accessor="direct" init="0"/>
+<type name="uint64"    base="U64"       cpp="uint64_t"      encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="bool"      base="BOOL"      cpp="uint8_t"       encode="@.putOctet (#?1:0)"   decode="# = @.getOctet ()==1"  accessor="direct" init="0"/>
+<type name="sstr"      base="SSTR"      cpp="std::string"   encode="@.putShortString (#)" decode="@.getShortString (#)"  accessor="direct" init='""'/>
+<type name="lstr"      base="LSTR"      cpp="std::string"   encode="@.putLongString (#)"  decode="@.getLongString (#)"   accessor="direct" init='""'/>
+<type name="absTime"   base="ABSTIME"   cpp="uint64_t"      encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="deltaTime" base="DELTATIME" cpp="uint64_t"      encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="float"     base="FLOAT"     cpp="float"         encode="@.putFloat (#)"       decode="# = @.getFloat ()"     accessor="direct" init="0."/>
+<type name="double"    base="DOUBLE"    cpp="double"        encode="@.putDouble (#)"      decode="# = @.getDouble ()"    accessor="direct" init="0."/>
+<type name="uuid"      base="UUID"      cpp="framing::Uuid" encode="#.encode (@)"         decode="#.decode (@)"          accessor="direct"/>
 
 <type name="hilo8"   base="U8"   cpp="uint8_t"  encode="@.putOctet (#)"    decode="# = @.getOctet ()"    style="wm" accessor="counter" init="0"/>
 <type name="hilo16"  base="U16"  cpp="uint16_t" encode="@.putShort (#)"    decode="# = @.getShort ()"    style="wm" accessor="counter" init="0"/>