You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/09/03 20:01:45 UTC

svn commit: r691700 [2/2] - in /incubator/qpid/trunk/qpid: cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/templates/ cpp/src/qpid/agent/ cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/management/ python/qpid/ specs/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Wed Sep  3 11:01:44 2008
@@ -27,6 +27,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/Time.h"
 #include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/AclModule.h"
 #include <list>
 #include <iostream>
 #include <fstream>
@@ -80,8 +81,8 @@
 ManagementBroker::ManagementBroker () :
     threadPoolSize(1), interval(10), broker(0)
 {
-    localBank      = 5;
     nextObjectId   = 1;
+    brokerBank     = 1;
     bootSequence   = 1;
     nextRemoteBank = 10;
     nextRequestSequence = 1;
@@ -112,7 +113,7 @@
     }
 }
 
-void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads)
+void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads)
 {
     dataDir        = _dataDir;
     interval       = _interval;
@@ -140,7 +141,10 @@
             inFile.close();
             QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
 
+            // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
             bootSequence++;
+            if (bootSequence & 0xF000)
+                bootSequence = 1;
             writeData();
         }
         else
@@ -183,29 +187,26 @@
     AddClass(pIter, className, md5Sum, schemaCall);
 }
 
-uint64_t ManagementBroker::addObject (ManagementObject* object,
-                                      uint32_t          persistId,
-                                      uint32_t          persistBank)
+ObjectId ManagementBroker::addObject (ManagementObject* object,  // Builds an ObjectId for `object`, stores it in newManagementObjects, and returns the id.
+                                      uint64_t          persistId)  // 0 => take the next local object number; non-zero => use this value as the object number.
 {
     Mutex::ScopedLock lock (addLock);
-    uint64_t objectId;
+    uint16_t sequence;  // value for the `seq` argument of the ObjectId constructor
+    uint64_t objectNum;  // value for the `object` argument of the ObjectId constructor
 
-    if (persistId == 0)
-    {
-        objectId = ((uint64_t) bootSequence) << 48 |
-            ((uint64_t) localBank) << 24 | nextObjectId++;
-        if ((nextObjectId & 0xFF000000) != 0)
-        {
-            nextObjectId = 1;
-            localBank++;
-        }
+    if (persistId == 0) {  // no persistent id supplied
+        sequence  = bootSequence;  // tag the id with the current boot sequence
+        objectNum = nextObjectId++;  // consume the next locally allocated number
+    } else {  // caller supplied a persistent id
+        sequence  = 0;  // ids built from a persistId always carry sequence 0
+        objectNum = persistId;
     }
-    else
-        objectId = ((uint64_t) persistBank) << 24 | persistId;
 
-    object->setObjectId (objectId);
-    newManagementObjects[objectId] = object;
-    return objectId;
+    ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);  // flags = 0, broker bank = brokerBank, agent bank = 0
+
+    object->setObjectId(objId);  // the object records its own id
+    newManagementObjects[objId] = object;  // registered under the new id in newManagementObjects
+    return objId;
 }
 
 ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
@@ -308,7 +309,7 @@
     char                msgChars[BUFSIZE];
     uint32_t            contentSize;
     string              routingKey;
-    std::list<uint64_t> deleteList;
+    std::list<ObjectId> deleteList;
 
     {
         Buffer msgBuffer(msgChars, BUFSIZE);
@@ -373,7 +374,7 @@
     }
 
     // Delete flagged objects
-    for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+    for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin ();
          iter != deleteList.rend ();
          iter++)
         managementObjects.erase (*iter);
@@ -408,48 +409,72 @@
     // Parse the routing key.  This management broker should act as though it
     // is bound to the exchange to match the following keys:
     //
-    //    agent.<X>.#
-    //    broker.#
-    //
-    // where <X> is any non-negative decimal integer less than the lowest remote
-    // object-id bank.
+    //    agent.0.#
+    //    broker
 
     if (routingKey == "broker") {
-        dispatchAgentCommandLH (msg);
+        dispatchAgentCommandLH(msg);
+        return false;
+    }
+
+    else if (routingKey.compare(0, 7, "agent.0") == 0) {
+        dispatchAgentCommandLH(msg);
         return false;
     }
 
     else if (routingKey.compare(0, 6, "agent.") == 0) {
-        std::string::size_type delim = routingKey.find('.', 6);
-        if (delim == string::npos)
-            delim = routingKey.length();
-        string bank = routingKey.substr(6, delim - 6);
-        if ((uint32_t) atoi(bank.c_str()) <= localBank) {
-            dispatchAgentCommandLH (msg);
-            return false;
-        }
+        return authorizeAgentMessageLH(msg);
     }
 
     return true;
 }
 
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
+                                              uint32_t sequence, const ConnectionToken* connToken)
 {
     string   methodName;
+    string   packageName;
+    string   className;
+    uint8_t  hash[16];
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
+    AclModule* acl = broker->getAcl();
 
-    uint64_t objId = inBuffer.getLongLong();
+    ObjectId objId(inBuffer);
+    inBuffer.getShortString(packageName);
+    inBuffer.getShortString(className);
+    inBuffer.getBin128(hash);
     inBuffer.getShortString(methodName);
-
     EncodeHeader(outBuffer, 'm', sequence);
 
+    if (acl != 0) {
+        string userId = ((const broker::ConnectionState*) connToken)->getUserId();
+        std::map<acl::Property, string> params;
+        params[acl::SCHEMAPACKAGE] = packageName;
+        params[acl::SCHEMACLASS]   = className;
+
+        if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params)) {
+            outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+            outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+            outLen = MA_BUFFER_SIZE - outBuffer.available();
+            outBuffer.reset();
+            SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+            return;
+        }
+    }
+
     ManagementObjectMap::iterator iter = managementObjects.find(objId);
     if (iter == managementObjects.end() || iter->second->isDeleted()) {
         outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
         outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
     } else {
-        iter->second->doMethod(methodName, inBuffer, outBuffer);
+        if ((iter->second->getPackageName() != packageName) ||
+            (iter->second->getClassName()   != className)) {
+            outBuffer.putLong        (Manageable::STATUS_INVALID_PARAMETER);
+            outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+        }
+        else
+            iter->second->doMethod(methodName, inBuffer, outBuffer);
     }
 
     outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -497,34 +522,33 @@
     FindOrAddPackageLH(packageName);
 }
 
