You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/10/07 23:47:36 UTC

svn commit: r702651 [2/2] - in /incubator/qpid/trunk/qpid: cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/qmf/ cpp/managementgen/qmf/templates/ cpp/src/qpid/acl/ cpp/src/qpid/agent/ cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/...

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue Oct  7 14:47:35 2008
@@ -179,14 +179,24 @@
     dExchange = _dexchange;
 }
 
-void ManagementBroker::RegisterClass (string   packageName,
-                                      string   className,
+void ManagementBroker::registerClass (string&  packageName,
+                                      string&  className,
                                       uint8_t* md5Sum,
                                       ManagementObject::writeSchemaCall_t schemaCall)
 {
     Mutex::ScopedLock lock(userLock);
-    PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
-    AddClass(pIter, className, md5Sum, schemaCall);
+    PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+    addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementBroker::registerEvent (string&  packageName,
+                                      string&  eventName,
+                                      uint8_t* md5Sum,
+                                      ManagementObject::writeSchemaCall_t schemaCall)
+{
+    Mutex::ScopedLock lock(userLock);
+    PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+    addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
 }
 
 ObjectId ManagementBroker::addObject (ManagementObject* object,
@@ -211,6 +221,23 @@
     return objId;
 }
 
+void ManagementBroker::raiseEvent(const ManagementEvent& event)
+{
+    Mutex::ScopedLock lock (userLock);
+    Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+    uint32_t outLen;
+
+    encodeHeader(outBuffer, 'e');
+    outBuffer.putShortString(event.getPackageName());
+    outBuffer.putShortString(event.getEventName());
+    outBuffer.putBin128(event.getMd5Sum());
+    outBuffer.putLongLong(uint64_t(Duration(now())));
+    event.encode(outBuffer);
+    outLen = MA_BUFFER_SIZE - outBuffer.available();
+    outBuffer.reset();
+    sendBuffer(outBuffer, outLen, mExchange, "mgmt.event");
+}
+
 ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
     : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
 
@@ -219,7 +246,7 @@
 void ManagementBroker::Periodic::fire ()
 {
     broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
-    broker.PeriodicProcessing ();
+    broker.periodicProcessing ();
 }
 
 void ManagementBroker::clientAdded (void)
@@ -233,35 +260,35 @@
         Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
         uint32_t outLen;
 
-        EncodeHeader (outBuffer, 'x');
+        encodeHeader (outBuffer, 'x');
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
-        SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+        sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
     }
 }
 
-void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
 {
     buf.putOctet ('A');
     buf.putOctet ('M');
-    buf.putOctet ('1');
+    buf.putOctet ('2');
     buf.putOctet (opcode);
     buf.putLong  (seq);
 }
 
-bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
 {
-    uint8_t h1 = buf.getOctet ();
-    uint8_t h2 = buf.getOctet ();
-    uint8_t h3 = buf.getOctet ();
+    uint8_t h1 = buf.getOctet();
+    uint8_t h2 = buf.getOctet();
+    uint8_t h3 = buf.getOctet();
 
-    *opcode = buf.getOctet ();
-    *seq    = buf.getLong  ();
+    *opcode = buf.getOctet();
+    *seq    = buf.getLong();
 
-    return h1 == 'A' && h2 == 'M' && h3 == '1';
+    return h1 == 'A' && h2 == 'M' && h3 == '2';
 }
 
-void ManagementBroker::SendBuffer (Buffer&  buf,
+void ManagementBroker::sendBuffer (Buffer&  buf,
                                    uint32_t length,
                                    qpid::broker::Exchange::shared_ptr exchange,
                                    string   routingKey)
@@ -304,7 +331,7 @@
     newManagementObjects.clear();
 }
 
-void ManagementBroker::PeriodicProcessing (void)
+void ManagementBroker::periodicProcessing (void)
 {
 #define BUFSIZE   65536
     Mutex::ScopedLock lock (userLock);
@@ -315,13 +342,13 @@
 
     {
         Buffer msgBuffer(msgChars, BUFSIZE);
-        EncodeHeader(msgBuffer, 'h');
+        encodeHeader(msgBuffer, 'h');
         msgBuffer.putLongLong(uint64_t(Duration(now())));
 
         contentSize = BUFSIZE - msgBuffer.available ();
         msgBuffer.reset ();
         routingKey = "mgmt." + uuid.str() + ".heartbeat";
-        SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+        sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
     }
 
     moveNewObjectsLH();
@@ -350,25 +377,25 @@
         if (object->getConfigChanged () || object->isDeleted ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer, 'c');
+            encodeHeader (msgBuffer, 'c');
             object->writeProperties(msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
             routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName ();
-            SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+            sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
         
         if (object->getInstChanged ())
         {
             Buffer msgBuffer (msgChars, BUFSIZE);
-            EncodeHeader (msgBuffer, 'i');
+            encodeHeader (msgBuffer, 'i');
             object->writeStatistics(msgBuffer);
 
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
             routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName ();
-            SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+            sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         }
 
         if (object->isDeleted ())
@@ -393,12 +420,12 @@
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    EncodeHeader (outBuffer, 'z', sequence);
+    encodeHeader (outBuffer, 'z', sequence);
     outBuffer.putLong (code);
     outBuffer.putShortString (text);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    sendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
 bool ManagementBroker::dispatchCommand (Deliverable&      deliverable,
@@ -411,7 +438,7 @@
     // Parse the routing key.  This management broker should act as though it
     // is bound to the exchange to match the following keys:
     //
-    //    agent.0.#
+    //    agent.1.0.#
     //    broker
 
     if (routingKey == "broker") {
@@ -419,12 +446,12 @@
         return false;
     }
 
-    else if (routingKey.compare(0, 7, "agent.0") == 0) {
+    else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
         dispatchAgentCommandLH(msg);
         return false;
     }
 
-    else if (routingKey.compare(0, 6, "agent.") == 0) {
+    else if (routingKey.compare(0, 8, "agent.1.") == 0) {
         return authorizeAgentMessageLH(msg);
     }
 
@@ -447,7 +474,7 @@
     inBuffer.getShortString(className);
     inBuffer.getBin128(hash);
     inBuffer.getShortString(methodName);
-    EncodeHeader(outBuffer, 'm', sequence);
+    encodeHeader(outBuffer, 'm', sequence);
 
     if (acl != 0) {
         string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
@@ -460,7 +487,7 @@
             outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+            sendBuffer(outBuffer, outLen, dExchange, replyToKey);
             return;
         }
     }
@@ -476,12 +503,19 @@
             outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
         }
         else
-            iter->second->doMethod(methodName, inBuffer, outBuffer);
+            try {
+                outBuffer.record();
+                iter->second->doMethod(methodName, inBuffer, outBuffer);
+            } catch(std::exception& e) {
+                outBuffer.restore();
+                outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+                outBuffer.putShortString(e.what());
+            }
     }
 
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
-    SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+    sendBuffer(outBuffer, outLen, dExchange, replyToKey);
 }
 
 void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -489,12 +523,12 @@
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    EncodeHeader (outBuffer, 'b', sequence);
+    encodeHeader (outBuffer, 'b', sequence);
     uuid.encode  (outBuffer);
 
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    sendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
 void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -506,11 +540,11 @@
         Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
         uint32_t outLen;
 
-        EncodeHeader (outBuffer, 'p', sequence);
-        EncodePackageIndication (outBuffer, pIter);
+        encodeHeader (outBuffer, 'p', sequence);
+        encodePackageIndication (outBuffer, pIter);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
-        SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+        sendBuffer (outBuffer, outLen, dExchange, replyToKey);
     }
 
     sendCommandComplete (replyToKey, sequence);
@@ -521,7 +555,7 @@
     std::string packageName;
 
     inBuffer.getShortString(packageName);
-    FindOrAddPackageLH(packageName);
+    findOrAddPackageLH(packageName);
 }
 
 void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -542,11 +576,11 @@
                 Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
                 uint32_t outLen;
 
-                EncodeHeader(outBuffer, 'q', sequence);
-                EncodeClassIndication(outBuffer, pIter, cIter);
+                encodeHeader(outBuffer, 'q', sequence);
+                encodeClassIndication(outBuffer, pIter, cIter);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
             }
         }
     }
@@ -558,26 +592,27 @@
     std::string packageName;
     SchemaClassKey key;
 
+    uint8_t kind = inBuffer.getOctet();
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(key.name);
     inBuffer.getBin128(key.hash);
 
-    PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+    PackageMap::iterator pIter = findOrAddPackageLH(packageName);
     ClassMap::iterator   cIter = pIter->second.find(key);
     if (cIter == pIter->second.end()) {
         Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
         uint32_t outLen;
         uint32_t sequence = nextRequestSequence++;
 
-        EncodeHeader (outBuffer, 'S', sequence);
+        encodeHeader (outBuffer, 'S', sequence);
         outBuffer.putShortString(packageName);
         outBuffer.putShortString(key.name);
         outBuffer.putBin128(key.hash);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset ();
-        SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+        sendBuffer (outBuffer, outLen, dExchange, replyToKey);
 
-        pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
+        pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence)));
     }
 }
 
@@ -612,11 +647,11 @@
             SchemaClass& classInfo = cIter->second;
 
             if (classInfo.hasSchema()) {
-                EncodeHeader(outBuffer, 's', sequence);
+                encodeHeader(outBuffer, 's', sequence);
                 classInfo.appendSchema(outBuffer);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
             }
             else
                 sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -634,9 +669,10 @@
     SchemaClassKey key;
 
     inBuffer.record();
-    inBuffer.getShortString (packageName);
-    inBuffer.getShortString (key.name);
-    inBuffer.getBin128      (key.hash);
+    inBuffer.getOctet();
+    inBuffer.getShortString(packageName);
+    inBuffer.getShortString(key.name);
+    inBuffer.getBin128(key.hash);
     inBuffer.restore();
 
     PackageMap::iterator pIter = packages.find(packageName);
@@ -644,7 +680,7 @@
         ClassMap& cMap = pIter->second;
         ClassMap::iterator cIter = cMap.find(key);
         if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
-            size_t length = ValidateSchema(inBuffer);
+            size_t length = validateSchema(inBuffer, cIter->second.kind);
             if (length == 0) {
                 QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
                 cMap.erase(key);
@@ -658,11 +694,11 @@
                 Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
                 uint32_t outLen;
 
-                EncodeHeader(outBuffer, 'q');
-                EncodeClassIndication(outBuffer, pIter, cIter);
+                encodeHeader(outBuffer, 'q');
+                encodeClassIndication(outBuffer, pIter, cIter);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+                sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
             }
         }
     }
@@ -727,7 +763,7 @@
 void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
 {
     string   label;
-    uint32_t requestedBank;
+    uint32_t requestedBrokerBank, requestedAgentBank;
     uint32_t assignedBank;
     ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
     Uuid     systemId;
@@ -737,14 +773,15 @@
     RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
     if (aIter != remoteAgents.end()) {
         // There already exists an agent on this session.  Reject the request.
-        sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
+        sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
         return;
     }
 
-    inBuffer.getShortString (label);
-    systemId.decode  (inBuffer);
-    requestedBank = inBuffer.getLong ();
-    assignedBank  = assignBankLH (requestedBank);
+    inBuffer.getShortString(label);
+    systemId.decode(inBuffer);
+    requestedBrokerBank = inBuffer.getLong();
+    requestedAgentBank  = inBuffer.getLong();
+    assignedBank = assignBankLH(requestedAgentBank);
 
     RemoteAgent* agent = new RemoteAgent;
     agent->objIdBank  = assignedBank;
@@ -755,7 +792,8 @@
     agent->mgmtObject->set_label        (label);
     agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
     agent->mgmtObject->set_systemId     (systemId);
-    agent->mgmtObject->set_objectIdBank (assignedBank);
+    agent->mgmtObject->set_brokerBank   (brokerBank);
+    agent->mgmtObject->set_agentBank    (assignedBank);
     addObject (agent->mgmtObject);
 
     remoteAgents[connectionRef] = agent;
@@ -764,12 +802,12 @@
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    EncodeHeader (outBuffer, 'a', sequence);
+    encodeHeader (outBuffer, 'a', sequence);
     outBuffer.putLong (brokerBank);
     outBuffer.putLong (assignedBank);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    sendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
 void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -799,12 +837,12 @@
             Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
-            EncodeHeader (outBuffer, 'g', sequence);
+            encodeHeader (outBuffer, 'g', sequence);
             object->writeProperties(outBuffer);
             object->writeStatistics(outBuffer, true);
             outLen = MA_BUFFER_SIZE - outBuffer.available ();
             outBuffer.reset ();
-            SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+            sendBuffer (outBuffer, outLen, dExchange, replyToKey);
         }
     }
 
@@ -824,7 +862,7 @@
     msg.encodeContent(inBuffer);
     inBuffer.reset();
 
-    if (!CheckHeader(inBuffer, &opcode, &sequence))
+    if (!checkHeader(inBuffer, &opcode, &sequence))
         return false;
 
     if (opcode == 'M') {
@@ -861,12 +899,12 @@
             Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
-            EncodeHeader(outBuffer, 'm', sequence);
+            encodeHeader(outBuffer, 'm', sequence);
             outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
             outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+            sendBuffer(outBuffer, outLen, dExchange, replyToKey);
         }
 
         return false;
@@ -900,7 +938,7 @@
     msg.encodeContent(inBuffer);
     inBuffer.reset();
 
-    if (!CheckHeader(inBuffer, &opcode, &sequence))
+    if (!checkHeader(inBuffer, &opcode, &sequence))
         return;
 
     if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, replyToKey, sequence);
@@ -915,7 +953,7 @@
     else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
 }
 
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name)
 {
     PackageMap::iterator pIter = packages.find (name);
     if (pIter != packages.end ())
@@ -930,19 +968,20 @@
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    EncodeHeader (outBuffer, 'p');
-    EncodePackageIndication (outBuffer, result.first);
+    encodeHeader (outBuffer, 'p');
+    encodePackageIndication (outBuffer, result.first);
     outLen = MA_BUFFER_SIZE - outBuffer.available ();
     outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
+    sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
 
     return result.first;
 }
 
-void ManagementBroker::AddClass(PackageMap::iterator  pIter,
-                                string                className,
-                                uint8_t*              md5Sum,
-                                ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::addClassLH(uint8_t               kind,
+                                  PackageMap::iterator  pIter,
+                                  string&               className,
+                                  uint8_t*              md5Sum,
+                                  ManagementObject::writeSchemaCall_t schemaCall)
 {
     SchemaClassKey key;
     ClassMap&      cMap = pIter->second;
@@ -958,71 +997,76 @@
     QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
               key.name);
 
-    cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+    cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
     cIter = cMap.find(key);
 }
 
-void ManagementBroker::EncodePackageIndication (Buffer&              buf,
-                                                PackageMap::iterator pIter)
+void ManagementBroker::encodePackageIndication(Buffer&              buf,
+                                               PackageMap::iterator pIter)
 {
-    buf.putShortString ((*pIter).first);
+    buf.putShortString((*pIter).first);
 }
 
-void ManagementBroker::EncodeClassIndication (Buffer&              buf,
-                                              PackageMap::iterator pIter,
-                                              ClassMap::iterator   cIter)
+void ManagementBroker::encodeClassIndication(Buffer&              buf,
+                                             PackageMap::iterator pIter,
+                                             ClassMap::iterator   cIter)
 {
     SchemaClassKey key = (*cIter).first;
 
-    buf.putShortString ((*pIter).first);
-    buf.putShortString (key.name);
-    buf.putBin128      (key.hash);
+    buf.putOctet((*cIter).second.kind);
+    buf.putShortString((*pIter).first);
+    buf.putShortString(key.name);
+    buf.putBin128(key.hash);
 }
 