-void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)  // Replies with one class indication per class of the requested package that has a schema, then a command-complete.
 {
     std::string packageName;
 
-    inBuffer.getShortString (packageName);
-    PackageMap::iterator pIter = packages.find (packageName);
-    if (pIter != packages.end ())
+    inBuffer.getShortString(packageName);  // the request body carries the package name
+    PackageMap::iterator pIter = packages.find(packageName);
+    if (pIter != packages.end())  // unknown package: nothing is sent except the command-complete below
     {
         ClassMap cMap = pIter->second;
-        for (ClassMap::iterator cIter = cMap.begin ();
-             cIter != cMap.end ();
+        for (ClassMap::iterator cIter = cMap.begin();  // NOTE(review): cMap is a by-value copy of pIter->second, so cIter points into the copy
+             cIter != cMap.end();
              cIter++)
         {
-            if (cIter->second->hasSchema ())
+            if (cIter->second.hasSchema())  // ClassMap now stores SchemaClass by value, hence '.' rather than '->'
             {
-                Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+                Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);  // a fresh Buffer over the outputBuffer member for each reply
                 uint32_t outLen;
 
-                EncodeHeader (outBuffer, 'q', sequence);
-                EncodeClassIndication (outBuffer, pIter, cIter);
-                outLen = MA_BUFFER_SIZE - outBuffer.available ();
-                outBuffer.reset ();
-                SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+                EncodeHeader(outBuffer, 'q', sequence);  // 'q' opcode, tagged with the request's sequence
+                EncodeClassIndication(outBuffer, pIter, cIter);
+                outLen = MA_BUFFER_SIZE - outBuffer.available();  // length sent = MA_BUFFER_SIZE minus what available() reports
+                outBuffer.reset();
+                SendBuffer(outBuffer, outLen, dExchange, replyToKey);  // reply goes to dExchange using the requester's reply-to key
             }
         }
     }
-
-    sendCommandComplete (replyToKey, sequence);
+    sendCommandComplete(replyToKey, sequence);  // always sent, whether or not the package was found
 }
 
 void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
@@ -551,9 +575,7 @@
         outBuffer.reset ();
         SendBuffer (outBuffer, outLen, dExchange, replyToKey);
 
-        SchemaClass* newSchema = new SchemaClass;
-        newSchema->pendingSequence = sequence;
-        pIter->second[key] = newSchema;
+        pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
     }
 }
 
@@ -569,7 +591,7 @@
         buf.putRawData(buffer, bufferLen);
 }
 
-void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
 {
     string         packageName;
     SchemaClassKey key;
@@ -578,33 +600,33 @@
     inBuffer.getShortString (key.name);
     inBuffer.getBin128      (key.hash);
 
-    PackageMap::iterator pIter = packages.find (packageName);
+    PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end()) {
         ClassMap cMap = pIter->second;
-        ClassMap::iterator cIter = cMap.find (key);
+        ClassMap::iterator cIter = cMap.find(key);
         if (cIter != cMap.end()) {
-            Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
-            SchemaClass* classInfo = cIter->second;
+            SchemaClass& classInfo = cIter->second;
 
-            if (classInfo->hasSchema()) {
+            if (classInfo.hasSchema()) {
                 EncodeHeader(outBuffer, 's', sequence);
-                classInfo->appendSchema (outBuffer);
-                outLen = MA_BUFFER_SIZE - outBuffer.available ();
+                classInfo.appendSchema(outBuffer);
+                outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+                SendBuffer(outBuffer, outLen, dExchange, replyToKey);
             }
             else
-                sendCommandComplete (replyToKey, sequence, 1, "Schema not available");
+                sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
         }
         else
-            sendCommandComplete (replyToKey, sequence, 1, "Class key not found");
+            sendCommandComplete(replyToKey, sequence, 1, "Class key not found");
     }
     else
-        sendCommandComplete (replyToKey, sequence, 1, "Package not found");
+        sendCommandComplete(replyToKey, sequence, 1, "Package not found");
 }
 
-void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
 {
     string         packageName;
     SchemaClassKey key;
@@ -619,24 +641,26 @@
     if (pIter != packages.end()) {
         ClassMap cMap = pIter->second;
         ClassMap::iterator cIter = cMap.find(key);
-        if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+        if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
             size_t length = ValidateSchema(inBuffer);
-            if (length == 0)
+            if (length == 0) {
+                QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
                 cMap.erase(key);
+            }
             else {
-                cIter->second->buffer    = (uint8_t*) malloc(length);
-                cIter->second->bufferLen = length;
-                inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+                cIter->second.buffer    = (uint8_t*) malloc(length);
+                cIter->second.bufferLen = length;
+                inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen);
 
                 // Publish a class-indication message
-                Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+                Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
                 uint32_t outLen;
 
-                EncodeHeader (outBuffer, 'q');
-                EncodeClassIndication (outBuffer, pIter, cIter);
-                outLen = MA_BUFFER_SIZE - outBuffer.available ();
-                outBuffer.reset ();
-                SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+                EncodeHeader(outBuffer, 'q');
+                EncodeClassIndication(outBuffer, pIter, cIter);
+                outLen = MA_BUFFER_SIZE - outBuffer.available();
+                outBuffer.reset();
+                SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
             }
         }
     }
@@ -671,14 +695,14 @@
 
 void ManagementBroker::deleteOrphanedAgentsLH()
 {
-    vector<uint64_t> deleteList;
+    vector<ObjectId> deleteList;
 
     for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
-        uint64_t connectionRef = aIter->first;
+        ObjectId connectionRef = aIter->first;
         bool found = false;
 
-        for (ManagementObjectMap::iterator iter = managementObjects.begin ();
-             iter != managementObjects.end ();
+        for (ManagementObjectMap::iterator iter = managementObjects.begin();
+             iter != managementObjects.end();
              iter++) {
             if (iter->first == connectionRef && !iter->second->isDeleted()) {
                 found = true;
@@ -692,10 +716,8 @@
         }
     }
 
-    for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
-        
+    for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++)
         remoteAgents.erase(*dIter);
-    }
 
     deleteList.clear();
 }
@@ -705,7 +727,7 @@
     string   label;
     uint32_t requestedBank;
     uint32_t assignedBank;
-    uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+    ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
     Uuid     systemId;
 
     moveNewObjectsLH();
@@ -741,6 +763,7 @@
     uint32_t outLen;
 
     EncodeHeader (outBuffer, 'a', sequence);
+    outBuffer.putLong (brokerBank);
     outBuffer.putLong (assignedBank);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
@@ -786,13 +809,77 @@
     sendCommandComplete (replyToKey, sequence);
 }
 