-size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
+{
+    if      (kind == ManagementItem::CLASS_KIND_TABLE)
+        return validateTableSchema(inBuffer);
+    else if (kind == ManagementItem::CLASS_KIND_EVENT)
+        return validateEventSchema(inBuffer);
+    return 0;
+}
+
+size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
 {
     uint32_t start = inBuffer.getPosition();
     uint32_t end;
     string   text;
     uint8_t  hash[16];
 
-    inBuffer.record();
-    inBuffer.getShortString(text);
-    inBuffer.getShortString(text);
-    inBuffer.getBin128(hash);
-
-    uint16_t propCount = inBuffer.getShort();
-    uint16_t statCount = inBuffer.getShort();
-    uint16_t methCount = inBuffer.getShort();
-    uint16_t evntCount = inBuffer.getShort();
-
-    for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
-        FieldTable ft;
-        ft.decode(inBuffer);
-    }
-
-    for (uint16_t idx = 0; idx < methCount; idx++) {
-        FieldTable ft;
-        ft.decode(inBuffer);
-        if (!ft.isSet("argCount"))
+    try {
+        inBuffer.record();
+        uint8_t kind = inBuffer.getOctet();
+        if (kind != ManagementItem::CLASS_KIND_TABLE)
             return 0;
-        int argCount = ft.getInt("argCount");
-        for (int mIdx = 0; mIdx < argCount; mIdx++) {
-            FieldTable aft;
-            aft.decode(inBuffer);
-        }
-    }
 
-    for (uint16_t idx = 0; idx < evntCount; idx++) {
-        FieldTable ft;
-        ft.decode(inBuffer);
-        if (!ft.isSet("argCount"))
-            return 0;
-        int argCount = ft.getInt("argCount");
-        for (int mIdx = 0; mIdx < argCount; mIdx++) {
-            FieldTable aft;
-            aft.decode(inBuffer);
+        inBuffer.getShortString(text);
+        inBuffer.getShortString(text);
+        inBuffer.getBin128(hash);
+
+        uint16_t propCount = inBuffer.getShort();
+        uint16_t statCount = inBuffer.getShort();
+        uint16_t methCount = inBuffer.getShort();
+
+        for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+            FieldTable ft;
+            ft.decode(inBuffer);
+        }
+
+        for (uint16_t idx = 0; idx < methCount; idx++) {
+            FieldTable ft;
+            ft.decode(inBuffer);
+            if (!ft.isSet("argCount"))
+                return 0;
+            int argCount = ft.getInt("argCount");
+            for (int mIdx = 0; mIdx < argCount; mIdx++) {
+                FieldTable aft;
+                aft.decode(inBuffer);
+            }
         }
+    } catch (std::exception& e) {
+        return 0;
     }
 
     end = inBuffer.getPosition();
@@ -1030,24 +1074,34 @@
     return end - start;
 }
 
-Mutex& ManagementBroker::getMutex()
+size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
 {
-    return userLock;
-}
+    uint32_t start = inBuffer.getPosition();
+    uint32_t end;
+    string   text;
+    uint8_t  hash[16];
 
-Buffer* ManagementBroker::startEventLH()
-{
-    Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
-    EncodeHeader(*outBuffer, 'e');
-    outBuffer->putLongLong(uint64_t(Duration(now())));
-    return outBuffer;
-}
+    try {
+        inBuffer.record();
+        uint8_t kind = inBuffer.getOctet();
+        if (kind != ManagementItem::CLASS_KIND_EVENT)
+            return 0;
 
-void ManagementBroker::finishEventLH(Buffer* outBuffer)
-{
-    uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
-    outBuffer->reset();
-    SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
-    delete outBuffer;
-}
+        inBuffer.getShortString(text);
+        inBuffer.getShortString(text);
+        inBuffer.getBin128(hash);
+
+        uint16_t argCount = inBuffer.getShort();
 
+        for (uint16_t idx = 0; idx < argCount; idx++) {
+            FieldTable ft;
+            ft.decode(inBuffer);
+        }
+    } catch (std::exception& e) {
+        return 0;
+    }
+
+    end = inBuffer.getPosition();
+    inBuffer.restore(); // restore original position
+    return end - start;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Tue Oct  7 14:47:35 2008
@@ -38,11 +38,11 @@
 
 class ManagementBroker : public ManagementAgent
 {
-  private:
+private:
 
     int threadPoolSize;
 
-  public:
+public:
 
     ManagementBroker ();
     virtual ~ManagementBroker ();
@@ -52,13 +52,18 @@
     void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
                           qpid::broker::Exchange::shared_ptr directExchange);
     int  getMaxThreads   () { return threadPoolSize; }
-    void RegisterClass   (std::string packageName,
-                          std::string className,
+    void registerClass   (std::string& packageName,
+                          std::string& className,
+                          uint8_t*    md5Sum,
+                          ManagementObject::writeSchemaCall_t schemaCall);
+    void registerEvent   (std::string& packageName,
+                          std::string& eventName,
                           uint8_t*    md5Sum,
                           ManagementObject::writeSchemaCall_t schemaCall);
     ObjectId addObject   (ManagementObject* object,
                           uint64_t          persistId = 0);
-    void clientAdded     (void);
+    void raiseEvent(const ManagementEvent& event);
+    void clientAdded     ();
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args);
@@ -68,7 +73,7 @@
     uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
     int getSignalFd () { assert(0); return -1; }
 
-  private:
+private:
     friend class ManagementAgent;
 
     struct Periodic : public qpid::broker::TimerTask
@@ -127,15 +132,16 @@
 
     struct SchemaClass
     {
+        uint8_t  kind;
         ManagementObject::writeSchemaCall_t writeSchemaCall;
         uint32_t pendingSequence;
         size_t   bufferLen;
         uint8_t* buffer;
 
-        SchemaClass(uint32_t seq) :
-            writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
-        SchemaClass(ManagementObject::writeSchemaCall_t call) :
-            writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
+        SchemaClass(uint8_t _kind, uint32_t seq) :
+            kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+        SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
+            kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
         bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
         void appendSchema (framing::Buffer& buf);
     };
@@ -154,12 +160,12 @@
     framing::Uuid                uuid;
     sys::Mutex                   addLock;
     sys::Mutex                   userLock;
-    qpid::broker::Timer                timer;
+    qpid::broker::Timer          timer;
     qpid::broker::Exchange::shared_ptr mExchange;
     qpid::broker::Exchange::shared_ptr dExchange;
     std::string                  dataDir;
     uint16_t                     interval;
-    qpid::broker::Broker*              broker;
+    qpid::broker::Broker*        broker;
     uint16_t                     bootSequence;
     uint32_t                     nextObjectId;
     uint32_t                     brokerBank;
@@ -173,10 +179,10 @@
     char eventBuffer[MA_BUFFER_SIZE];
 
     void writeData ();
-    void PeriodicProcessing (void);
-    void EncodeHeader       (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
-    bool CheckHeader        (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
-    void SendBuffer         (framing::Buffer&             buf,
+    void periodicProcessing (void);
+    void encodeHeader       (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
+    bool checkHeader        (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+    void sendBuffer         (framing::Buffer&             buf,
                              uint32_t                     length,
                              qpid::broker::Exchange::shared_ptr exchange,
                              std::string                  routingKey);
@@ -185,14 +191,15 @@
     bool authorizeAgentMessageLH(qpid::broker::Message& msg);
     void dispatchAgentCommandLH(qpid::broker::Message& msg);
 
-    PackageMap::iterator FindOrAddPackageLH(std::string name);
-    void AddClass(PackageMap::iterator         pIter,
-                  std::string                  className,
-                  uint8_t*                     md5Sum,
-                  ManagementObject::writeSchemaCall_t schemaCall);
-    void EncodePackageIndication (framing::Buffer&     buf,
+    PackageMap::iterator findOrAddPackageLH(std::string name);
+    void addClassLH(uint8_t                      kind,
+                    PackageMap::iterator         pIter,
+                    std::string&                 className,
+                    uint8_t*                     md5Sum,
+                    ManagementObject::writeSchemaCall_t schemaCall);
+    void encodePackageIndication (framing::Buffer&     buf,
                                   PackageMap::iterator pIter);
-    void EncodeClassIndication (framing::Buffer&     buf,
+    void encodeClassIndication (framing::Buffer&     buf,
                                 PackageMap::iterator pIter,
                                 ClassMap::iterator   cIter);
     bool     bankInUse (uint32_t bank);
@@ -212,10 +219,9 @@
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
 
-    size_t ValidateSchema(framing::Buffer&);
-    sys::Mutex& getMutex();
-    framing::Buffer* startEventLH();
-    void finishEventLH(framing::Buffer* outBuffer);
+    size_t validateSchema(framing::Buffer&, uint8_t kind);
+    size_t validateTableSchema(framing::Buffer&);
+    size_t validateEventSchema(framing::Buffer&);
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h?rev=702651&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h Tue Oct  7 14:47:35 2008
@@ -0,0 +1,48 @@
+#ifndef _ManagementEvent_
+#define _ManagementEvent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include <qpid/framing/Buffer.h>
+#include <string>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgent;
+
+class ManagementEvent : public ManagementItem {
+public:
+    typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&);
+    virtual ~ManagementEvent() {}
+
+    virtual writeSchemaCall_t getWriteSchemaCall(void) = 0;
+    virtual std::string& getEventName() const = 0;
+    virtual std::string& getPackageName() const = 0;
+    virtual uint8_t* getMd5Sum() const = 0;
+    virtual void encode(qpid::framing::Buffer&) const = 0;
+};
+
+}}
+
+#endif  /*!_ManagementEvent_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Tue Oct  7 14:47:35 2008
@@ -84,6 +84,21 @@
     second = buffer.getLongLong();
 }
 
+namespace qpid {
+namespace management {
+
+std::ostream& operator<<(std::ostream& out, const ObjectId& i)
+{
+    out << "[" << ((i.first & 0xF000000000000000LL) >> 60) <<
+        "-" << ((i.first & 0x0FFF000000000000LL) >> 48) <<
+        "-" << ((i.first & 0x0000FFFFF0000000LL) >> 32) <<
+        "-" << (i.first & 0x000000000FFFFFFFLL) <<
+        "-" << i.second << "]";
+    return out;
+}
+
+}}
+
 int ManagementObject::nextThreadIndex = 0;
 
 void ManagementObject::writeTimestamps (Buffer& buf)
@@ -109,18 +124,3 @@
     }
     return thisIndex;
 }
-
-Mutex& ManagementObject::getMutex()
-{
-    return agent->getMutex();
-}
-
-Buffer* ManagementObject::startEventLH()
-{
-    return agent->startEventLH();
-}
-
-void ManagementObject::finishEventLH(Buffer* buf)
-{
-    agent->finishEventLH(buf);
-}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Tue Oct  7 14:47:35 2008
@@ -46,7 +46,7 @@
 
 
 class ObjectId {
-private:
+protected:
     const AgentAttachment* agent;
     uint64_t first;
     uint64_t second;
@@ -59,23 +59,11 @@
     bool operator<(const ObjectId &other) const;
     void encode(framing::Buffer& buffer);
     void decode(framing::Buffer& buffer);
+    friend std::ostream& operator<<(std::ostream&, const ObjectId&);
 };
 
-class ManagementObject
-{
-  protected:
-    
-    uint64_t         createTime;
-    uint64_t         destroyTime;
-    ObjectId         objectId;
-    bool             configChanged;
-    bool             instChanged;
-    bool             deleted;
-    Manageable*      coreObject;
-    sys::Mutex       accessLock;
-    ManagementAgent* agent;
-    int              maxThreads;
-
+class ManagementItem {
+public:
     static const uint8_t TYPE_U8        = 1;
     static const uint8_t TYPE_U16       = 2;
     static const uint8_t TYPE_U32       = 3;
@@ -107,15 +95,35 @@
     static const uint8_t FLAG_INDEX  = 0x02;
     static const uint8_t FLAG_END    = 0x80;
 
-    static       int nextThreadIndex;
+    const static uint8_t CLASS_KIND_TABLE = 1;
+    const static uint8_t CLASS_KIND_EVENT = 2;
+
+
+
+public:
+    virtual ~ManagementItem() {}
+};
+
+class ManagementObject : public ManagementItem
+{
+  protected:
+    
+    uint64_t         createTime;
+    uint64_t         destroyTime;
+    ObjectId         objectId;
+    bool             configChanged;
+    bool             instChanged;
+    bool             deleted;
+    Manageable*      coreObject;
+    sys::Mutex       accessLock;
+    ManagementAgent* agent;
+    int              maxThreads;
+
+    static int nextThreadIndex;
         
     int  getThreadIndex();
     void writeTimestamps (qpid::framing::Buffer& buf);
 
-    sys::Mutex& getMutex();
-    framing::Buffer* startEventLH();
-    void finishEventLH(framing::Buffer* buf);
-
   public:
     typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
 
@@ -129,14 +137,14 @@
     virtual void writeProperties(qpid::framing::Buffer& buf) = 0;
     virtual void writeStatistics(qpid::framing::Buffer& buf,
                                  bool skipHeaders = false) = 0;
-    virtual void doMethod       (std::string            methodName,
+    virtual void doMethod       (std::string&           methodName,
                                  qpid::framing::Buffer& inBuf,
                                  qpid::framing::Buffer& outBuf) = 0;
     virtual void setReference   (ObjectId objectId);
 
-    virtual std::string& getClassName   (void) = 0;
-    virtual std::string& getPackageName (void) = 0;
-    virtual uint8_t*     getMd5Sum      (void) = 0;
+    virtual std::string& getClassName   (void) const = 0;
+    virtual std::string& getPackageName (void) const = 0;
+    virtual uint8_t*     getMd5Sum      (void) const = 0;
 
     void         setObjectId      (ObjectId oid) { objectId = oid; }
     ObjectId     getObjectId      (void) { return objectId; }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java Tue Oct  7 14:47:35 2008
@@ -35,7 +35,7 @@
     String METHOD_REPLY_QUEUE_PREFIX = "reply.";
    
     String AMQ_DIRECT_QUEUE = "amq.direct";
-    String AGENT_ROUTING_KEY = "agent.0";
+    String AGENT_ROUTING_KEY = "agent.1.0";
    
     String BROKER_ROUTING_KEY = "broker";
     
@@ -49,4 +49,4 @@
     String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml";
     
     String ARG_COUNT_PARAM_NAME = "argCount";
-}
\ No newline at end of file
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java Tue Oct  7 14:47:35 2008
@@ -27,8 +27,8 @@
  */
 public interface Protocol
 {
-    String MAGIC_NUMBER = "AM1";
+    String MAGIC_NUMBER = "AM2";
     
     byte [] METHOD_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"M").getBytes();
     byte [] SCHEMA_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"S").getBytes();  
-}
\ No newline at end of file
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java Tue Oct  7 14:47:35 2008
@@ -49,6 +49,10 @@
     {        
         try 
         {
+            int classKind = decoder.readUint8();
+            if (classKind != 1) {
+                return;
+            }
             String packageName = decoder.readStr8();
             String className = decoder.readStr8();
             
@@ -57,7 +61,7 @@
             int howManyProperties = decoder.readUint16();
             int howManyStatistics = decoder.readUint16();
             int howManyMethods = decoder.readUint16();
-            int howManyEvents = decoder.readUint16();
+            int howManyEvents = 0;
                                     
             // FIXME : Divide between schema error and raw data conversion error!!!!
             _domainModel.addSchema(
@@ -155,4 +159,4 @@
         }        
         return result;
     }
- }
\ No newline at end of file
+ }

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Tue Oct  7 14:47:35 2008
@@ -84,8 +84,8 @@
         self.qmf.delBroker(self.broker)
 
     def Overview (self):
-        exchanges = self.qmf.getObjects(cls="exchange")
-        queues    = self.qmf.getObjects(cls="queue")
+        exchanges = self.qmf.getObjects(_class="exchange")
+        queues    = self.qmf.getObjects(_class="queue")
         print "Total Exchanges: %d" % len (exchanges)
         etype = {}
         for ex in exchanges:
@@ -106,7 +106,7 @@
         print "    non-durable: %d" % (len (queues) - _durable)
 
     def ExchangeList (self, filter):
-        exchanges = self.qmf.getObjects(cls="exchange")
+        exchanges = self.qmf.getObjects(_class="exchange")
         print "Durable   Type      Bindings  Exchange Name"
         print "======================================================="
         for ex in exchanges:
@@ -114,9 +114,9 @@
                 print "%4c      %-10s%5d     %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
 
     def ExchangeListRecurse (self, filter):
-        exchanges = self.qmf.getObjects(cls="exchange")
-        bindings  = self.qmf.getObjects(cls="binding")
-        queues    = self.qmf.getObjects(cls="queue")
+        exchanges = self.qmf.getObjects(_class="exchange")
+        bindings  = self.qmf.getObjects(_class="binding")
+        queues    = self.qmf.getObjects(_class="queue")
         for ex in exchanges:
             if self.match (ex.name, filter):
                 print "Exchange '%s' (%s)" % (ex.name, ex.type)
@@ -130,8 +130,8 @@
             
 
     def QueueList (self, filter):
-        queues   = self.qmf.getObjects(cls="queue")
-        journals = self.qmf.getObjects(cls="journal")
+        queues   = self.qmf.getObjects(_class="queue")
+        journals = self.qmf.getObjects(_class="journal")
         print "                                      Store Size"
         print "Durable  AutoDel  Excl  Bindings  (files x file pages)  Queue Name"
         print "==========================================================================================="
@@ -151,9 +151,9 @@
                              YN (q.exclusive), q.bindingCount, q.name)
 
     def QueueListRecurse (self, filter):