-void ManagementBroker::dispatchAgentCommandLH (Message& msg)
+bool ManagementBroker::authorizeAgentMessageLH(Message& msg)  // Returns false for oversized, malformed, or ACL-denied messages and true otherwise; dispatchCommand returns this result for other "agent." keys.
 {
     Buffer   inBuffer (inputBuffer, MA_BUFFER_SIZE);
     uint8_t  opcode;
     uint32_t sequence;
     string   replyToKey;
 
+    if (msg.encodedSize() > MA_BUFFER_SIZE)  // larger than inputBuffer: reject without decoding
+        return false;
+
+    msg.encodeContent(inBuffer);  // copy the message content into inputBuffer through inBuffer
+    inBuffer.reset();
+
+    if (!CheckHeader(inBuffer, &opcode, &sequence))  // header check failed: reject
+        return false;
+
+    if (opcode == 'M') {  // 'M' is the opcode dispatchAgentCommandLH routes to handleMethodRequestLH
+        // Method request: check the call against the ACL module, if one is configured.
+        AclModule* acl = broker->getAcl();
+        if (acl == 0)  // no ACL module: nothing to check, allow
+            return true;
+
+        string  userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId();  // NOTE(review): assumes getPublisher() is non-null and a ConnectionState — confirm
+        string  packageName;
+        string  className;
+        uint8_t hash[16];  // read to advance past the hash field; not otherwise used here
+        string  methodName;
+
+        std::map<acl::Property, string> params;
+        ObjectId objId(inBuffer);  // same field order as handleMethodRequestLH: object id, package, class, hash, method
+        inBuffer.getShortString(packageName);
+        inBuffer.getShortString(className);
+        inBuffer.getBin128(hash);
+        inBuffer.getShortString(methodName);
+
+        params[acl::SCHEMAPACKAGE] = packageName;
+        params[acl::SCHEMACLASS]   = className;
+
+        if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params))  // permitted: allow
+            return true;
+
+        const framing::MessageProperties* p =  // denied: answer the requester only if a reply-to is present
+            msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+        if (p && p->hasReplyTo()) {
+            const framing::ReplyTo& rt = p->getReplyTo();
+            replyToKey = rt.getRoutingKey();
+
+            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
+            uint32_t outLen;
+
+            EncodeHeader(outBuffer, 'm', sequence);  // 'm' response header carrying the request's sequence
+            outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+            outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+            outLen = MA_BUFFER_SIZE - outBuffer.available();  // length sent = MA_BUFFER_SIZE minus what available() reports
+            outBuffer.reset();
+            SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+        }
+
+        return false;  // denied by the ACL
+    }
+
+    return true;  // opcodes other than 'M' are not checked here
+}
+
+void ManagementBroker::dispatchAgentCommandLH(Message& msg)
+{
+    Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
+    uint8_t  opcode;
+    uint32_t sequence;
+    string   replyToKey;
+
     const framing::MessageProperties* p =
         msg.getFrames().getHeaders()->get<framing::MessageProperties>();
     if (p && p->hasReplyTo()) {
@@ -823,7 +910,7 @@
     else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
     else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
     else if (opcode == 'G') handleGetQueryLH       (inBuffer, replyToKey, sequence);
-    else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence);
+    else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
 }
 
 ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
@@ -834,7 +921,7 @@
 
     // No such package found, create a new map entry.
     pair<PackageMap::iterator, bool> result =
-        packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+        packages.insert(pair<string, ClassMap>(name, ClassMap()));
     QPID_LOG (debug, "ManagementBroker added package " << name);
 
     // Publish a package-indication message
@@ -859,20 +946,18 @@
     ClassMap&      cMap = pIter->second;
 
     key.name = className;
-    memcpy (&key.hash, md5Sum, 16);
+    memcpy(&key.hash, md5Sum, 16);
 
-    ClassMap::iterator cIter = cMap.find (key);
-    if (cIter != cMap.end ())
+    ClassMap::iterator cIter = cMap.find(key);
+    if (cIter != cMap.end())
         return;
 
     // No such class found, create a new class with local information.
     QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
               key.name);
-    SchemaClass* classInfo = new SchemaClass;
 
-    classInfo->writeSchemaCall = schemaCall;
-    cMap[key] = classInfo;
-    cIter     = cMap.find (key);
+    cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+    cIter = cMap.find(key);
 }
 
 void ManagementBroker::EncodePackageIndication (Buffer&              buf,
@@ -917,6 +1002,8 @@
     for (uint16_t idx = 0; idx < methCount; idx++) {
         FieldTable ft;
         ft.decode(inBuffer);
+        if (!ft.isSet("argCount"))
+            return 0;
         int argCount = ft.getInt("argCount");
         for (int mIdx = 0; mIdx < argCount; mIdx++) {
             FieldTable aft;
@@ -924,10 +1011,41 @@
         }
     }
 
-    if (evntCount != 0)
-        return 0;
+    for (uint16_t idx = 0; idx < evntCount; idx++) {
+        FieldTable ft;
+        ft.decode(inBuffer);
+        if (!ft.isSet("argCount"))
+            return 0;
+        int argCount = ft.getInt("argCount");
+        for (int mIdx = 0; mIdx < argCount; mIdx++) {
+            FieldTable aft;
+            aft.decode(inBuffer);
+        }
+    }
 
     end = inBuffer.getPosition();
     inBuffer.restore(); // restore original position
     return end - start;
 }
+
+Mutex& ManagementBroker::getMutex()  // Returns a reference to the userLock member.
+{
+    return userLock;
+}
+
+Buffer* ManagementBroker::startEventLH()  // Starts an event message: returns a new Buffer with the 'e' header and a timestamp already written.
+{
+    Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));  // heap-allocated wrapper over the eventBuffer member; finishEventLH() deletes it
+    EncodeHeader(*outBuffer, 'e');  // 'e' opcode; no sequence argument is passed
+    outBuffer->putLongLong(uint64_t(Duration(now())));  // current time from now(), written as a 64-bit value
+    return outBuffer;  // NOTE(review): the LH suffix presumably means the caller holds the lock from getMutex() — confirm
+}
+
+void ManagementBroker::finishEventLH(Buffer* outBuffer)  // Sends the event held in outBuffer to mExchange with key "mgmt.event", then deletes outBuffer.
+{
+    uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();  // length sent = MA_BUFFER_SIZE minus what available() reports
+    outBuffer->reset();
+    SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
+    delete outBuffer;  // releases the Buffer allocated by startEventLH(); not reached if SendBuffer throws
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Wed Sep  3 11:01:44 2008
@@ -47,7 +47,7 @@
     ManagementBroker ();
     virtual ~ManagementBroker ();
 
-    void configure       (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
+    void configure       (std::string dataDir, uint16_t interval, broker::Broker* broker, int threadPoolSize);
     void setInterval     (uint16_t _interval) { interval = _interval; }
     void setExchange     (broker::Exchange::shared_ptr mgmtExchange,
                           broker::Exchange::shared_ptr directExchange);
@@ -56,16 +56,15 @@
                           std::string className,
                           uint8_t*    md5Sum,
                           ManagementObject::writeSchemaCall_t schemaCall);
-    uint64_t addObject   (ManagementObject* object,
-                          uint32_t          persistId   = 0,
-                          uint32_t          persistBank = 4);
+    ObjectId addObject   (ManagementObject* object,
+                          uint64_t          persistId = 0);
     void clientAdded     (void);
     bool dispatchCommand (broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args);
 
     // Stubs for remote management agent calls
-    void init (std::string, uint16_t, uint16_t, bool) { assert(0); }
+    void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); }
     uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
     int getSignalFd () { assert(0); return -1; }
 
@@ -88,7 +87,7 @@
     {
         uint32_t          objIdBank;
         std::string       routingKey;
-        uint64_t          connectionRef;
+        ObjectId          connectionRef;
         Agent*            mgmtObject;
         ManagementObject* GetManagementObject (void) const { return mgmtObject; }
         virtual ~RemoteAgent ();
@@ -97,7 +96,7 @@
     // TODO: Eventually replace string with entire reply-to structure.  reply-to
     //       currently assumes that the exchange is "amq.direct" even though it could
     //       in theory be specified differently.
-    typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+    typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap;
     typedef std::vector<std::string>         ReplyToVector;
 
     //  Storage for known schema classes:
@@ -133,12 +132,15 @@
         size_t   bufferLen;
         uint8_t* buffer;
 
-        SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
+        SchemaClass(uint32_t seq) :
+            writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+        SchemaClass(ManagementObject::writeSchemaCall_t call) :
+            writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
         bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
         void appendSchema (framing::Buffer& buf);
     };
 
-    typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
+    typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
     typedef std::map<std::string, ClassMap> PackageMap;
 
     RemoteAgentMap               remoteAgents;
@@ -157,10 +159,10 @@
     broker::Exchange::shared_ptr dExchange;
     std::string                  dataDir;
     uint16_t                     interval;
-    Manageable*                  broker;
+    broker::Broker*              broker;
     uint16_t                     bootSequence;
-    uint32_t                     localBank;
     uint32_t                     nextObjectId;
+    uint32_t                     brokerBank;
     uint32_t                     nextRemoteBank;
     uint32_t                     nextRequestSequence;
     bool                         clientWasAdded;
@@ -168,6 +170,7 @@
 #   define MA_BUFFER_SIZE 65536
     char inputBuffer[MA_BUFFER_SIZE];
     char outputBuffer[MA_BUFFER_SIZE];
+    char eventBuffer[MA_BUFFER_SIZE];
 
     void writeData ();
     void PeriodicProcessing (void);
@@ -179,7 +182,8 @@
                              std::string                  routingKey);
     void moveNewObjectsLH();
 
-    void dispatchAgentCommandLH (broker::Message& msg);
+    bool authorizeAgentMessageLH(broker::Message& msg);
+    void dispatchAgentCommandLH(broker::Message& msg);
 
     PackageMap::iterator FindOrAddPackageLH(std::string name);
     void AddClass(PackageMap::iterator         pIter,
@@ -206,9 +210,12 @@
     void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
 
     size_t ValidateSchema(framing::Buffer&);
+    sys::Mutex& getMutex();
+    framing::Buffer* startEventLH();
+    void finishEventLH(framing::Buffer* outBuffer);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Wed Sep  3 11:01:44 2008
@@ -28,6 +28,62 @@
 using namespace qpid::management;
 using namespace qpid::sys;
 
+void AgentAttachment::setBanks(uint32_t broker, uint32_t bank)  // Packs the broker bank and agent bank into `first`.
+{
+    first =
+        ((uint64_t) (broker & 0x000fffff)) << 28 |  // low 20 bits of broker, placed at bits 28..47
+        ((uint64_t) (bank   & 0x0fffffff));  // low 28 bits of bank, placed at bits 0..27
+}
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object)  // Builds an id with explicit bank values and no agent attachment.
+    : agent(0)
+{
+    first =
+        ((uint64_t) (flags  &       0x0f)) << 60 |  // low 4 bits of flags, at bits 60..63
+        ((uint64_t) (seq    &     0x0fff)) << 48 |  // low 12 bits of seq, at bits 48..59
+        ((uint64_t) (broker & 0x000fffff)) << 28 |  // low 20 bits of broker, at bits 28..47
+        ((uint64_t) (bank   & 0x0fffffff));  // low 28 bits of bank, at bits 0..27
+    second = object;  // the object number is stored unmodified
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object)  // Builds an id whose bank bits are supplied by _agent when the id is encoded.
+    : agent(_agent)
+{
+    first =
+        ((uint64_t) (flags &   0x0f)) << 60 |  // low 4 bits of flags, at bits 60..63
+        ((uint64_t) (seq   & 0x0fff)) << 48;  // low 12 bits of seq, at bits 48..59; bits 0..47 stay zero here
+    second = object;  // the object number is stored unmodified
+}
+
+bool ObjectId::operator==(const ObjectId &other) const  // Equality on (first, second); other's bank bits are ignored when this id has an agent attachment.
+{
+    uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;  // mask keeps the top 16 bits (flags + sequence)
+
+    return first == otherFirst && second == other.second;  // NOTE(review): masking depends only on this->agent, so a == b and b == a can differ when exactly one side has an agent
+}
+
+bool ObjectId::operator<(const ObjectId &other) const  // Orders by first, then second; other's bank bits are ignored when this id has an agent attachment.
+{
+    uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;  // mask keeps the top 16 bits (flags + sequence)
+
+    return (first < otherFirst) || ((first == otherFirst) && (second < other.second));  // NOTE(review): the std::map typedefs key on ObjectId; mixing ids with and without an agent may break strict weak ordering — confirm
+}
+
+void ObjectId::encode(framing::Buffer& buffer)  // Writes the id to buffer as two 64-bit values: first, then second.
+{
+    if (agent == 0)
+        buffer.putLongLong(first);  // no attachment: first is written as stored
+    else
+        buffer.putLongLong(first | agent->first);  // attachment present: OR in the bank bits set by AgentAttachment::setBanks()
+    buffer.putLongLong(second);
+}
+
+void ObjectId::decode(framing::Buffer& buffer)  // Reads two 64-bit values from buffer into first and second; `agent` is left unchanged.
+{
+    first  = buffer.getLongLong();  // read in the same order encode() writes them
+    second = buffer.getLongLong();
+}
+
 int ManagementObject::nextThreadIndex = 0;
 
 void ManagementObject::writeTimestamps (Buffer& buf)