-        exchanges = self.qmf.getObjects(cls="exchange")
-        bindings  = self.qmf.getObjects(cls="binding")
-        queues    = self.qmf.getObjects(cls="queue")
+        exchanges = self.qmf.getObjects(_class="exchange")
+        bindings  = self.qmf.getObjects(_class="binding")
+        queues    = self.qmf.getObjects(_class="queue")
         for queue in queues:
             if self.match (queue.name, filter):
                 print "Queue '%s'" % queue.name

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-printevents
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-printevents?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-printevents (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-printevents Tue Oct  7 14:47:35 2008
@@ -30,16 +30,13 @@
   def event(self, broker, event):
     print event
 
-  def heartbeat(self, agent, timestamp):
-    print "Heartbeat"
-
 ##
 ## Main Program
 ##
 def main():
   _usage = "%prog [options] [broker-addr]..."
   _description = \
-"""Collect and print events from one of more Qpid message brokers.  If no broker-addr is
+"""Collect and print events from one or more Qpid message brokers.  If no broker-addr is
 supplied, %prog will connect to 'localhost:5672'.
 broker-addr is of the form:  [username/password@] hostname | ip-address [:<port>]
 ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Tue Oct  7 14:47:35 2008
@@ -62,7 +62,7 @@
         self.qmf.delBroker(self.broker)
 
     def getLink (self):
-        links = self.qmf.getObjects(cls="link")
+        links = self.qmf.getObjects(_class="link")
         for link in links:
             if "%s:%d" % (link.host, link.port) == self.src.name ():
                 return link
@@ -74,7 +74,7 @@
             print "Linking broker to itself is not permitted"
             sys.exit(1)
 
-        brokers = self.qmf.getObjects(cls="broker")
+        brokers = self.qmf.getObjects(_class="broker")
         broker = brokers[0]
         link = self.getLink()
         if link != None:
@@ -92,7 +92,7 @@
 
     def DelLink (self, srcBroker):
         self.src = qmfconsole.BrokerURL(srcBroker)
-        brokers = self.qmf.getObjects(cls="broker")
+        brokers = self.qmf.getObjects(_class="broker")
         broker = brokers[0]
         link = self.getLink()
         if link == None:
@@ -103,7 +103,7 @@
             print "Close method returned:", res.status, res.text
 
     def ListLinks (self):
-        links = self.qmf.getObjects(cls="link")
+        links = self.qmf.getObjects(_class="link")
         if len(links) == 0:
             print "No Links Found"
         else:
@@ -119,7 +119,7 @@
         if self.dest.name() == self.src.name():
             raise Exception("Linking broker to itself is not permitted")
 
-        brokers = self.qmf.getObjects(cls="broker")
+        brokers = self.qmf.getObjects(_class="broker")
         broker = brokers[0]
 
         link = self.getLink()
@@ -140,7 +140,7 @@
         if link == None:
             raise Exception("Protocol Error - Missing link ID")
 
-        bridges = self.qmf.getObjects(cls="bridge")
+        bridges = self.qmf.getObjects(_class="bridge")
         for bridge in bridges:
             if bridge.linkRef == link.getObjectId() and \
                     bridge.dest == exchange and bridge.key == routingKey:
@@ -164,7 +164,7 @@
                 raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
             sys.exit (0)
 
-        bridges = self.qmf.getObjects(cls="bridge")
+        bridges = self.qmf.getObjects(_class="bridge")
         for bridge in bridges:
             if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
                 if _verbose:
@@ -186,8 +186,8 @@
             raise Exception("Route not found")
 
     def ListRoutes (self):
-        links   = self.qmf.getObjects(cls="link")
-        bridges = self.qmf.getObjects(cls="bridge")
+        links   = self.qmf.getObjects(_class="link")
+        bridges = self.qmf.getObjects(_class="bridge")
 
         for bridge in bridges:
             myLink = None
@@ -199,8 +199,8 @@
                 print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
 
     def ClearAllRoutes (self):
-        links   = self.qmf.getObjects(cls="link")
-        bridges = self.qmf.getObjects(cls="bridge")
+        links   = self.qmf.getObjects(_class="link")
+        bridges = self.qmf.getObjects(_class="bridge")
 
         for bridge in bridges:
             if _verbose:
@@ -218,7 +218,7 @@
                 print "Ok"
 
         if _dellink:
-            links = self.qmf.getObjects(cls="link")
+            links = self.qmf.getObjects(_class="link")
             for link in links:
                 if _verbose:
                     print "Deleting Link: %s:%d... " % (link.host, link.port),

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Tue Oct  7 14:47:35 2008
@@ -285,7 +285,7 @@
     ft = {}
     ft["_class"] = className
     codec.write_map (ft)
-    msg = channel.message(codec.encoded, routing_key="agent.%d" % bank)
+    msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank)
     channel.send ("qpid.management", msg)
 
   def syncWaitForStable (self, channel):
@@ -398,7 +398,7 @@
     """ Compose the header of a management message. """
     codec.write_uint8 (ord ('A'))
     codec.write_uint8 (ord ('M'))
-    codec.write_uint8 (ord ('1'))
+    codec.write_uint8 (ord ('2'))
     codec.write_uint8 (opcode)
     codec.write_uint32  (seq)
 
@@ -412,7 +412,7 @@
     if octet != 'M':
       return None
     octet = chr (codec.read_uint8 ())
-    if octet != '1':
+    if octet != '2':
       return None
     opcode = chr (codec.read_uint8 ())
     seq    = codec.read_uint32 ()
@@ -433,7 +433,7 @@
     elif typecode == 6:
       codec.write_str8   (value)
     elif typecode == 7:
-      codec.write_vbin32 (value)
+      codec.write_str16 (value)
     elif typecode == 8:  # ABSTIME
       codec.write_uint64 (long (value))
     elif typecode == 9:  # DELTATIME
@@ -476,7 +476,7 @@
     elif typecode == 6:
       data = str (codec.read_str8 ())
     elif typecode == 7:
-      data = codec.read_vbin32 ()
+      data = codec.read_str16 ()
     elif typecode == 8:  # ABSTIME
       data = codec.read_uint64 ()
     elif typecode == 9:  # DELTATIME
@@ -604,6 +604,9 @@
       ch.send ("qpid.management", smsg)
 
   def handleClassInd (self, ch, codec):
+    kind  = codec.read_uint8()
+    if kind != 1:  # This API doesn't handle new-style events
+      return
     pname = str (codec.read_str8())
     cname = str (codec.read_str8())
     hash  = codec.read_bin128()
@@ -656,13 +659,15 @@
   def parseSchema (self, ch, codec):
     """ Parse a received schema-description message. """
     self.decOutstanding (ch)
+    kind  = codec.read_uint8()
+    if kind != 1:  # This API doesn't handle new-style events
+      return
     packageName = str (codec.read_str8 ())
     className   = str (codec.read_str8 ())
     hash        = codec.read_bin128 ()
     configCount = codec.read_uint16 ()
     instCount   = codec.read_uint16 ()
     methodCount = codec.read_uint16 ()
-    eventCount  = codec.read_uint16 ()
 
     if packageName not in self.packages:
       return
@@ -676,7 +681,6 @@
     configs = []
     insts   = []
     methods = {}
-    events  = {}
 
     configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
     insts.append   (("id", 4, None, None))
@@ -765,42 +769,14 @@
         args.append (arg)
       methods[mname] = (mdesc, args)
 
-    for idx in range (eventCount):
-      ft = codec.read_map ()
-      ename    = str (ft["name"])
-      argCount = ft["argCount"]
-      if "desc" in ft:
-        edesc = str (ft["desc"])
-      else:
-        edesc = None
-
-      args = []
-      for aidx in range (argCount):
-        ft = codec.read_map ()
-        name    = str (ft["name"])
-        type    = ft["type"]
-        unit    = None
-        desc    = None
-
-        for key, value in ft.items ():
-          if   key == "unit":
-            unit = str (value)
-          elif key == "desc":
-            desc = str (value)
-
-        arg = (name, type, unit, desc)
-        args.append (arg)
-      events[ename] = (edesc, args)
-
     schemaClass = {}
     schemaClass['C'] = configs
     schemaClass['I'] = insts
     schemaClass['M'] = methods
-    schemaClass['E'] = events
     self.schema[classKey] = schemaClass
 
     if self.schemaCb != None:
-      self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+      self.schemaCb (ch.context, classKey, configs, insts, methods, {})
 
   def parsePresenceMasks(self, codec, schemaClass):
     """ Generate a list of not-present properties """
@@ -896,7 +872,7 @@
     codec.write_str8 (classId[1])
     codec.write_bin128 (classId[2])
     codec.write_str8 (methodName)
-    bank = objId.getBank()
+    bank = "%d.%d" % (objId.getBroker(), objId.getBank())
 
     # Encode args according to schema
     if classId not in self.schema:
@@ -926,5 +902,5 @@
 
     packageName = classId[0]
     className   = classId[1]
-    msg = channel.message(codec.encoded, "agent." + str(bank))
+    msg = channel.message(codec.encoded, "agent." + bank)
     channel.send ("qpid.management", msg)

Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Tue Oct  7 14:47:35 2008
@@ -546,10 +546,10 @@
       for classKey in sorted:
         tuple = self.schema[classKey]
         row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]),