@@ -38,10 +94,10 @@
     buf.putLongLong    (uint64_t (Duration (now ())));
     buf.putLongLong    (createTime);
     buf.putLongLong    (destroyTime);
-    buf.putLongLong    (objectId);
+    objectId.encode(buf);
 }
 
-void ManagementObject::setReference(uint64_t) {}
+void ManagementObject::setReference(ObjectId) {}
 
 int ManagementObject::getThreadIndex() {
     static __thread int thisIndex = -1;
@@ -54,3 +110,17 @@
     return thisIndex;
 }
 
+Mutex& ManagementObject::getMutex()  // Forwards to agent->getMutex().
+{
+    return agent->getMutex();  // no null check: `agent` is the pointer passed to the constructor
+}
+
+Buffer* ManagementObject::startEventLH()  // Forwards to agent->startEventLH() and returns its Buffer pointer.
+{
+    return agent->startEventLH();  // no null check: `agent` is the pointer passed to the constructor
+}
+
+void ManagementObject::finishEventLH(Buffer* buf)  // Forwards buf to agent->finishEventLH().
+{
+    agent->finishEventLH(buf);  // ManagementBroker's implementation sends the buffer and then deletes it
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Wed Sep  3 11:01:44 2008
@@ -32,6 +32,34 @@
 
 class Manageable;
 class ManagementAgent;
+class ObjectId;
+
+
+class AgentAttachment {  // Holds packed broker/agent bank bits that ObjectId::encode() ORs into an id's first word.
+    friend class ObjectId;  // ObjectId::encode() reads `first` directly
+private:
+    uint64_t first;  // packed banks: broker at bits 28..47, agent bank at bits 0..27 (see setBanks)
+public:
+    AgentAttachment() : first(0) {}  // starts with no bank bits set
+    void setBanks(uint32_t broker, uint32_t bank);  // overwrites `first` with the masked, packed bank values
+};
+
+
+class ObjectId {  // 128-bit management object identifier held as two 64-bit words, usable as a std::map key.
+private:
+    const AgentAttachment* agent;  // optional (0 when absent); when set, supplies the bank bits at encode time
+    uint64_t first;  // packed flags (bits 60..63), sequence (48..59), broker bank (28..47), agent bank (0..27)
+    uint64_t second;  // object number
+public:
+    ObjectId() : agent(0), first(0), second(0) {}  // all-zero id with no attachment
+    ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); }  // builds an id by decoding two 64-bit values from buf
+    ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object);  // explicit banks, no attachment
+    ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object);  // banks come from _agent at encode time
+    bool operator==(const ObjectId &other) const;
+    bool operator<(const ObjectId &other) const;  // ordering used by the std::map typedefs keyed on ObjectId
+    void encode(framing::Buffer& buffer);  // writes first (with any attachment bits OR'ed in), then second
+    void decode(framing::Buffer& buffer);  // reads first, then second
+};
 
 class ManagementObject
 {
@@ -39,7 +67,7 @@
     
     uint64_t         createTime;
     uint64_t         destroyTime;
-    uint64_t         objectId;
+    ObjectId         objectId;
     bool             configChanged;
     bool             instChanged;
     bool             deleted;
@@ -84,11 +112,15 @@
     int  getThreadIndex();
     void writeTimestamps (qpid::framing::Buffer& buf);
 
+    sys::Mutex& getMutex();
+    framing::Buffer* startEventLH();
+    void finishEventLH(framing::Buffer* buf);
+
   public:
     typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
 
     ManagementObject (ManagementAgent* _agent, Manageable* _core) :
-        destroyTime(0), objectId (0), configChanged(true),
+        destroyTime(0), configChanged(true),
         instChanged(true), deleted(false), coreObject(_core), agent(_agent)
     { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
     virtual ~ManagementObject () {}
@@ -100,14 +132,14 @@
     virtual void doMethod       (std::string            methodName,
                                  qpid::framing::Buffer& inBuf,
                                  qpid::framing::Buffer& outBuf) = 0;
-    virtual void setReference   (uint64_t objectId);
+    virtual void setReference   (ObjectId objectId);
 
     virtual std::string& getClassName   (void) = 0;
     virtual std::string& getPackageName (void) = 0;
     virtual uint8_t*     getMd5Sum      (void) = 0;
 
-    void         setObjectId      (uint64_t oid) { objectId = oid; }
-    uint64_t     getObjectId      (void) { return objectId; }
+    void         setObjectId      (ObjectId oid) { objectId = oid; }
+    ObjectId     getObjectId      (void) { return objectId; }
     inline  bool getConfigChanged (void) { return configChanged; }
     virtual bool getInstChanged   (void) { return instChanged; }
     inline  void setAllChanged    (void) {
@@ -120,10 +152,9 @@
         deleted     = true;
     }
     inline bool isDeleted (void) { return deleted; }
-    inline sys::Mutex& getLock() { return accessLock; }
 };
 
-typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap;
+typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
 
 }}
             

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Wed Sep  3 11:01:44 2008
@@ -69,6 +69,53 @@
     for cell in row:
       setattr (self, cell[0], cell[1])
 
+class objectId(object):
+  """ Object that represents QMF object identifiers """
+
+  def __init__(self, codec):  # decode from the wire: two consecutive uint64 reads
+    self.first  = codec.read_uint64()  # packed word: flags / sequence / broker / bank (see the getters below)
+    self.second = codec.read_uint64()  # returned as-is by getObject()
+
+  def __cmp__(self, other):  # three-way compare: `first` is the major key, `second` the minor key
+    if other == None:  # any objectId compares greater than None
+      return 1
+    if self.first < other.first:
+      return -1
+    if self.first > other.first:
+      return 1
+    if self.second < other.second:
+      return -1
+    if self.second > other.second:
+      return 1
+    return 0  # both words equal
+
+
+  def index(self):  # tuple form of the id; managementdata.py uses it as a dictionary key
+    return (self.first, self.second)
+
+  def getFlags(self):  # bits 60-63 of `first` (4 bits)
+    return (self.first & 0xF000000000000000) >> 60
+
+  def getSequence(self):  # bits 48-59 of `first` (12 bits)
+    return (self.first & 0x0FFF000000000000) >> 48
+
+  def getBroker(self):  # bits 28-47 of `first` (20 bits)
+    return (self.first & 0x0000FFFFF0000000) >> 28
+
+  def getBank(self):  # bits 0-27 of `first` (28 bits)
+    return self.first & 0x000000000FFFFFFF
+
+  def getObject(self):  # the whole second word, unmasked
+    return self.second
+
+  def isDurable(self):  # an id is treated as durable when its sequence field is 0
+    return self.getSequence() == 0
+
+  def encode(self, codec):  # inverse of __init__: writes `first` then `second` as uint64
+    codec.write_uint64(self.first)
+    codec.write_uint64(self.second)
+
 class methodResult:
   """ Object that contains the result of a method call """
 
@@ -308,6 +355,8 @@
       self.handleClassInd (ch, codec)
     elif hdr[0] == 'h':
       self.handleHeartbeat (ch, codec)
+    elif hdr[0] == 'e':
+      self.handleEvent (ch, codec)
     else:
       self.parse (ch, codec, hdr[0], hdr[1])
     ch.accept(msg)
@@ -386,7 +435,7 @@
     elif typecode == 9:  # DELTATIME
       codec.write_uint64 (long (value))
     elif typecode == 10: # REF
-      codec.write_uint64 (long (value))
+      value.encode(codec)
     elif typecode == 11: # BOOL
       codec.write_uint8  (int  (value))
     elif typecode == 12: # FLOAT
@@ -429,7 +478,7 @@
     elif typecode == 9:  # DELTATIME
       data = codec.read_uint64 ()
     elif typecode == 10: # REF
-      data = codec.read_uint64 ()
+      data = objectId(codec)
     elif typecode == 11: # BOOL
       data = codec.read_uint8 ()
     elif typecode == 12: # FLOAT
@@ -551,9 +600,9 @@
       ch.send ("qpid.management", smsg)
 
   def handleClassInd (self, ch, codec):
-    pname = str (codec.read_str8 ())
-    cname = str (codec.read_str8 ())
-    hash  = codec.read_bin128   ()
+    pname = str (codec.read_str8())
+    cname = str (codec.read_str8())
+    hash  = codec.read_bin128()
     if pname not in self.packages:
       return
 
@@ -574,6 +623,32 @@
     if self.ctrlCb != None:
       self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp)
 
+  def handleEvent (self, ch, codec):  # decode an 'e' (event) message and deliver it through self.eventCb
+    if self.eventCb == None:  # no event callback registered: nothing to decode for
+      return
+    timestamp = codec.read_uint64()  # consumed from the codec; not forwarded to the callback below
+    objId = objectId(codec)  # reads two uint64 values
+    packageName = str(codec.read_str8())
+    className   = str(codec.read_str8())
+    hash        = codec.read_bin128()
+    name        = str(codec.read_str8())  # event name, matched against the schema's event table
+    classKey    = (packageName, className, hash)  # key into self.schema
+    if classKey not in self.schema:  # class schema not known: the event is dropped
+      return;
+    schemaClass = self.schema[classKey]
+    row = []  # collects (argument name, decoded value) pairs
+    es = schemaClass['E']  # each value is unpacked below as (description, argument list)
+    arglist = None
+    for ename in es:  # look up the argument list of the event called `name`
+      (edesc, eargs) = es[ename]
+      if ename == name:
+        arglist = eargs
+    if arglist == None:  # event name absent from the schema: drop without calling back
+      return
+    for arg in arglist:  # arg[0] is used as the name, arg[1] as the type code given to decodeValue
+      row.append((arg[0], self.decodeValue(codec, arg[1])))
+    self.eventCb(ch.context, classKey, objId, name, row)
+
   def parseSchema (self, ch, codec):
     """ Parse a received schema-description message. """
     self.decOutstanding (ch)
@@ -597,22 +672,23 @@
     configs = []
     insts   = []
     methods = {}
-    events  = []
+    events  = {}
 
     configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
     insts.append   (("id", 4, None, None))
 
     for idx in range (configCount):
       ft = codec.read_map ()
-      name   = str (ft["name"])
-      type   = ft["type"]
-      access = ft["access"]
-      index  = ft["index"]
-      unit   = None
-      min    = None
-      max    = None
-      maxlen = None
-      desc   = None
+      name     = str (ft["name"])
+      type     = ft["type"]
+      access   = ft["access"]
+      index    = ft["index"]
+      optional = ft["optional"]
+      unit     = None
+      min      = None
+      max      = None
+      maxlen   = None
+      desc     = None
 
       for key, value in ft.items ():
         if   key == "unit":
@@ -626,7 +702,7 @@
         elif key == "desc":
           desc = str (value)
 
-      config = (name, type, unit, desc, access, index, min, max, maxlen)
+      config = (name, type, unit, desc, access, index, min, max, maxlen, optional)
       configs.append (config)
 
     for idx in range (instCount):
@@ -685,6 +761,33 @@
         args.append (arg)
       methods[mname] = (mdesc, args)
 
+    for idx in range (eventCount):
+      ft = codec.read_map ()
+      ename    = str (ft["name"])
+      argCount = ft["argCount"]
+      if "desc" in ft:
+        edesc = str (ft["desc"])
+      else:
+        edesc = None
+
+      args = []
+      for aidx in range (argCount):
+        ft = codec.read_map ()
+        name    = str (ft["name"])
+        type    = ft["type"]
+        unit    = None
+        desc    = None
+
+        for key, value in ft.items ():
+          if   key == "unit":
+            unit = str (value)
+          elif key == "desc":
+            desc = str (value)
+
+        arg = (name, type, unit, desc)
+        args.append (arg)
+      events[ename] = (edesc, args)
+
     schemaClass = {}
     schemaClass['C'] = configs
     schemaClass['I'] = insts
@@ -695,6 +798,22 @@
     if self.schemaCb != None:
       self.schemaCb (ch.context, classKey, configs, insts, methods, events)
 