-               len (tuple[2]), len (tuple[3]))
+               len (tuple[2]))
         rows.append (row)
       self.disp.table ("Classes in Schema:",
-                       ("Class", "Properties", "Statistics", "Methods", "Events"),
+                       ("Class", "Properties", "Statistics", "Methods"),
                        rows)
     finally:
       self.lock.release ()

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Tue Oct  7 14:47:35 2008
@@ -49,7 +49,7 @@
     """ Invoked when a QMF package is discovered. """
     pass
 
-  def newClass(self, classKey):
+  def newClass(self, kind, classKey):
     """ Invoked when a new class is discovered.  Session.getSchema can be
     used to obtain details about the class."""
     pass
@@ -158,7 +158,7 @@
       raise Exception(broker.error)
 
     self.brokers.append(broker)
-    self.getObjects(broker=broker, cls="agent")
+    self.getObjects(broker=broker, _class="agent")
     return broker
 
   def delBroker(self, broker):
@@ -219,35 +219,36 @@
 
     The class for queried objects may be specified in one of the following ways:
 
-    schema = <schema> - supply a schema object returned from getSchema
-    key = <key>       - supply a classKey from the list returned by getClasses
-    cls = <name>      - supply a class name as a string
+    _schema = <schema> - supply a schema object returned from getSchema.
+    _key = <key>       - supply a classKey from the list returned by getClasses.
+    _class = <name>    - supply a class name as a string.  If the class name exists
+                         in multiple packages, a _package argument may also be supplied.
 
     If objects should be obtained from only one agent, use the following argument.
     Otherwise, the query will go to all agents.
 
-    agent = <agent> - supply an agent from the list returned by getAgents
+    _agent = <agent> - supply an agent from the list returned by getAgents.
 
     If the get query is to be restricted to one broker (as opposed to all connected brokers),
     add the following argument:
 
-    broker = <broker> - supply a broker as returned by addBroker
+    _broker = <broker> - supply a broker as returned by addBroker.
 
     If additional arguments are supplied, they are used as property selectors.  For example,
     if the argument name="test" is supplied, only objects whose "name" property is "test"
     will be returned in the result.
     """
-    if "broker" in kwargs:
+    if "_broker" in kwargs:
       brokerList = []
-      brokerList.append(kwargs["broker"])
+      brokerList.append(kwargs["_broker"])
     else:
       brokerList = self.brokers
     for broker in brokerList:
       broker._waitForStable()
 
     agentList = []
-    if "agent" in kwargs:
-      agent = kwargs["agent"]
+    if "_agent" in kwargs:
+      agent = kwargs["_agent"]
       if agent.broker not in brokerList:
         raise Exception("Supplied agent is not accessible through the supplied broker")
       agentList.append(agent)
@@ -257,11 +258,14 @@
           agentList.append(agent)
 
     cname = None
-    if   "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey()
-    elif "key"    in kwargs: pname, cname, hash = kwargs["key"]
-    elif "cls"    in kwargs: pname, cname, hash = None, kwargs["cls"], None
+    if   "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
+    elif "_key"    in kwargs: pname, cname, hash = kwargs["_key"]
+    elif "_class"  in kwargs:
+      pname, cname, hash = None, kwargs["_class"], None
+      if "_package" in kwargs:
+        pname = kwargs["_package"]
     if cname == None:
-      raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument")
+      raise Exception("No class supplied, use '_schema', '_key', or '_class' argument")
     map = {}
     map["_class"] = cname
     if pname != None: map["_package"] = pname
@@ -269,7 +273,7 @@
 
     self.getSelect = []
     for item in kwargs:
-      if item != "schema" and item != "key" and item != "cls":
+      if item[0] != '_':
         self.getSelect.append((item, kwargs[item]))
 
     self.getResult = []
@@ -282,7 +286,7 @@
       self.cv.release()
       broker._setHeader(sendCodec, 'G', seq)
       sendCodec.write_map(map)
-      smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank)
+      smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
       broker._send(smsg)
 
     starttime = time()
@@ -382,6 +386,7 @@
       self.cv.release()
 
   def _handleClassInd(self, broker, codec, seq):
+    kind  = codec.read_uint8()
     pname = str(codec.read_str8())
     cname = str(codec.read_str8())
     hash  = codec.read_bin128()
@@ -431,17 +436,18 @@
       self.console.event(broker, event)
 
   def _handleSchemaResp(self, broker, codec, seq):
+    kind  = codec.read_uint8()
     pname = str(codec.read_str8())
     cname = str(codec.read_str8())
     hash  = codec.read_bin128()
     classKey = (pname, cname, hash)
-    _class = SchemaClass(classKey, codec)
+    _class = SchemaClass(kind, classKey, codec)
     self.cv.acquire()
     self.packages[pname][(cname, hash)] = _class
     self.cv.release()
     broker._decOutstanding()
     if self.console != None:
-      self.console.newClass(classKey)
+      self.console.newClass(kind, classKey)
 
   def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
     pname = str(codec.read_str8())
@@ -485,7 +491,7 @@
   def _selectMatch(self, object):
     """ Check the object against self.getSelect to check for a match """
     for key, value in self.getSelect:
-      for prop, propval in object.properties:
+      for prop, propval in object.getProperties():
         if key == prop.name and value != propval:
           return False
     return True
@@ -497,7 +503,7 @@
     elif typecode == 3:  data = codec.read_uint32()    # U32
     elif typecode == 4:  data = codec.read_uint64()    # U64
     elif typecode == 6:  data = str(codec.read_str8()) # SSTR
-    elif typecode == 7:  data = codec.read_vbin32()    # LSTR
+    elif typecode == 7:  data = codec.read_str16()     # LSTR
     elif typecode == 8:  data = codec.read_int64()     # ABSTIME
     elif typecode == 9:  data = codec.read_uint64()    # DELTATIME
     elif typecode == 10: data = ObjectId(codec)        # REF
@@ -521,7 +527,7 @@
     elif typecode == 3:  codec.write_uint32 (long(value))   # U32
     elif typecode == 4:  codec.write_uint64 (long(value))   # U64
     elif typecode == 6:  codec.write_str8   (value)         # SSTR
-    elif typecode == 7:  codec.write_vbin32 (value)         # LSTR
+    elif typecode == 7:  codec.write_str16  (value)         # LSTR
     elif typecode == 8:  codec.write_int64  (long(value))   # ABSTIME
     elif typecode == 9:  codec.write_uint64 (long(value))   # DELTATIME
     elif typecode == 10: value.encode       (codec)         # REF
@@ -577,30 +583,42 @@
 
 class SchemaClass:
   """ """
-  def __init__(self, key, codec):
+  CLASS_KIND_TABLE = 1
+  CLASS_KIND_EVENT = 2
+
+  def __init__(self, kind, key, codec):
+    self.kind = kind
     self.classKey = key
     self.properties = []
     self.statistics = []
-    self.methods    = []
-    self.events     = []
+    self.methods = []
+    self.arguments = []
 
-    propCount   = codec.read_uint16()
-    statCount   = codec.read_uint16()
-    methodCount = codec.read_uint16()
-    eventCount  = codec.read_uint16()
-
-    for idx in range(propCount):
-      self.properties.append(SchemaProperty(codec))
-    for idx in range(statCount):
-      self.statistics.append(SchemaStatistic(codec))
-    for idx in range(methodCount):
-      self.methods.append(SchemaMethod(codec))
-    for idx in range(eventCount):
-      self.events.append(SchemaEvent(codec))
+    if self.kind == self.CLASS_KIND_TABLE:
+      propCount   = codec.read_uint16()
+      statCount   = codec.read_uint16()
+      methodCount = codec.read_uint16()
+      for idx in range(propCount):
+        self.properties.append(SchemaProperty(codec))
+      for idx in range(statCount):
+        self.statistics.append(SchemaStatistic(codec))
+      for idx in range(methodCount):
+        self.methods.append(SchemaMethod(codec))
+
+    elif self.kind == self.CLASS_KIND_EVENT:
+      argCount = codec.read_uint16()
+      for idx in range(argCount):
+        self.arguments.append(SchemaArgument(codec, methodArg=False))
 
   def __repr__(self):
     pname, cname, hash = self.classKey
-    result = "Class: %s:%s " % (pname, cname)
+    if self.kind == self.CLASS_KIND_TABLE:
+      kindStr = "Table"
+    elif self.kind == self.CLASS_KIND_EVENT:
+      kindStr = "Event"
+    else:
+      kindStr = "Unsupported"
+    result = "%s Class: %s:%s " % (kindStr, pname, cname)
     result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
     return result
 
@@ -620,9 +638,9 @@
     """ Return the list of methods for the class. """
     return self.methods
 
-  def getEvents(self):
+  def getArguments(self):
     """ Return the list of events for the class. """
-    return self.events
+    return self.arguments
 
 class SchemaProperty:
   """ """
@@ -693,33 +711,6 @@
     result += ")"
     return result
 
-class SchemaEvent:
-  """ """
-  def __init__(self, codec):
-    map = codec.read_map()
-    self.name = str(map["name"])
-    argCount  = map["argCount"]
-    if "desc" in map:
-      self.desc = str(map["desc"])
-    else:
-      self.desc = None
-    self.arguments = []
-
-    for idx in range(argCount):
-      self.arguments.append(SchemaArgument(codec, methodArg=False))
-
-  def __repr__(self):
-    result = self.name + "("
-    first = True
-    for arg in self.arguments:
-      if first:
-        first = False
-      else:
-        result += ", "
-      result += arg.name
-    result += ")"
-    return result
-
 class SchemaArgument:
   """ """
   def __init__(self, codec, methodArg):
@@ -743,7 +734,7 @@
       elif key == "desc"    : self.desc    = str(value)
       elif key == "default" : self.default = str(value)
 
-class ObjectId(object):
+class ObjectId:
   """ Object that represents QMF object identifiers """
   def __init__(self, codec, first=0, second=0):
     if codec:
@@ -800,80 +791,86 @@
   """ """
   def __init__(self, session, broker, schema, codec, prop, stat):
     """ """
-    self.session = session
-    self.broker  = broker
-    self.schema  = schema
-    self.currentTime = codec.read_uint64()
-    self.createTime  = codec.read_uint64()
-    self.deleteTime  = codec.read_uint64()
-    self.objectId    = ObjectId(codec)
-    self.properties  = []
-    self.statistics  = []
+    self._session = session
+    self._broker  = broker
+    self._schema  = schema
+    self._currentTime = codec.read_uint64()
+    self._createTime  = codec.read_uint64()
+    self._deleteTime  = codec.read_uint64()
+    self._objectId    = ObjectId(codec)
+    self._properties  = []
+    self._statistics  = []
     if prop:
       notPresent = self._parsePresenceMasks(codec, schema)
       for property in schema.getProperties():
         if property.name in notPresent:
-          self.properties.append((property, None))
+          self._properties.append((property, None))
         else:
-          self.properties.append((property, self.session._decodeValue(codec, property.type)))
+          self._properties.append((property, self._session._decodeValue(codec, property.type)))
     if stat:
       for statistic in schema.getStatistics():
-        self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type)))
+        self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
 
   def getObjectId(self):
     """ Return the object identifier for this object """
-    return self.objectId
+    return self._objectId
 
   def getClassKey(self):
     """ Return the class-key that references the schema describing this object. """
-    return self.schema.getKey()
+    return self._schema.getKey()
 
   def getSchema(self):
     """ Return the schema that describes this object. """
-    return self.schema
+    return self._schema
 
   def getMethods(self):
     """ Return a list of methods available for this object. """
-    return self.schema.getMethods()
+    return self._schema.getMethods()
 
   def getTimestamps(self):
     """ Return the current, creation, and deletion times for this object. """
-    return self.currentTime, self.createTime, self.deleteTime
+    return self._currentTime, self._createTime, self._deleteTime
 
   def getIndex(self):
     """ Return a string describing this object's primary key. """
     result = ""