+  def parsePresenceMasks(self, codec, schemaClass):  # returns the names of optional properties whose mask bit is clear
+    """ Generate a list of not-present properties """
+    excludeList = []
+    bit = 0  # 0 means "no mask byte loaded yet"; otherwise the bit to test next
+    for element in schemaClass['C'][1:]:  # entry 0 is skipped (parseSchema puts the synthetic "id" config first)
+      if element[9] == 1:  # element[9] is the "optional" field of the config tuple; others consume no mask bit
+        if bit == 0:  # need a fresh mask byte: one uint8 covers up to 8 optional properties
+          mask = codec.read_uint8()
+          bit  = 1  # start from the least-significant bit
+        if (mask & bit) == 0:  # cleared bit: property reported as not present
+          excludeList.append(element[0])  # element[0] is the property name
+        bit = bit * 2  # advance to the next bit
+        if bit == 256:  # all 8 bits used; forces a new read for the next optional property
+          bit = 0
+    return excludeList
+
   def parseContent (self, ch, cls, codec, seq=0):
     """ Parse a received content message. """
     if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None:
@@ -716,21 +835,26 @@
     timestamps.append (codec.read_uint64 ())  # Current Time
     timestamps.append (codec.read_uint64 ())  # Create Time
     timestamps.append (codec.read_uint64 ())  # Delete Time
-
+    objId = objectId(codec)
     schemaClass = self.schema[classKey]
     if cls == 'C' or cls == 'B':
-      for element in schemaClass['C'][:]:
+      notPresent = self.parsePresenceMasks(codec, schemaClass)
+
+    if cls == 'C' or cls == 'B':
+      row.append(("id", objId))
+      for element in schemaClass['C'][1:]:
         tc   = element[1]
         name = element[0]
-        data = self.decodeValue (codec, tc)
-        row.append ((name, data))
+        if name in notPresent:
+          row.append((name, None))
+        else:
+          data = self.decodeValue(codec, tc)
+          row.append((name, data))
 
     if cls == 'I' or cls == 'B':
-      if cls == 'B':
-        start = 1
-      else:
-        start = 0
-      for element in schemaClass['I'][start:]:
+      if cls == 'I':
+        row.append(("id", objId))
+      for element in schemaClass['I'][1:]:
         tc   = element[1]
         name = element[0]
         data = self.decodeValue (codec, tc)
@@ -763,9 +887,12 @@
     codec = Codec (self.spec)
     sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
     self.setHeader (codec, ord ('M'), sequence)
-    codec.write_uint64 (objId)       # ID of object
+    objId.encode(codec)
+    codec.write_str8 (classId[0])
+    codec.write_str8 (classId[1])
+    codec.write_bin128 (classId[2])
     codec.write_str8 (methodName)
-    bank = (objId & 0x0000FFFFFF000000) >> 24
+    bank = objId.getBank()
 
     # Encode args according to schema
     if classId not in self.schema:

Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Wed Sep  3 11:01:44 2008
@@ -71,14 +71,14 @@
   #
 
   def registerObjId (self, objId):
-    if not objId in self.idBackMap:
-      self.idBackMap[objId]   = self.nextId
+    if not objId.index() in self.idBackMap:
+      self.idBackMap[objId.index()] = self.nextId
       self.idMap[self.nextId] = objId
       self.nextId += 1
 
-  def displayObjId (self, objId):
-    if objId in self.idBackMap:
-      return self.idBackMap[objId]
+  def displayObjId (self, objIdIndex):
+    if objIdIndex in self.idBackMap:
+      return self.idBackMap[objIdIndex]
     else:
       return 0
 
@@ -86,7 +86,7 @@
     if displayId in self.idMap:
       return self.idMap[displayId]
     else:
-      return 0
+      return None
 
   def displayClassName (self, cls):
     (packageName, className, hash) = cls
@@ -102,19 +102,20 @@
         self.tables[className] = {}
 
       # Register the ID so a more friendly presentation can be displayed
-      id = long (list[0][1])
-      self.registerObjId (id)
+      objId = list[0][1]
+      oidx  = objId.index()
+      self.registerObjId (objId)
 
       # If this object hasn't been seen before, create a new object record with
       # the timestamps and empty lists for configuration and instrumentation data.
-      if id not in self.tables[className]:
-        self.tables[className][id] = (timestamps, [], [])
+      if oidx not in self.tables[className]:
+        self.tables[className][oidx] = (timestamps, [], [])
 
-      (unused, oldConf, oldInst) = self.tables[className][id]
+      (unused, oldConf, oldInst) = self.tables[className][oidx]
 
       # For config updates, simply replace old config list with the new one.
       if   context == 0: #config
-        self.tables[className][id] = (timestamps, list, oldInst)
+        self.tables[className][oidx] = (timestamps, list, oldInst)
 
       # For instrumentation updates, carry the minimum and maximum values for
       # "hi-lo" stats forward.
@@ -132,7 +133,7 @@
               if oldInst[idx][1] < value:
                 value = oldInst[idx][1]
             newInst.append ((key, value))
-        self.tables[className][id] = (timestamps, oldConf, newInst)
+        self.tables[className][oidx] = (timestamps, oldConf, newInst)
 
     finally:
       self.lock.release ()
@@ -211,11 +212,13 @@
     pass
 
   def refName (self, oid):
-    if oid == 0:
+    if oid == None:
       return "NULL"
-    return str (self.displayObjId (oid))
+    return str (self.displayObjId (oid.index()))
 
   def valueDisplay (self, classKey, key, value):
+    if value == None:
+      return "<NULL>"
     for kind in range (2):
       schema = self.schema[classKey][kind]
       for item in schema:
@@ -437,7 +440,7 @@
         if classKey in self.tables:
           ids = self.listOfIds(classKey, tokens[1:])
           for objId in ids:
-            (ts, config, inst) = self.tables[classKey][self.rawObjId(objId)]
+            (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()]
             createTime  = self.disp.timestamp (ts[1])
             destroyTime = "-"
             if ts[2] > 0:
@@ -486,32 +489,32 @@
 
       rows = []
       timestamp = None
-      config = self.tables[classKey][ids[0]][1]
+      config = self.tables[classKey][ids[0].index()][1]
       for eIdx in range (len (config)):
         key = config[eIdx][0]
         if key != "id":
           row   = ("property", key)
           for id in ids:
             if timestamp == None or \
-               timestamp < self.tables[classKey][id][0][0]:
-              timestamp = self.tables[classKey][id][0][0]
-            (key, value) = self.tables[classKey][id][1][eIdx]
+               timestamp < self.tables[classKey][id.index()][0][0]:
+              timestamp = self.tables[classKey][id.index()][0][0]
+            (key, value) = self.tables[classKey][id.index()][1][eIdx]
             row = row + (self.valueDisplay (classKey, key, value),)
           rows.append (row)
 