-    for property, value in self.properties:
+    for property, value in self._properties:
       if property.index:
         if result != "":
           result += ":"
         result += str(value)
     return result
 
+  def getProperties(self):
+    return self._properties
+
+  def getStatistics(self):
+    return self._statistics
+
   def __repr__(self):
     return self.getIndex()
 
   def __getattr__(self, name):
-    for method in self.schema.getMethods():
+    for method in self._schema.getMethods():
       if name == method.name:
         return lambda *args, **kwargs : self._invoke(name, args, kwargs)
-    for property, value in self.properties:
+    for property, value in self._properties:
       if name == property.name:
         return value
-    for statistic, value in self.statistics:
+    for statistic, value in self._statistics:
       if name == statistic.name:
         return value
     raise Exception("Type Object has no attribute '%s'" % name)
 
   def _invoke(self, name, args, kwargs):
-    for method in self.schema.getMethods():
+    for method in self._schema.getMethods():
       if name == method.name:
         aIdx = 0
-        sendCodec = Codec(self.broker.conn.spec)
-        seq = self.session.seqMgr._reserve((self, method))
-        self.broker._setHeader(sendCodec, 'M', seq)
-        self.objectId.encode(sendCodec)
-        pname, cname, hash = self.schema.getKey()
+        sendCodec = Codec(self._broker.conn.spec)
+        seq = self._session.seqMgr._reserve((self, method))
+        self._broker._setHeader(sendCodec, 'M', seq)
+        self._objectId.encode(sendCodec)
+        pname, cname, hash = self._schema.getKey()
         sendCodec.write_str8(pname)
         sendCodec.write_str8(cname)
         sendCodec.write_bin128(hash)
@@ -888,29 +885,30 @@
 
         for arg in method.arguments:
           if arg.dir.find("I") != -1:
-            self.session._encodeValue(sendCodec, args[aIdx], arg.type)
+            self._session._encodeValue(sendCodec, args[aIdx], arg.type)
             aIdx += 1
-        smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
-        self.broker.cv.acquire()
-        self.broker.syncInFlight = True
-        self.broker.cv.release()
+        smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+                                     (self._objectId.getBroker(), self._objectId.getBank()))
+        self._broker.cv.acquire()
+        self._broker.syncInFlight = True
+        self._broker.cv.release()
 
-        self.broker._send(smsg)
+        self._broker._send(smsg)
 
-        self.broker.cv.acquire()
+        self._broker.cv.acquire()
         starttime = time()
-        while self.broker.syncInFlight and self.broker.error == None:
-          self.broker.cv.wait(self.broker.SYNC_TIME)
-          if time() - starttime > self.broker.SYNC_TIME:
-            self.broker.cv.release()
-            self.session.seqMgr._release(seq)
+        while self._broker.syncInFlight and self._broker.error == None:
+          self._broker.cv.wait(self._broker.SYNC_TIME)
+          if time() - starttime > self._broker.SYNC_TIME:
+            self._broker.cv.release()
+            self._session.seqMgr._release(seq)
             raise RuntimeError("Timed out waiting for method to respond")
-        self.broker.cv.release()
-        if self.broker.error != None:
-          errorText = self.broker.error
-          self.broker.error = None
+        self._broker.cv.release()
+        if self._broker.error != None:
+          errorText = self._broker.error
+          self._broker.error = None
           raise Exception(errorText)
-        return self.broker.syncResult
+        return self._broker.syncResult
     raise Exception("Invalid Method (software defect) [%s]" % name)
 
   def _parsePresenceMasks(self, codec, schema):
@@ -954,7 +952,7 @@
     self.authUser = authUser
     self.authPass = authPass
     self.agents   = {}
-    self.agents[0] = Agent(self, 0, "BrokerAgent")
+    self.agents[0] = Agent(self, "1.0", "BrokerAgent")
     self.topicBound = False
     self.cv = Condition()
     self.syncInFlight = False
@@ -1040,14 +1038,15 @@
       self.error = "Connect Failed %d - %s" % (e[0], e[1])
 
   def _updateAgent(self, obj):
-    if obj.deleteTime == 0:
-      if obj.objectIdBank not in self.agents:
-        agent = Agent(self, obj.objectIdBank, obj.label)
-        self.agents[obj.objectIdBank] = agent
+    bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+    if obj._deleteTime == 0:
+      if bankKey not in self.agents:
+        agent = Agent(self, bankKey, obj.label)
+        self.agents[bankKey] = agent
         if self.session.console != None:
           self.session.console.newAgent(agent)
     else:
-      agent = self.agents.pop(obj.objectIdBank, None)
+      agent = self.agents.pop(bankKey, None)
       if agent != None and self.session.console != None:
         self.session.console.delAgent(agent)
 
@@ -1055,7 +1054,7 @@
     """ Compose the header of a management message. """
     codec.write_uint8(ord('A'))
     codec.write_uint8(ord('M'))
-    codec.write_uint8(ord('1'))
+    codec.write_uint8(ord('2'))
     codec.write_uint8(ord(opcode))
     codec.write_uint32(seq)
 
@@ -1068,7 +1067,7 @@
     if octet != 'M':
       return None, None
     octet = chr(codec.read_uint8())
-    if octet != '1':
+    if octet != '2':
       return None, None
     opcode = chr(codec.read_uint8())
     seq    = codec.read_uint32()