-      inst = self.tables[classKey][ids[0]][2]
+      inst = self.tables[classKey][ids[0].index()][2]
       for eIdx in range (len (inst)):
         key = inst[eIdx][0]
         if key != "id":
           row = ("statistic", key)
           for id in ids:
-            (key, value) = self.tables[classKey][id][2][eIdx]
+            (key, value) = self.tables[classKey][id.index()][2][eIdx]
             row = row + (self.valueDisplay (classKey, key, value),)
           rows.append (row)
 
       titleRow = ("Type", "Element")
       for id in ids:
-        titleRow = titleRow + (self.refName (id),)
+        titleRow = titleRow + (self.refName(id),)
       caption = "Object of type %s.%s:" % (classKey[0], classKey[1])
       if timestamp != None:
         caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
@@ -563,13 +566,15 @@
           access   = self.accessName (config[4])
           extra    = ""
           if config[5] == 1:
-            extra = extra + "index "
+            extra += "index "
           if config[6] != None:
-            extra = extra + "Min: " + str (config[6])
+            extra += "Min: " + str(config[6]) + " "
           if config[7] != None:
-            extra = extra + "Max: " + str (config[7])
+            extra += "Max: " + str(config[7]) + " "
           if config[8] != None:
-            extra = extra + "MaxLen: " + str (config[8])
+            extra += "MaxLen: " + str(config[8]) + " "
+          if config[9] == 1:
+            extra += "optional "
           rows.append ((name, typename, unit, access, extra, desc))
         
       for config in self.schema[classKey][1]:
@@ -613,7 +618,7 @@
   def getClassForId (self, objId):
     """ Given an object ID, return the class key for the referenced object """
     for classKey in self.tables:
-      if objId in self.tables[classKey]:
+      if objId.index() in self.tables[classKey]:
         return classKey
     return None
 
@@ -659,14 +664,19 @@
 
   def makeIdRow (self, displayId):
     if displayId in self.idMap:
-      rawId = self.idMap[displayId]
+      objId = self.idMap[displayId]
     else:
       return None
-    return (displayId,
-            rawId,
-            (rawId & 0x7FFF000000000000) >> 48,
-            (rawId & 0x0000FFFFFF000000) >> 24,
-            (rawId & 0x0000000000FFFFFF))
+    if objId.getFlags() == 0:
+      flags = ""
+    else:
+      flags = str(objId.getFlags())
+    seq = objId.getSequence()
+    if seq == 0:
+      seqText = "<durable>"
+    else:
+      seqText = str(seq)
+    return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject()))
 
   def listIds (self, select):
     rows = []
@@ -683,7 +693,7 @@
         return
       rows.append(row)
     self.disp.table("Translation of Display IDs:",
-                    ("DisplayID", "RawID", "BootSequence", "Bank", "Object"),
+                    ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"),
                     rows)
 
   def do_list (self, data):

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Wed Sep  3 11:01:44 2008
@@ -61,18 +61,16 @@
   ===============================================================
   -->
   <class name="Broker">
-    <property name="systemRef"            type="objId"  references="System" access="RC" index="y" desc="System ID" parentRef="y"/>
-    <property name="port"                 type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
-    <property name="workerThreads"        type="uint16" access="RO" desc="Thread pool size"/>
-    <property name="maxConns"             type="uint16" access="RO" desc="Maximum allowed connections"/>
-    <property name="connBacklog"          type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
-    <property name="stagingThreshold"     type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
-    <property name="mgmtPubInterval"      type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
-    <property name="clusterName"          type="sstr"   access="RO"
-                   desc="Name of cluster this server is a member of"/>
-    <property name="version"              type="sstr"   access="RO" desc="Running software version"/>
-    <property name="dataDirEnabled"       type="bool"   access="RO" desc="Persistent configuration storage enabled"/>
-    <property name="dataDir"              type="sstr"   access="RO" desc="Persistent configuration storage location"/>
+    <property name="systemRef"        type="objId"  references="System" access="RC" index="y" desc="System ID" parentRef="y"/>
+    <property name="port"             type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
+    <property name="workerThreads"    type="uint16" access="RO" desc="Thread pool size"/>
+    <property name="maxConns"         type="uint16" access="RO" desc="Maximum allowed connections"/>
+    <property name="connBacklog"      type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
+    <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
+    <property name="mgmtPubInterval"  type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
+    <property name="clusterName"      type="sstr"   access="RO" desc="Name of cluster this server is a member of"/>
+    <property name="version"          type="sstr"   access="RO" desc="Running software version"/>
+    <property name="dataDir"          type="sstr"   access="RO" optional="y" desc="Persistent configuration storage location"/>
 
     <method name="joinCluster">
       <arg name="clusterName" dir="I" type="sstr"/>
@@ -94,6 +92,17 @@
       <arg name="username"      dir="I" type="sstr"/>
       <arg name="password"      dir="I" type="sstr"/>
     </method>
+
+    <event name="agentConnect" desc="QMF Management Agent has connected to the broker">
+      <arg name="remoteAddress" type="sstr"/>
+      <arg name="label"         type="sstr"/>
+      <arg name="brokerBank"    type="uint32"/>
+      <arg name="agentBank"     type="uint32"/>
+    </event>
+
+    <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
+      <arg name="remoteAddress" type="sstr"/>
+    </event>
   </class>
 
   <!--

Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Wed Sep  3 11:01:44 2008
@@ -19,7 +19,7 @@
   under the License.
 -->
 
-<type name="objId"     base="REF"       cpp="uint64_t"      encode="@.putLongLong(#)"    decode="# = @.getLongLong()"  accessor="direct" init="0"/>
+<type name="objId"     base="REF"       cpp="ObjectId"      encode="#.encode(@)"         decode="#.decode(@)"          accessor="direct" init="0"/>
 <type name="uint8"     base="U8"        cpp="uint8_t"       encode="@.putOctet(#)"       decode="# = @.getOctet()"     accessor="direct" init="0"/>
 <type name="uint16"    base="U16"       cpp="uint16_t"      encode="@.putShort(#)"       decode="# = @.getShort()"     accessor="direct" init="0"/>
 <type name="uint32"    base="U32"       cpp="uint32_t"      encode="@.putLong(#)"        decode="# = @.getLong()"      accessor="direct" init="0"/>