@@ -1164,28 +1163,24 @@
     self.label  = label
 
   def __repr__(self):
-    return "Agent at bank %d (%s)" % (self.bank, self.label)
+    return "Agent at bank %s (%s)" % (self.bank, self.label)
 
 class Event:
   """ """
   def __init__(self, session, codec):
     self.session = session
-    self.timestamp = codec.read_int64()
-    self.objectId  = ObjectId(codec)
     pname = codec.read_str8()
     cname = codec.read_str8()
     hash  = codec.read_bin128()
     self.classKey = (pname, cname, hash)
-    self.name = codec.read_str8()
+    self.timestamp = codec.read_int64()
+    self.schema = None
     if pname in session.packages:
       if (cname, hash) in session.packages[pname]:
-        schema = session.packages[pname][(cname, hash)]
-        for event in schema.getEvents():
-          if event.name == self.name:
-            self.schemaEvent = event
-            self.arguments = {}
-            for arg in event.arguments:
-              self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+        self.schema = session.packages[pname][(cname, hash)]
+        self.arguments = {}
+        for arg in self.schema.arguments:
+          self.arguments[arg.name] = session._decodeValue(codec, arg.type)
 
   def __repr__(self):
     return self.getSyslogText()
@@ -1202,10 +1197,15 @@
   def getName(self):
     return self.name
 
+  def getSchema(self):
+    return self.schema
+
   def getSyslogText(self):
+    if self.schema == None:
+      return "<uninterpretable>"
     out = strftime("%c", gmtime(self.timestamp / 1000000000))
-    out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name
-    for arg in self.schemaEvent.arguments:
+    out += " " + self.classKey[0] + ":" + self.classKey[1]
+    for arg in self.schema.arguments:
       out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type)
     return out
 
@@ -1247,8 +1247,8 @@
   def newPackage(self, name):
     print "newPackage:", name
 
-  def newClass(self, classKey):
-    print "newClass:", classKey
+  def newClass(self, kind, classKey):
+    print "newClass:", kind, classKey
 
   def newAgent(self, agent):
     print "newAgent:", agent

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/management.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/management.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/management.py Tue Oct  7 14:47:35 2008
@@ -59,7 +59,7 @@
         session = self.session
         self.startQmf()
  
-        brokers = self.qmf.getObjects(cls="broker")
+        brokers = self.qmf.getObjects(_class="broker")
         self.assertEqual (len(brokers), 1)
         broker = brokers[0]
 
@@ -147,43 +147,43 @@
         session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
         session.exchange_bind(queue="dest-queue", exchange="amq.direct")
 
-        queues = self.qmf.getObjects(cls="queue")
+        queues = self.qmf.getObjects(_class="queue")
 
         "Move 10 messages from src-queue to dest-queue"
-        result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
         self.assertEqual (result.status, 0) 
 
-        sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
-        dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+        sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+        dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
 
         self.assertEqual (sq.msgDepth,10)
         self.assertEqual (dq.msgDepth,10)
 
         "Move all remaining messages to destination"
-        result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
         self.assertEqual (result.status,0)
 
-        sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
-        dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+        sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+        dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
 
         self.assertEqual (sq.msgDepth,0)
         self.assertEqual (dq.msgDepth,20)
 
         "Use a bad source queue name"
-        result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
         self.assertEqual (result.status,4)
 
         "Use a bad destination queue name"
-        result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
         self.assertEqual (result.status,4)
 
         " Use a large qty (40) to move from dest-queue back to "
         " src-queue- should move all "
-        result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
         self.assertEqual (result.status,0)
 
-        sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
-        dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+        sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+        dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
 
         self.assertEqual (sq.msgDepth,20)
         self.assertEqual (dq.msgDepth,0)
@@ -216,23 +216,23 @@
             msg = Message(props, body)
             session.message_transfer(destination="amq.direct", message=msg)
 
-        pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+        pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
 
         "Purge top message from purge-queue"
         result = pq.purge(1)
         self.assertEqual (result.status, 0) 
-        pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+        pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,19)
 
         "Purge top 9 messages from purge-queue"
         result = pq.purge(9)
         self.assertEqual (result.status, 0) 
-        pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+        pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,10)
 
         "Purge all messages from purge-queue"
         result = pq.purge(0)
         self.assertEqual (result.status, 0) 
-        pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+        pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,0)
 

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue Oct  7 14:47:35 2008
@@ -86,17 +86,6 @@
       <arg name="password"      dir="I" type="sstr"/>
     </method>
 
-    <event name="agentConnect" desc="QMF Management Agent has connected to the broker">
-      <arg name="remoteAddress" type="sstr"/>
-      <arg name="label"         type="sstr"/>
-      <arg name="brokerBank"    type="uint32"/>
-      <arg name="agentBank"     type="uint32"/>
-    </event>
-
-    <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
-      <arg name="remoteAddress" type="sstr"/>
-    </event>
-
     <method name="queueMoveMessages" desc="Move messages from one queue to another">
       <arg name="srcQueue"          dir="I" type="sstr" desc="Source queue"/>
       <arg name="destQueue"         dir="I" type="sstr" desc="Destination queue"/>
@@ -115,7 +104,8 @@
     <property name="label"         type="sstr"   access="RO"           desc="Label for agent"/>
     <property name="registeredTo"  type="objId"  references="Broker" access="RO" desc="Broker agent is registered to"/>
     <property name="systemId"      type="uuid"   access="RO"           desc="Identifier of system where agent resides"/>
-    <property name="objectIdBank"  type="uint32" access="RO"           desc="Assigned object-id bank"/>
+    <property name="brokerBank"    type="uint32" access="RO"           desc="Assigned object-id broker bank"/>
+    <property name="agentBank"     type="uint32" access="RO"           desc="Assigned object-id agent bank"/>
   </class>
 
   <!--
@@ -218,7 +208,7 @@
     <property name="vhostRef" type="objId"  references="Vhost" access="RC" index="y" parentRef="y"/>
     <property name="address"  type="sstr"   access="RC" index="y"/>
     <property name="incoming" type="bool"   access="RC"/>
-    <property name="SystemConnection"   type="bool"   access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation ,...)"/>
+    <property name="SystemConnection"   type="bool"   access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation, ...)"/>
 
     <statistic name="closing"          type="bool" desc="This client is closing by management request"/>
     <statistic name="federationLink"   type="bool" desc="Is this a federation link"/>
@@ -305,5 +295,36 @@
     <method name="resetLifespan"/>
     <method name="close"/>
   </class>
+
+  <eventArguments>
+    <arg name="altEx"   type="sstr"   desc="Name of the alternate exchange"/>
+    <arg name="args"    type="map"    desc="Supplemental arguments or parameters supplied"/>
+    <arg name="autoDel" type="bool"   desc="Created object is automatically deleted when no longer in use"/>
+    <arg name="dest"    type="sstr"   desc="Destination tag for a subscription"/>
+    <arg name="disp"    type="sstr"   desc="Disposition of a declaration: 'created' if object was created, 'existing' if object already existed"/>
+    <arg name="durable" type="bool"   desc="Created object is durable"/>
+    <arg name="exName"  type="sstr"   desc="Name of an exchange"/>
+    <arg name="exType"  type="sstr"   desc="Type of an exchange"/>
+    <arg name="excl"    type="bool"   desc="Created object is exclusive for the use of the owner only"/>
+    <arg name="key"     type="sstr"   desc="Key text used for routing or binding"/>
+    <arg name="qName"   type="sstr"   desc="Name of a queue"/>
+    <arg name="rhost"   type="sstr"   desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
+    <arg name="user"    type="sstr"   desc="Authentication identity"/>
+  </eventArguments>
+
+  <event name="clientConnect"     args="rhost, user"/>
+  <event name="clientDisconnect"  args="rhost, user"/>
+  <event name="agentConnect"      args="rhost, user"/>
+  <event name="agentDisconnect"   args="rhost, user"/>
+  <event name="brokerConnect"     args="rhost, user"/>
+  <event name="brokerDisconnect"  args="rhost, user"/>
+  <event name="queueDeclare"      args="rhost, user, qName, durable, excl, autoDel, args, disp"/>
+  <event name="queueDelete"       args="rhost, user, qName"/>
+  <event name="exchangeDeclare"   args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/>
+  <event name="exchangeDelete"    args="rhost, user, exName"/>
+  <event name="bind"              args="rhost, user, exName, qName, key, args"/>
+  <event name="unbind"            args="rhost, user, exName, qName, key"/>
+  <event name="subscribe"         args="rhost, user, qName, dest, excl, args"/>
+  <event name="unsubscribe"       args="rhost, user, dest"/>
 </schema>