You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/26 18:01:24 UTC

svn commit: r927970 - in /qpid/branches/qmf-devel0.7a/qpid: cpp/include/qpid/management/ cpp/managementgen/qmfgen/ cpp/src/qpid/agent/ cpp/src/qpid/management/ extras/qmf/src/py/qmf/

Author: tross
Date: Fri Mar 26 17:01:23 2010
New Revision: 927970

URL: http://svn.apache.org/viewvc?rev=927970&view=rev
Log:
Updating qmf branch with latest updates:
  - Implemented V2 query in Python
  - Cleaned up map formatting in c++ agents
  - Updated for changes in the protocol wiki
  - Significant cleanup and refactoring in the pure-Python console

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp
    qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/management/ManagementObject.h Fri Mar 26 17:01:23 2010
@@ -59,7 +59,8 @@ protected:
     void fromString(const std::string&);
 public:
     QPID_COMMON_EXTERN ObjectId() : agent(0), first(0) {}
-    QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) : agent(0) { mapDecode(map.asMap()); }
+    QPID_COMMON_EXTERN ObjectId(const messaging::Variant& map) :
+        agent(0), first(0), agentEpoch(0) { mapDecode(map.asMap()); }
     QPID_COMMON_EXTERN ObjectId(uint8_t flags, uint16_t seq, uint32_t broker);
     QPID_COMMON_EXTERN ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq);
     QPID_COMMON_EXTERN ObjectId(std::istream&);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py Fri Mar 26 17:01:23 2010
@@ -682,7 +682,7 @@ class SchemaStatistic:
 
   def genMap (self, stream):
     if self.type.type.perThread:
-      self.type.type.genMap(stream, "totals." + self.name)
+      self.type.type.genMap(stream, "totals." + self.name, key=self.name)
     else:
       self.type.type.genMap(stream, self.name)
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Mar 26 17:01:23 2010
@@ -44,6 +44,7 @@ using std::ifstream;
 using std::string;
 using std::cout;
 using std::endl;
+using qpid::messaging::Variant;
 
 Mutex            ManagementAgent::Singleton::lock;
 bool             ManagementAgent::Singleton::disabled = false;
@@ -179,7 +180,7 @@ void ManagementAgentImpl::init(const qpi
 void ManagementAgentImpl::registerClass(const string& packageName,
                                         const string& className,
                                         uint8_t*     md5Sum,
-                                        qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+                                        ManagementObject::writeSchemaCall_t schemaCall)
 { 
     Mutex::ScopedLock lock(agentLock);
     PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -189,7 +190,7 @@ void ManagementAgentImpl::registerClass(
 void ManagementAgentImpl::registerEvent(const string& packageName,
                                         const string& eventName,
                                         uint8_t*     md5Sum,
-                                        qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+                                        ManagementObject::writeSchemaCall_t schemaCall)
 { 
     Mutex::ScopedLock lock(agentLock);
     PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -239,12 +240,12 @@ void ManagementAgentImpl::raiseEvent(con
     key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
         event.getPackageName() << "." << event.getEventName();
 
-    ::qpid::messaging::Message msg;
-    ::qpid::messaging::MapContent content(msg);
-    ::qpid::messaging::VariantMap &map_ = content.asMap();
-    ::qpid::messaging::VariantMap schemaId;
-    ::qpid::messaging::VariantMap values;
-    ::qpid::messaging::VariantMap headers;
+    qpid::messaging::Message msg;
+    qpid::messaging::MapContent content(msg);
+    Variant::Map &map_ = content.asMap();
+    Variant::Map schemaId;
+    Variant::Map values;
+    Variant::Map headers;
 
     map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
                                            event.getEventName(),
@@ -379,73 +380,29 @@ void ManagementAgentImpl::sendHeartbeat(
     QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
 }
 
-void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
-                                              uint32_t code, string text)
+void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+                                        const string& text, uint32_t code)
 {
-    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
-
-    encodeHeader(outBuffer, 'z', sequence);
-    outBuffer.putLong(code);
-    outBuffer.putShortString(text);
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
-    QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
-}
-
-void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
-{
-    Mutex::ScopedLock lock(agentLock);
-
-    assignedBrokerBank = inBuffer.getLong();
-    assignedAgentBank  = inBuffer.getLong();
-
-    QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
-    if ((assignedBrokerBank != requestedBrokerBank) ||
-        (assignedAgentBank  != requestedAgentBank)) {
-        if (requestedAgentBank == 0) {
-            QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
-                     assignedAgentBank);
-        } else {
-            QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
-                     "." << assignedAgentBank);
-        }
-        storeData();
-        requestedBrokerBank = assignedBrokerBank;
-        requestedAgentBank = assignedAgentBank;
-    }
+    static const string addr_exchange("qmf.default.direct");
 
-    attachment.setBanks(assignedBrokerBank, assignedAgentBank);
+    messaging::Message msg;
+    messaging::MapContent content(msg);
+    messaging::Variant::Map& map(content.asMap());
+    messaging::Variant::Map headers;
+    messaging::Variant::Map values;
 
-    // Bind to qpid.management to receive commands
-    connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_exception";
+    headers["qmf.agent"] = name_address;
 
-    // Send package indications for all local packages
-    for (PackageMap::iterator pIter = packages.begin();
-         pIter != packages.end();
-         pIter++) {
-        Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-        uint32_t outLen;
+    values["error_code"] = code;
+    values["error_text"] = text;
+    map["_values"] = values;
 
-        encodeHeader(outBuffer, 'p');
-        encodePackageIndication(outBuffer, pIter);
-        outLen = MA_BUFFER_SIZE - outBuffer.available();
-        outBuffer.reset();
-        connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+    content.encode();
+    connThreadBody.sendBuffer(msg.getContent(), cid, headers, addr_exchange, replyToKey);
 
-        // Send class indications for all local classes
-        ClassMap cMap = pIter->second;
-        for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
-            outBuffer.reset();
-            encodeHeader(outBuffer, 'q');
-            encodeClassIndication(outBuffer, pIter, cIter);
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
-        }
-    }
+    QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
 }
 
 void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
@@ -509,7 +466,7 @@ void ManagementAgentImpl::invokeMethodRe
     } else {
         string methodName;
         ObjectId objId;
-        qpid::messaging::Variant::Map inArgs;
+        Variant::Map inArgs;
 
         try {
             // coversions will throw if input is invalid.
@@ -539,7 +496,7 @@ void ManagementAgentImpl::invokeMethodRe
         }
     }
 
-    qpid::messaging::Variant::Map headers;
+    Variant::Map headers;
     headers["method"] = "response";
     headers["qmf.agent"] = name_address;
     if (failed)
@@ -551,20 +508,14 @@ void ManagementAgentImpl::invokeMethodRe
     connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
 }
 
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& contentType,
-                                         const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
 {
     moveNewObjectsLH();
 
-    if (contentType != "_query_v1") {
-        QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
-        return;
-    }
-
     qpid::messaging::Message inMsg(body);
     qpid::messaging::MapView inMap(inMsg);
     qpid::messaging::MapView::const_iterator i;
-    ::qpid::messaging::Variant::Map headers;
+    Variant::Map headers;
 
     QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
 
@@ -572,71 +523,111 @@ void ManagementAgentImpl::handleGetQuery
     headers["qmf.opcode"] = "_query_response";
     headers["qmf.content"] = "_data";
     headers["qmf.agent"] = name_address;
-    headers["partial"];
+    headers["partial"] = Variant();
 
-    ::qpid::messaging::Message outMsg;
-    ::qpid::messaging::ListContent content(outMsg);
-    ::qpid::messaging::Variant::List &list_ = content.asList();
-    ::qpid::messaging::Variant::Map  map_;
-    ::qpid::messaging::Variant::Map values;
-    string className;
+    qpid::messaging::Message outMsg;
+    qpid::messaging::ListContent content(outMsg);
+    Variant::List &list_ = content.asList();
+    Variant::Map  map_;
+    Variant::Map values;
+    Variant::Map oidMap;
+
+    /*
+     * Unpack the _what element of the query.  Currently we only support OBJECT queries.
+     */
+    i = inMap.find("_what");
+    if (i == inMap.end()) {
+        sendException(replyTo, cid, "_what element missing in Query");
+        return;
+    }
 
-    i = inMap.find("_class");
-    if (i != inMap.end())
-        try {
-            className = i->second.asString();
-        } catch(exception& e) {
-            className.clear();
-            QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
-        }
+    if (i->second.getType() != qpid::messaging::VAR_STRING) {
+        sendException(replyTo, cid, "_what element is not a string");
+        return;
+    }
 
-    if (className.empty()) {
-        ObjectId objId;
-        i = inMap.find("_object_id");
-        if (i != inMap.end()) {
+    if (i->second.asString() != "OBJECT") {
+        sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+        return;
+    }
 
-            try {
-                objId = ObjectId(i->second.asMap());
-            } catch (exception &e) {
-                objId = ObjectId();   // empty object id - won't find a match (I hope).
-                QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
-            }
+    string className;
+    string packageName;
 
-            ManagementObjectMap::iterator iter = managementObjects.find(objId);
-            if (iter != managementObjects.end()) {
-                ManagementObject* object = iter->second;
+    /*
+     * Handle the _schema_id element, if supplied.
+     */
+    i = inMap.find("_schema_id");
+    if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) {
+        const Variant::Map& schemaIdMap(i->second.asMap());
+
+        Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+        if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING)
+            className = s_iter->second.asString();
+
+        s_iter = schemaIdMap.find("_package_name");
+        if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::messaging::VAR_STRING)
+            packageName = s_iter->second.asString();
+    }
+
+    /*
+     * Unpack the _object_id element of the query if it is present.  If it is present, find that one
+     * object and return it.  If it is not present, send a class-based result.
+     */
+    i = inMap.find("_object_id");
+    if (i != inMap.end() && i->second.getType() == qpid::messaging::VAR_MAP) {
+        ObjectId objId(i->second);
 
-                if (object->getConfigChanged() || object->getInstChanged())
-                    object->setUpdateTime();
+        ManagementObjectMap::iterator iter = managementObjects.find(objId);
+        if (iter != managementObjects.end()) {
+            ManagementObject* object = iter->second;
 
-                object->mapEncodeValues(values, true, true); // write both stats and properties
-                map_["_values"] = values;
-                list_.push_back(map_);
+            if (object->getConfigChanged() || object->getInstChanged())
+                object->setUpdateTime();
 
-                content.encode();
-                connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
-            }
+            object->mapEncodeValues(values, true, true); // write both stats and properties
+            objId.mapEncode(oidMap);
+            map_["_values"] = values;
+            map_["_object_id"] = oidMap;
+            map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                   object->getClassName(),
+                                                   object->getMd5Sum());
+            list_.push_back(map_);
+            headers.erase("partial");
+
+            content.encode();
+            connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+            QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+            return;
         }
     } else {
         for (ManagementObjectMap::iterator iter = managementObjects.begin();
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second;
-            if (object->getClassName() == className) {
+            if (object->getClassName() == className &&
+                (packageName.empty() || object->getPackageName() == packageName)) {
 
                 // @todo support multiple object reply per message
                 values.clear();
                 list_.clear();
+                oidMap.clear();
 
                 if (object->getConfigChanged() || object->getInstChanged())
                     object->setUpdateTime();
 
                 object->mapEncodeValues(values, true, true); // write both stats and properties
+                iter->first.mapEncode(oidMap);
                 map_["_values"] = values;
+                map_["_object_id"] = oidMap;
+                map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                       object->getClassName(),
+                                                       object->getMd5Sum());
                 list_.push_back(map_);
 
                 content.encode();
-                connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
+                connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+                QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
             }
         }
     }
@@ -645,8 +636,8 @@ void ManagementAgentImpl::handleGetQuery
     list_.clear();
     headers.erase("partial");
     content.encode();
-    connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);
-    QPID_LOG(trace, "SENT ObjectInd");
+    connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+    QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
 }
 
 void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
@@ -718,9 +709,7 @@ void ManagementAgentImpl::received(Messa
 
         if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
         else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToKey);
-        else if (opcode == "_query_request")        handleGetQuery(msg.getData(),
-                                                                   mp.getApplicationHeaders().getAsString("qmf.content"),
-                                                                   cid, replyToKey);
+        else if (opcode == "_query_request")        handleGetQuery(msg.getData(), cid, replyToKey);
         else {
             QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
         }
@@ -737,9 +726,9 @@ void ManagementAgentImpl::received(Messa
 
     if (checkHeader(inBuffer, &opcode, &sequence))
     {
-        if      (opcode == 'a') handleAttachResponse(inBuffer);
-        else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+        if      (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
         else if (opcode == 'x') handleConsoleAddedIndication();
+        else
             QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
     }
 }
@@ -754,15 +743,15 @@ void ManagementAgentImpl::encodeHeader(B
     buf.putLong (seq);
 }
 
-qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
-                                                                     const string& cname,
-                                                                     const uint8_t *md5Sum)
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+                                                    const string& cname,
+                                                    const uint8_t *md5Sum)
 {
-    qpid::messaging::Variant::Map map_;
+    Variant::Map map_;
 
     map_["_package_name"] = pname;
     map_["_class_name"] = cname;
-    map_["_hash_str"] = messaging::Uuid(md5Sum);
+    map_["_hash"] = messaging::Uuid(md5Sum);
     return map_;
 }
 
@@ -821,7 +810,7 @@ void ManagementAgentImpl::addClassLocal(
                                         PackageMap::iterator  pIter,
                                         const string&         className,
                                         uint8_t*              md5Sum,
-                                        qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+                                        ManagementObject::writeSchemaCall_t schemaCall)
 {
     SchemaClassKey key;
     ClassMap&      cMap = pIter->second;
@@ -902,9 +891,9 @@ void ManagementAgentImpl::periodicProces
              !baseObject->isDeleted()))
             continue;
 
-        ::qpid::messaging::Message m;
-        ::qpid::messaging::ListContent content(m);
-        ::qpid::messaging::Variant::List &list_ = content.asList();
+        qpid::messaging::Message m;
+        qpid::messaging::ListContent content(m);
+        Variant::List &list_ = content.asList();
 
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
@@ -920,9 +909,9 @@ void ManagementAgentImpl::periodicProces
                 send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
 
                 if (send_stats || send_props) {
-                    ::qpid::messaging::Variant::Map map_;
-                    ::qpid::messaging::Variant::Map values;
-                    ::qpid::messaging::Variant::Map oid;
+                    Variant::Map map_;
+                    Variant::Map values;
+                    Variant::Map oid;
 
                     object->getObjectId().mapEncode(oid);
                     map_["_object_id"] = oid;
@@ -943,7 +932,7 @@ void ManagementAgentImpl::periodicProces
         content.encode();
         const string &str = m.getContent();
         if (str.length()) {
-            ::qpid::messaging::Variant::Map  headers;
+            Variant::Map  headers;
             headers["method"] = "indication";
             headers["qmf.opcode"] = "_data_indication";
             headers["qmf.content"] = "_data";
@@ -1068,13 +1057,13 @@ void ManagementAgentImpl::ConnectionThre
 
 void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
                                                        const string& cid,
-                                                       const qpid::messaging::VariantMap headers,
+                                                       const Variant::Map headers,
                                                        const string& exchange,
                                                        const string& routingKey,
                                                        const string& contentType)
 {
     Message msg;
-    qpid::messaging::VariantMap::const_iterator i;
+    Variant::Map::const_iterator i;
 
     if (!cid.empty())
         msg.getMessageProperties().setCorrelationId(cid);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Fri Mar 26 17:01:23 2010
@@ -259,16 +259,14 @@ class ManagementAgentImpl : public Manag
                                                     const uint8_t *md5Sum);
     bool checkHeader  (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
     void sendHeartbeat();
-    void sendCommandComplete  (std::string replyToKey, uint32_t sequence,
-                               uint32_t code = 0, std::string text = std::string("OK"));
-    void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+    void sendException(const std::string& replyToKey, const std::string& cid,
+                       const std::string& text, uint32_t code=1);
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
     void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
     void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& replyTo);
 
-    void handleGetQuery       (const std::string& body, const std::string& content_type,
-                               const std::string& cid, const std::string& replyTo);
+    void handleGetQuery       (const std::string& body, const std::string& cid, const std::string& replyTo);
     void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
     void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
     void handleConsoleAddedIndication();

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Mar 26 17:01:23 2010
@@ -62,8 +62,7 @@ static qpid::messaging::Variant::Map map
     map_["_package_name"] = pname;
     map_["_class_name"] = cname;
     map_["_type"] = type;
-    map_["_hash_str"] = std::string((const char *)md5Sum,
-                                    qpid::management::ManagementObject::MD5_LEN);
+    map_["_hash"] = qpid::messaging::Uuid(md5Sum);
     return map_;
 }
 
@@ -1896,9 +1895,8 @@ void ManagementAgent::disallow(const std
 }
 
 void ManagementAgent::SchemaClassKey::mapEncode(qpid::messaging::Variant::Map& _map) const {
-    const std::string hash_str((const char *)hash, sizeof(hash));
     _map["_cname"] = name;
-    _map["_hash"] = hash_str;
+    _map["_hash"] = qpid::messaging::Uuid(hash);
 }
 
 void ManagementAgent::SchemaClassKey::mapDecode(const qpid::messaging::Variant::Map& _map) {
@@ -1909,8 +1907,8 @@ void ManagementAgent::SchemaClassKey::ma
     }
 
     if ((i = _map.find("_hash")) != _map.end()) {
-        const std::string s = i->second.asString();
-        memcpy(hash, s.data(), sizeof(hash));
+        const qpid::messaging::Uuid& uuid = i->second.asUuid();
+        memcpy(hash, uuid.data(), uuid.size());
     }
 }
 

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp Fri Mar 26 17:01:23 2010
@@ -184,9 +184,6 @@ void ObjectId::mapDecode(const messaging
     else
         throw Exception("Required _object_name field missing.");
 
-    if ((i = map.find("_first")) != map.end())
-        first  = i->second.asUint64();
-
     if ((i = map.find("_agent_name")) != map.end())
         agentName = i->second.asString();
 
@@ -270,7 +267,7 @@ void ManagementObject::writeTimestamps (
 
     sid["_package_name"] = getPackageName();
     sid["_class_name"] = getClassName();
-    sid["_hash_str"] = std::string((const char *)getMd5Sum(), MD5_LEN);
+    sid["_hash"] = qpid::messaging::Uuid(getMd5Sum());
     map["_schema_id"] = sid;
 
     objectId.mapEncode(oid);

Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=927970&r1=927969&r2=927970&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Fri Mar 26 17:01:23 2010
@@ -41,6 +41,9 @@ from cStringIO       import StringIO
 #import qpid.log
 #qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG)
 
+#===================================================================================================
+# CONSOLE
+#===================================================================================================
 class Console:
   """ To access the asynchronous operations, a class must be derived from
   Console with overrides of any combination of the available methods. """
@@ -94,6 +97,10 @@ class Console:
     """ Invoked when a method response from an asynchronous method call is received. """
     pass
 
+
+#===================================================================================================
+# BrokerURL
+#===================================================================================================
 class BrokerURL(URL):
   def __init__(self, text):
     URL.__init__(self, text)
@@ -115,13 +122,22 @@ class BrokerURL(URL):
   def match(self, host, port):
     return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4]
 
+
+#===================================================================================================
+# Object
+#===================================================================================================
 class Object(object):
-  """ This class defines a 'proxy' object representing a real managed object on an agent.
-      Actions taken on this proxy are remotely affected on the real managed object.
   """
-  def __init__(self, session, broker, schema, codec=None, prop=None, stat=None, managed=True, v2Map=None, agentName=None, kwargs={}):
-    self._session = session
-    self._broker  = broker
+  This class defines a 'proxy' object representing a real managed object on an agent.
+  Actions taken on this proxy are remotely affected on the real managed object.
+  """
+  def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}):
+    self._agent   = agent
+    self._session = None
+    self._broker  = None
+    if agent:
+      self._session = agent.session
+      self._broker  = agent.broker
     self._schema  = schema
     self._properties  = []
     self._statistics  = []
@@ -129,8 +145,7 @@ class Object(object):
       self.v2Init(v2Map, agentName)
       return
 
-    self._managed = managed
-    if self._managed:
+    if self._agent:
       self._currentTime = codec.read_uint64()
       self._createTime  = codec.read_uint64()
       self._deleteTime  = codec.read_uint64()
@@ -176,10 +191,9 @@ class Object(object):
     if '_subtypes' in omap:
       self._subtypes = omap['_subtypes']
     if '_object_id' in omap:
-      self._managed = True
       self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
     else:
-      self._managed = None
+      self._objectId = None
 
   def getBroker(self):
     """ Return the broker from which this object was sent """
@@ -211,7 +225,7 @@ class Object(object):
 
   def isManaged(self):
     """ Return True iff this object is a proxy for a managed object on an agent. """
-    return self._managed
+    return self._objectId and self._agent
 
   def getIndex(self):
     """ Return a string describing this object's primary key. """
@@ -250,7 +264,7 @@ class Object(object):
     """ Contact the agent and retrieve the lastest property and statistic values for this object. """
     if not self.isManaged():
       raise Exception("Object is not managed")
-    obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+    obj = self._agent.getObjects(_objectId=self._objectId)
     if obj:
       self.mergeUpdate(obj[0])
     else:
@@ -423,6 +437,10 @@ class Object(object):
           bit = 0
     return excludeList    
 
+
+#===================================================================================================
+# Session
+#===================================================================================================
 class Session:
   """
   An instance of the Session class represents a console session running
@@ -495,132 +513,26 @@ class Session:
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
 
-    """
-    ##
-    ## v2_data_queues is used to store object data received from QMFv2 agents.
-    ## It is stored here in case we need to go and query schema data from the
-    ## agent before reporting to the user.
-    ##
-    ## v2_data_queues is a map, keyed by agent address of queues of entries
-    ## The format of entries in the queue is a data map
-    ## This list must be protected by self.cv
-    ##
-    """
-    self.v2_data_queues = {}
-    self.v2_pending_queues = {}
-
   def _getBrokerForAgentAddr(self, agent_addr):
-    broker = None
     try:
       self.cv.acquire()
       key = (1, agent_addr)
       for b in self.brokers:
         if key in b.agents:
-          broker = b
-    finally:
-      self.cv.release()
-    return broker
-
-  def _processV2Data(self):
-    """
-    Attempt to make progress on the entries in the v2_data_queue.  If an entry has a schema
-    that is in our schema cache, process it.  Otherwise, send a request for the schema information
-    to the agent that manages the object.
-    """
-    try:
-      self.cv.acquire()
-      pop_list = []
-      for agent_addr in self.v2_data_queues:
-        entries = self.v2_data_queues[agent_addr]
-        keep_going = True
-        while keep_going and len(entries) > 0:
-          schemaId = self._getSchemaIdforV2ObjectLH(entries[0])
-          schema = self.schemaCache.getSchema(schemaId)
-          if schema:
-            broker = self._getBrokerForAgentAddr(agent_addr)
-            obj = Object(self, broker, schema, v2Map=entries[0], agentName=agent_addr)
-            entries.pop(0)
-
-            """
-            TODO:  This following code assumes that the data indication came unsolicited.
-                   This needs to be enhanced to handle the case of a query response.
-            """
-            if self.console:
-              self.console.objectProps(broker, obj)
-
-          else:
-            """
-            We have no schema for this data object, move the queue to the pending map and request
-            schema data from the agent
-            """
-            self.v2_pending_queues[agent_addr] = self.v2_data_queues[agent_addr]
-            pop_list.append(agent_addr)
-            self._v2SendSchemaRequest(agent_addr, schemaId)
-            keep_going = None
-      for agent_addr in pop_list:
-        self.v2_data_queues.pop(agent_addr)
-    finally:
-      self.cv.release()
-
-  def _addV2Data(self, agent_addr, data_map):
-    """
-    Add data-for-processing to the work queue
-    """
-    process = None
-    try:
-      self.cv.acquire()
-      if agent_addr in self.v2_pending_queues:
-        self.v2_pending_queues[agent_addr].append(data_map)
-      else:
-        if agent_addr not in self.v2_data_queues:
-          self.v2_data_queues[agent_addr] = []
-        self.v2_data_queues[agent_addr].append(data_map)
-        process = True
-    finally:
-      self.cv.release()
-
-    if process:
-      self._processV2Data()
-
-  def _removeV2Agent(self, agent):
-    """
-    Remove entries in the data queues related to a lost agent.
-    """
-    agent_name = agent.getAgentBank()
-    try:
-      self.cv.acquire()
-      if agent_name in self.v2_data_queues:
-        self.v2_data_queues.pop(agent_name)
-      if agent_name in self.v2_pending_queues:
-        self.v2_pending_queues.pop(agent_name)
+          return b
     finally:
       self.cv.release()
+    return None
 
-  def _schemaInfoFromV2Agent(self, agent_addr):
-    """
-    We have just received new schema information from an agent.  Check to see if there's
-    more work that can now be done.
-    """
-    re_process = None
+  def _getAgentForAgentAddr(self, agent_addr):
     try:
       self.cv.acquire()
-      if agent_addr in self.v2_pending_queues:
-        self.v2_data_queues[agent_addr] = self.v2_pending_queues.pop(agent_addr)
-        re_process = True
+      key = agent_addr
+      for b in self.brokers:
+        if key in b.agents:
+          return b.agents[key]
     finally:
       self.cv.release()
-
-    if re_process:
-      self._processV2Data()
-
-  def _getSchemaIdforV2ObjectLH(self, data):
-    """
-    Given a data map, extract the schema-identifier.
-    """
-    if data.__class__ != dict:
-      return None
-    if '_schema_id' in data:
-      return ClassKey(data['_schema_id'])
     return None
 
   def __repr__(self):
@@ -642,7 +554,6 @@ class Session:
     returned from the addBroker call """
     if self.console:
       for agent in broker.getAgents():
-        self.console.removev2Agent(agent)
         self.console.delAgent(agent)
     broker._shutdown()
     self.brokers.remove(broker)
@@ -711,12 +622,12 @@ class Session:
         agentList.append(a)
     return agentList
 
-  def makeObject(self, classKey, broker=None, **kwargs):
+  def makeObject(self, classKey, **kwargs):
     """ Create a new, unmanaged object of the schema indicated by classKey """
     schema = self.getSchema(classKey)
     if schema == None:
       raise Exception("Schema not found for classKey")
-    return Object(self, broker, schema, None, True, True, False, kwargs)
+    # No agent is supplied, so the resulting object is unmanaged.  kwargs must be passed
+    # by keyword: the sixth positional parameter of Object.__init__ is v2Map, not kwargs.
+    return Object(None, schema, None, True, True, kwargs=kwargs)
 
   def getObjects(self, **kwargs):
     """ Get a list of objects from QMF agents.
@@ -886,7 +797,6 @@ class Session:
   def _handleBrokerDisconnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
-        self.session._removeV2Agent(agent)
         self.console.delAgent(agent)
       self.console.brokerDisconnected(broker)
 
@@ -955,31 +865,6 @@ class Session:
       smsg = broker._message(sendCodec.encoded)
       broker._send(smsg)
 
-  def _handleMethodResp(self, broker, codec, seq):
-    code = codec.read_uint32()
-    text = codec.read_str16()
-    outArgs = {}
-    pair = self.seqMgr._release(seq)
-    if pair == None:
-      return
-    method, synchronous = pair
-    if code == 0:
-      for arg in method.arguments:
-        if arg.dir.find("O") != -1:
-          outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
-    result = MethodResult(code, text, outArgs)
-    if synchronous:
-      try:
-        broker.cv.acquire()
-        broker.syncResult = result
-        broker.syncInFlight = False
-        broker.cv.notify()
-      finally:
-        broker.cv.release()
-    else:
-      if self.console:
-        self.console.methodResponse(broker, seq, result)
-
   def _handleHeartbeatInd(self, broker, codec, seq, msg):
     brokerBank = 1
     agentBank = 0
@@ -1003,58 +888,32 @@ class Session:
       self.console.heartbeat(agent, timestamp)
     broker._ageAgents()
 
-  def _handleEventInd(self, broker, codec, seq):
-    if self.console != None:
-      event = Event(self, broker, codec)
-      self.console.event(broker, event)
-
   def _handleSchemaResp(self, broker, codec, seq, agent_addr):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
     _class = SchemaClass(kind, classKey, codec, self)
     self.schemaCache.declareClass(classKey, _class)
-    self.seqMgr._release(seq)
-    broker._decOutstanding()
+    ctx = self.seqMgr._release(seq)
+    if ctx:
+      broker._decOutstanding()
     if self.console != None:
       self.console.newClass(kind, classKey)
 
-    if agent_addr:
-      self._schemaInfoFromV2Agent(agent_addr)
-
-  def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
-    classKey = ClassKey(codec)
-    schema = self.schemaCache.getSchema(classKey)
-    if not schema:
-      return
-
-    object = Object(self, broker, schema, codec, prop, stat)
-    if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
-      broker._updateAgent(object)
-
-    try:
-      self.cv.acquire()
-      if seq in self.syncSequenceList:
-        if object.getTimestamps()[2] == 0 and self._selectMatch(object):
-          self.getResult.append(object)
-        return
-    finally:
-      self.cv.release()
-
-    if self.console and self.rcvObjects:
-      if prop:
-        self.console.objectProps(broker, object)
-      if stat:
-        self.console.objectStats(broker, object)
+    if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
+      agent = self._getAgentForAgentAddr(agent_addr)
+      if agent:
+        agent._schemaInfoFromV2Agent()
 
   def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
-    brokerBank = 1
-    agentName = ah["qmf.agent"]
-    values = content["_values"]
-    timestamp = values["timestamp"]
-    interval = values["heartbeat_interval"]
-    if agentName == None:
+    try:
+      agentName = ah["qmf.agent"]
+      values = content["_values"]
+      timestamp = values["timestamp"]
+      interval = values["heartbeat_interval"]
+    except:
       return
-    agent = broker.getAgent(brokerBank, agentName)
+
+    agent = broker.getAgent(1, agentName)
     if agent == None:
       agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
       broker._addAgent(agentName, agent)
@@ -1067,44 +926,6 @@ class Session:
   def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
     self._v2HandleHeartbeatInd(broker, mp, ah, content)
 
-  def _v2HandleDataInd(self, broker, mp, ah, content):
-    kind = "_data"
-    if "qmf.content" in ah:
-      kind = ah["qmf.content"]
-    agent_addr = ah["qmf.agent"]
-    if content.__class__ != list:
-      return
-    if kind == "_data":
-      for omap in content:
-        self._addV2Data(agent_addr, omap)
-
-  def _v2HandleQueryRsp(self, broker, mp, ah, content):
-    pass
-
-  def _v2HandleMethodRsp(self, broker, mp, ah, content):
-    pass
-
-  def _v2HandleException(self, broker, mp, ah, content):
-    pass
-
-  def _v2SendSchemaRequest(self, agent_addr, schemaId):
-    """
-    Send a query to an agent to request details on a particular schema class.
-    IMPORTANT:  This function currently sends a QMFv1 schema-request to the address of
-                the agent.  The agent will send its response to amq.direct/<our-key>.
-                Eventually, this will be converted to a proper QMFv2 schema query.
-    """
-    broker = self._getBrokerForAgentAddr(agent_addr)
-    if not broker:
-      return
-
-    sendCodec = Codec()
-    seq = self.seqMgr._reserve(None)
-    broker._setHeader(sendCodec, 'S', seq)
-    schemaId.encode(sendCodec)
-    smsg = broker._message(sendCodec.encoded, agent_addr)
-    broker._send(smsg, "qmf.default.direct")
-
   def _handleError(self, error):
     try:
       self.cv.acquire()
@@ -1343,6 +1164,10 @@ class Session:
         return seq
     return None
 
+
+#===================================================================================================
+# SchemaCache
+#===================================================================================================
 class SchemaCache(object):
   """
   The SchemaCache is a data structure that stores learned schema information.
@@ -1419,6 +1244,10 @@ class SchemaCache(object):
       self.lock.release()
     return True
 
+
+#===================================================================================================
+# ClassKey
+#===================================================================================================
 class ClassKey:
   """ A ClassKey uniquely identifies a class from the schema. """
   def __init__(self, constructor):
@@ -1443,7 +1272,7 @@ class ClassKey:
       try:
         self.pname = constructor['_package_name']
         self.cname = constructor['_class_name']
-        self.hash  = constructor['_hash_str']
+        self.hash  = constructor['_hash']
       except:
         raise Exception("Invalid ClassKey map format")
     else:
@@ -1458,6 +1287,9 @@ class ClassKey:
     codec.write_str8(self.cname)
     codec.write_bin128(self.hash.bytes)
 
+  def asMap(self):
+    """ Return this class key as a map holding the package name, class name and hash. """
+    keyMap = {}
+    keyMap['_package_name'] = self.pname
+    keyMap['_class_name'] = self.cname
+    keyMap['_hash'] = self.hash
+    return keyMap
+
   def getPackageName(self):
     return self.pname
 
@@ -1476,6 +1308,10 @@ class ClassKey:
   def __repr__(self):
     return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
 
+
+#===================================================================================================
+# SchemaClass
+#===================================================================================================
 class SchemaClass:
   """ """
   CLASS_KIND_TABLE = 1
@@ -1558,6 +1394,10 @@ class SchemaClass:
     else:
         return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()  
 
+
+#===================================================================================================
+# SchemaProperty
+#===================================================================================================
 class SchemaProperty:
   """ """
   def __init__(self, codec):
@@ -1587,6 +1427,10 @@ class SchemaProperty:
   def __repr__(self):
     return self.name
 
+
+#===================================================================================================
+# SchemaStatistic
+#===================================================================================================
 class SchemaStatistic:
   """ """
   def __init__(self, codec):
@@ -1603,6 +1447,10 @@ class SchemaStatistic:
   def __repr__(self):
     return self.name
 
+
+#===================================================================================================
+# SchemaMethod
+#===================================================================================================
 class SchemaMethod:
   """ """
   def __init__(self, codec):
@@ -1631,6 +1479,10 @@ class SchemaMethod:
     result += ")"
     return result
 
+
+#===================================================================================================
+# SchemaArgument
+#===================================================================================================
 class SchemaArgument:
   """ """
   def __init__(self, codec, methodArg):
@@ -1658,6 +1510,10 @@ class SchemaArgument:
       elif key == "refPackage" : self.refPackage = value
       elif key == "refClass"   : self.refClass = value
 
+
+#===================================================================================================
+# ObjectId
+#===================================================================================================
 class ObjectId:
   """ Object that represents QMF object identifiers """
   def __init__(self, constructor, first=0, second=0, agentName=None):
@@ -1742,12 +1598,22 @@ class ObjectId:
     codec.write_uint64(first)
     codec.write_uint64(second)
 
+  def asMap(self):
+    """ Return this object-id as a map; the agent epoch is included only when it is non-zero. """
+    idMap = dict(_agent_name=self.agentName, _object_name=self.objectName)
+    if self.agentEpoch == 0:
+      return idMap
+    idMap['_agent_epoch'] = self.agentEpoch
+    return idMap
+
   def __hash__(self):
     return self.__repr__().__hash__()
 
   def __eq__(self, other):
     return self.__repr__().__eq__(other)
 
+
+#===================================================================================================
+# MethodResult
+#===================================================================================================
 class MethodResult(object):
   """ """
   def __init__(self, status, text, outArgs):
@@ -1763,6 +1629,10 @@ class MethodResult(object):
   def __repr__(self):
     return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
 
+
+#===================================================================================================
+# ManagedConnection
+#===================================================================================================
 class ManagedConnection(Thread):
   """ Thread class for managing a connection. """
   DELAY_MIN = 1
@@ -1825,6 +1695,10 @@ class ManagedConnection(Thread):
       finally:
         self.cv.release()
 
+
+#===================================================================================================
+# Broker
+#===================================================================================================
 class Broker:
   """ This object represents a connection (or potential connection) to a QMF broker. """
   SYNC_TIME = 60
@@ -1872,7 +1746,7 @@ class Broker:
 
   def getAgent(self, brokerBank, agentBank):
     """ Return the agent object associated with a particular broker and agent bank value."""
-    bankKey = (brokerBank, agentBank)
+    bankKey = agentBank
     try:
       self.cv.acquire()
       if bankKey in self.agents:
@@ -1923,7 +1797,7 @@ class Broker:
       try:
         self.cv.acquire()
         self.agents = {}
-        self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+        self.agents[0] = Agent(self, 0, "BrokerAgent")
       finally:
         self.cv.release()
 
@@ -1960,7 +1834,7 @@ class Broker:
       self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
-      self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
+      self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
       self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
       self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
       self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
@@ -1970,7 +1844,7 @@ class Broker:
       self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
-      self.amqpSession.incoming("tdest").listen(self._replyCb)
+      self.amqpSession.incoming("tdest").listen(self._v1Cb)
       self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
       self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
       self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
@@ -2013,7 +1887,7 @@ class Broker:
       raise
 
   def _updateAgent(self, obj):
-    bankKey = (obj.brokerBank, obj.agentBank)
+    bankKey = obj.agentBank
     agent = None
     if obj._deleteTime == 0:
       try:
@@ -2037,7 +1911,7 @@ class Broker:
   def _addAgent(self, name, agent):
     try:
       self.cv.acquire()
-      self.agents[(1, name)] = agent
+      self.agents[name] = agent
     finally:
       self.cv.release()
     if self.session.console:
@@ -2057,7 +1931,6 @@ class Broker:
       self.cv.release()
     if self.session.console:
       for agent in to_notify:
-        self.session._removeV2Agent(agent)
         self.session.console.delAgent(agent)
 
   def _v2SendAgentLocate(self, predicate={}):
@@ -2167,51 +2040,88 @@ class Broker:
     finally:
       self.cv.release()
 
-  def _replyCb(self, msg):
+  def _v1Cb(self, msg):
+    """
+    This is the general message handler for messages received via the QMFv1 exchanges.
+
+    The sending agent is identified from the 'qmf.agent' application header or, failing
+    that, from the agent bank embedded in the routing key.  Broker-scope opcodes are
+    handled by the session; any other opcode is forwarded to the agent proxy, if known.
+    """
+    agent = None
     agent_addr = None
     mp = msg.get("message_properties")
     ah = mp.application_headers
     if ah and 'qmf.agent' in ah:
       agent_addr = ah['qmf.agent']
+
+    if not agent_addr:
+      #
+      # See if we can determine the agent identity from the routing key
+      #
+      dp = msg.get("delivery_properties")
+      rkey = None
+      if dp.routing_key:
+        rkey = dp.routing_key
+        items = rkey.split('.')
+        if len(items) >= 4:
+          if items[0] == 'console' and items[3].isdigit():
+            agent_addr = int(items[3]) # The QMFv1 Agent Bank
+    # Compare against None explicitly: agent bank 0 (the broker's own agent) is falsy
+    # and would otherwise never be looked up.
+    if agent_addr is not None and agent_addr in self.agents:
+      agent = self.agents[agent_addr]
+
     codec = Codec(msg.body)
     while True:
       opcode, seq = self._checkHeader(codec)
-      if   opcode == None: return
+      # Leave the loop (rather than the function) so the message is marked completed below.
+      if   opcode == None: break
       if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
       elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
-      elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
       elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
-      elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
-      elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
-      elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
       elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq, agent_addr)
-      elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)
-      elif opcode == 'i': self.session._handleContentInd      (self, codec, seq, stat=True)
-      elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
-    self.session.receiver._completed.add(msg.id)
-    self.session.channel.session_completed(self.session.receiver._completed)
+      elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
+      elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
+      elif agent:
+        agent._handleQmfV1Message(opcode, mp, ah, codec)
+
+    self.amqpSession.receiver._completed.add(msg.id)
+    self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
 
   def _v2Cb(self, msg):
-    dp = msg.get("delivery_properties")
+    """
+    This is the general message handler for messages received via QMFv2 exchanges.
+
+    Messages without application headers, without a 'qmf.opcode' header, or with an
+    unrecognized content type are not dispatched; every message is marked completed.
+    """
     mp = msg.get("message_properties")
     ah = mp["application_headers"]
-    opcode = ah["qmf.opcode"]
     codec = Codec(msg.body)
 
-    if mp.content_type == "amqp/list":
-      content = codec.read_list()
-    elif mp.content_type == "amqp/map":
-      content = codec.read_map()
-    else:
-      return
+    # Guard against a message that carries no application headers at all.
+    if ah and 'qmf.opcode' in ah:
+      opcode = ah['qmf.opcode']
+      if mp.content_type == "amqp/list":
+        content = codec.read_list()
+        if not content:
+          content = []
+      elif mp.content_type == "amqp/map":
+        content = codec.read_map()
+        if not content:
+          content = {}
+      else:
+        content = None
+
+      if content != None:
+        ##
+        ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
+        ## used to maintain the broker's list of agent proxies).
+        ##
+        if   opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+        elif opcode == '_agent_locate_response':      self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+        else:
+          ##
+          ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
+          ## of the message.  A message that does not name its agent is dropped rather than raising.
+          ##
+          agent_addr = None
+          if 'qmf.agent' in ah:
+            agent_addr = ah['qmf.agent']
+          if agent_addr in self.agents:
+            agent = self.agents[agent_addr]
+            agent._handleQmfV2Message(opcode, mp, ah, content)
 
-    if   opcode == None: return
-    elif opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
-    elif opcode == '_agent_locate_response':      self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
-    elif opcode == '_data_indication':            self.session._v2HandleDataInd(self, mp, ah, content)
-    elif opcode == '_query_response':             self.session._v2HandleQueryRsp(self, mp, ah, content)
-    elif opcode == '_method_response':            self.session._v2HandleMethodRsp(self, mp, ah, content)
-    elif opcode == '_exception':                  self.session._v2HandleException(self, mp, ah, content)
+    self.amqpSession.receiver._completed.add(msg.id)
+    self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
 
   def _exceptionCb(self, data):
     self.connected = False
@@ -2227,20 +2137,46 @@ class Broker:
     if self.thread:
       self.thread.disconnected()
 
+
+#===================================================================================================
+# Agent
+#===================================================================================================
 class Agent:
-  """ """
+  """
+  This class represents a proxy for a remote agent being managed
+  """
   def __init__(self, broker, agentBank, label, isV2=False, interval=0):
     self.broker = broker
+    # Session and schema cache are reached through the owning broker.
+    self.session = broker.session
+    self.schemaCache = self.session.schemaCache
     self.brokerBank = broker.getBrokerBank()
     self.agentBank = agentBank
     self.label = label
     self.isV2 = isV2
     self.heartbeatInterval = interval
+    # Lock acquired around contextMap accesses in getObjects and _schemaInfoFromV2Agent.
+    self.lock = Lock()
+    # The sequence manager is the session's, not a per-agent one.
+    self.seqMgr = self.session.seqMgr
+    # Maps a reserved request sequence number to the RequestContext tracking that request.
+    self.contextMap = {}
+    # Context used for data that arrives without a correlation id; this agent itself is
+    # passed in the position where getObjects passes its 'notifiable' (see __call__).
+    self.unsolicitedContext = RequestContext(self, self)
     self.lastSeenTime = time()
 
+
+  def __call__(self, **kwargs):
+    """
+    Handler for unsolicited data received from the agent: forwards a 'qmf_object'
+    argument to console.objectProps and a 'qmf_object_stats' argument to
+    console.objectStats, when the session has a console.
+    """
+    dispatch = (('qmf_object', 'objectProps'), ('qmf_object_stats', 'objectStats'))
+    for argName, handlerName in dispatch:
+      if argName in kwargs and self.session.console:
+        getattr(self.session.console, handlerName)(self.broker, kwargs[argName])
+
+
   def touch(self):
     self.lastSeenTime = time()
 
+
   def isOld(self):
     if self.heartbeatInterval == 0:
       return None
@@ -2248,6 +2184,7 @@ class Agent:
       return True
     return None
 
+
   def __repr__(self):
     if self.isV2:
       ver = "v2"
@@ -2255,15 +2192,392 @@ class Agent:
       ver = "v1"
     return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
 
+
   def getBroker(self):
     return self.broker
 
+
   def getBrokerBank(self):
     return self.brokerBank
 
+
   def getAgentBank(self):
     return self.agentBank
 
+
+  def getObjects(self, notifiable=None, **kwargs):
+    """ Get a list of objects from this QMF agent.
+    All arguments are passed by name(keyword).
+
+    If 'notifiable' is None (default), this call will block until completion or timeout.
+    If supplied, notifiable is assumed to be a callable object that will be called when the
+    list of queried objects arrives.  The single argument to the call shall be a list of
+    the returned objects.
+
+    The class for queried objects may be specified in one of the following ways:
+
+    _schema = <schema> - supply a schema object returned from getSchema.
+    _key = <key>       - supply a classKey from the list returned by getClasses.
+    _class = <name>    - supply a class name as a string.  If the class name exists
+                         in multiple packages, a _package argument may also be supplied.
+    _objectId = <id>   - get the object referenced by the object-id
+
+    The default timeout for this synchronous operation is 60 seconds.  To change the timeout,
+    use the following argument:
+
+    _timeout = <time in seconds>
+
+    If additional arguments are supplied, they are used as property selectors.  For example,
+    if the argument name="test" is supplied, only objects whose "name" property is "test"
+    will be returned in the result.
+
+    Returns the list of query results for a synchronous call and None for an
+    asynchronous one.  Raises Exception if notifiable is not callable or if the request
+    context recorded an exception.
+    """
+    if notifiable:
+      if not callable(notifiable):
+        raise Exception("notifiable object must be callable")
+
+    #
+    # Allocate a context to track this asynchronous request.
+    #
+    context = RequestContext(self, notifiable)
+    sequence = self.seqMgr._reserve(context)
+    try:
+      self.lock.acquire()
+      self.contextMap[sequence] = context
+    finally:
+      self.lock.release()
+
+    keepContext = False
+    try:
+      #
+      # Compose and send the query message to the agent using the appropriate protocol for the
+      # agent's QMF version.
+      #
+      if self.isV2:
+        self._v2SendGetQuery(sequence, kwargs)
+      else:
+        self._v1SendGetQuery(sequence, kwargs)
+
+      #
+      # An asynchronous request leaves its context registered so that responses can
+      # still be matched to it after this call returns.
+      #
+      if notifiable:
+        keepContext = True
+        return None
+
+      #
+      # This is a synchronous call, block and wait for completion.
+      #
+      timeout = 60
+      if '_timeout' in kwargs:
+        timeout = kwargs['_timeout']
+      context.waitForSignal(timeout)
+      if context.exception:
+        raise Exception(context.exception)
+      return context.queryResults
+    finally:
+      #
+      # Always unregister a finished (or failed) request, under the lock, so that a
+      # send failure or a reported exception does not leak the context.
+      #
+      if not keepContext:
+        try:
+          self.lock.acquire()
+          self.contextMap.pop(sequence, None)
+        finally:
+          self.lock.release()
+
+
+  def _schemaInfoFromV2Agent(self):
+    """
+    We have just received new schema information from this agent.  Check to see if there's
+    more work that can now be done by asking the unsolicited context and every registered
+    request context to reprocess.
+    """
+    try:
+      self.lock.acquire()
+      # Take a real snapshot: iterating the live dict outside the lock could fail if
+      # another thread registers or removes a request meanwhile.
+      pending = list(self.contextMap.values())
+    finally:
+      self.lock.release()
+
+    self.unsolicitedContext.reprocess()
+    for context in pending:
+      context.reprocess()
+
+
+  def _v1HandleMethodResp(self, codec, seq):
+    """
+    Handle a QMFv1 method response
+
+    Reads the status code and text from codec, decodes the output arguments when the
+    code is 0, and either wakes the synchronous caller waiting on the broker or reports
+    the result to the session's console.  Responses whose sequence is unknown are ignored.
+    """
+    # This code was moved here from Session: the broker, the value decoder and the
+    # console must now be reached through this agent's broker and session references.
+    broker = self.broker
+    session = self.session
+    code = codec.read_uint32()
+    text = codec.read_str16()
+    outArgs = {}
+    pair = self.seqMgr._release(seq)
+    if pair == None:
+      return
+    method, synchronous = pair
+    if code == 0:
+      for arg in method.arguments:
+        if arg.dir.find("O") != -1:
+          outArgs[arg.name] = session._decodeValue(codec, arg.type, broker)
+    result = MethodResult(code, text, outArgs)
+    if synchronous:
+      try:
+        broker.cv.acquire()
+        broker.syncResult = result
+        broker.syncInFlight = False
+        broker.cv.notify()
+      finally:
+        broker.cv.release()
+    else:
+      if session.console:
+        session.console.methodResponse(broker, seq, result)
+
+
+  def _v1HandleEventInd(self, broker, codec, seq):
+    """
+    Handle a QMFv1 event indication by decoding it into an Event and passing it to the
+    session's console, if there is one.
+    """
+    # The console belongs to the session; Agent.__init__ sets no 'console' attribute.
+    console = self.session.console
+    if console != None:
+      # NOTE(review): before this code moved out of Session the first Event argument was
+      # the session; confirm that Event accepts an Agent here.
+      event = Event(self, broker, codec)
+      console.event(broker, event)
+
+
+  def _v1HandleContentInd(self, broker, codec, seq, prop=False, stat=False):
+    """
+    Handle a QMFv1 content indication
+
+    Decodes the object described by codec (ignored when its schema is not cached),
+    updates the broker's agent list for broker 'agent' objects, and then either adds the
+    object to a pending synchronous query result or reports it to the console.
+    """
+    classKey = ClassKey(codec)
+    schema = self.schemaCache.getSchema(classKey)
+    if not schema:
+      return
+
+    # Object.__init__ takes (agent, schema, codec, prop, stat, ...); there is no broker argument.
+    obj = Object(self, schema, codec, prop, stat)
+    if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+      broker._updateAgent(obj)
+
+    # The synchronous-query bookkeeping, the selector match and the console live on the
+    # session (this code was moved from Session), and that state is guarded by the
+    # session's condition variable rather than by this agent's lock.
+    # NOTE(review): confirm Session still exposes syncSequenceList/getResult/_selectMatch.
+    session = self.session
+    try:
+      session.cv.acquire()
+      if seq in session.syncSequenceList:
+        if obj.getTimestamps()[2] == 0 and session._selectMatch(obj):
+          session.getResult.append(obj)
+        return
+    finally:
+      session.cv.release()
+
+    if session.console and session.rcvObjects:
+      if prop:
+        session.console.objectProps(broker, obj)
+      if stat:
+        session.console.objectStats(broker, obj)
+
+
+  def _v2HandleDataInd(self, mp, ah, content):
+    """
+    Handle a QMFv2 data indication from the agent
+
+    A message carrying a correlation id is matched to the request context registered
+    under that sequence number (and dropped if there is none); a message without one is
+    fed to the unsolicited context.  The context is signalled unless the 'partial'
+    header is present.
+    """
+    if mp.correlation_id:
+      # The correlation id comes off the wire; ignore it if it is not a number.
+      try:
+        sequence = int(mp.correlation_id)
+      except ValueError:
+        return
+      # Look the context up in one step under the lock so that a concurrent removal
+      # cannot slip in between a membership test and the indexing.
+      try:
+        self.lock.acquire()
+        context = self.contextMap.get(sequence)
+      finally:
+        self.lock.release()
+      if context is None:
+        return
+    else:
+      context = self.unsolicitedContext
+
+    kind = "_data"
+    if "qmf.content" in ah:
+      kind = ah["qmf.content"]
+    if kind == "_data":
+      if content.__class__ != list:
+        return
+      for omap in content:
+        context.addV2QueryResult(omap)
+      context.processV2Data()
+
+    if 'partial' not in ah:
+      context.signal()
+
+
+  def _v2HandleMethodRsp(self, mp, ah, content):
+    """
+    Handle a QMFv2 method response (dispatched for the '_method_response' opcode).
+    Not yet implemented: the message is currently ignored.
+    """
+    pass
+
+
+  def _v2HandleException(self, mp, ah, content):
+    """
+    Handle a QMFv2 exception message (dispatched for the '_exception' opcode).
+    Not yet implemented: the message is currently ignored.
+    """
+    pass
+
+
+  def _v1SendGetQuery(self, kwargs):
+    """
+    Placeholder, presumably the QMFv1 counterpart of _v2SendGetQuery (TODO confirm).
+    Not yet implemented: nothing is sent.
+    """
+    pass
+
+
+  def _v2SendGetQuery(self, sequence, kwargs):
+    """
+    Send a get query to a QMFv2 agent.
+
+    sequence -- request sequence number; sent as the message correlation id
+    kwargs   -- selection criteria.  The first of these that is present is used:
+                  _class [+ _package] : select by class name (and package name)
+                  _key                : select by schema class key
+                  _objectId           : select a single object by its object id
+                With none of them present, the query carries only {'_what': 'OBJECT'}.
+    """
+    #
+    # Build the query map
+    #
+    query = {'_what': 'OBJECT'}
+    if '_class' in kwargs:
+      schemaMap = {'_class_name': kwargs['_class']}
+      if '_package' in kwargs:
+        schemaMap['_package_name'] = kwargs['_package']
+      query['_schema_id'] = schemaMap
+    elif '_key' in kwargs:
+      query['_schema_id'] = kwargs['_key'].asMap()
+    elif '_objectId' in kwargs:
+      # asMap must be called: the query needs the map itself, not the bound method.
+      query['_object_id'] = kwargs['_objectId'].asMap()
+
+    #
+    # Construct and transmit the message
+    #
+    dp = self.broker.amqpSession.delivery_properties()
+    dp.routing_key = self.agentBank
+    mp = self.broker.amqpSession.message_properties()
+    mp.content_type = "amqp/map"
+    mp.user_id = self.broker.authUser
+    mp.correlation_id = str(sequence)
+    mp.app_id = "qmf2"
+    mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+    mp.application_headers = {'qmf.opcode':'_query_request'}
+    sendCodec = Codec()
+    sendCodec.write_map(query)
+    msg = Message(dp, mp, sendCodec.encoded)
+    self.broker._send(msg, "qmf.default.direct")
+
+
+  def _v2SendSchemaRequest(self, schemaId):
+    """
+    Ask an agent for the details of one particular schema class.
+    IMPORTANT:  For now this sends a QMFv1 schema-request (opcode 'S') to the agent's
+                address; the agent replies to amq.direct/<our-key>.  It is intended to
+                be replaced by a proper QMFv2 schema query later.
+    """
+    codec = Codec()
+    # Reserve a sequence number with no associated context.
+    sequence = self.seqMgr._reserve(None)
+    self.broker._setHeader(codec, 'S', sequence)
+    schemaId.encode(codec)
+    request = self.broker._message(codec.encoded, self.agentBank)
+    self.broker._send(request, "qmf.default.direct")
+
+
+  def _handleQmfV1Message(self, opcode, mp, ah, codec):
+    """
+    Process QMFv1 messages arriving from an agent.
+
+    Dispatches on the one-character opcode:
+      'm' -> method response
+      'e' -> event indication
+      'c' -> content indication carrying properties
+      'i' -> content indication carrying statistics
+      'g' -> content indication carrying both properties and statistics
+    Any other opcode is ignored.
+
+    NOTE(review): 'seq' is not a parameter or local of this method, and the
+    _v1HandleEventInd/_v1HandleContentInd handlers take 'broker' as their first
+    argument, which is not passed here.  As written these calls look like they
+    cannot succeed -- confirm the intended signatures.
+    """
+    if   opcode == 'm': self._v1HandleMethodResp(codec, seq)
+    elif opcode == 'e': self._v1HandleEventInd(codec, seq)
+    elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True)
+    elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True)
+    elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True)
+
+
+  def _handleQmfV2Message(self, opcode, mp, ah, content):
+    """
+    Route a QMFv2 message from an agent to the handler for its opcode.  Data
+    indications and query responses share one handler; unknown opcodes are ignored.
+    """
+    if opcode in ('_data_indication', '_query_response'):
+      self._v2HandleDataInd(mp, ah, content)
+    elif opcode == '_method_response':
+      self._v2HandleMethodRsp(mp, ah, content)
+    elif opcode == '_exception':
+      self._v2HandleException(mp, ah, content)
+
+
+#===================================================================================================
+# RequestContext
+#===================================================================================================
+class RequestContext(object):
+  """
+  Bookkeeping for one asynchronous request sent to an agent: queues raw query
+  results, holds decoded results and any exception, and lets a caller block until
+  the request has been signalled.
+  """
+  def __init__(self, agent, notifiable):
+    self.agent = agent
+    self.notifiable = notifiable
+    self.schemaCache = self.agent.schemaCache
+    self.startTime = time()
+    self.rawQueryResults = []     # result maps not yet decoded, in arrival order
+    self.queryResults = []        # decoded objects, kept only when there is no notifiable
+    self.exception = None
+    self.waitingForSchema = None
+    self.cv = Condition()
+    # Without a notifiable callback the context starts out blocked, so that
+    # waitForSignal() waits until signal() is called.
+    self.blocked = notifiable == None
+
+
+  def addV2QueryResult(self, data):
+    """ Queue one raw QMFv2 result for later decoding by processV2Data(). """
+    self.rawQueryResults.append(data)
+
+
+  def setException(self, ex):
+    """ Record an exception for this request. """
+    self.exception = ex
+
+
+  def getAge(self):
+    """ Return the number of seconds since this context was created. """
+    return time() - self.startTime
+
+
+  def waitForSignal(self, timeout):
+    """
+    Block until signal() is called.  If more than 'timeout' seconds have passed since
+    the context was created, store a timeout message in self.exception and return.
+    """
+    try:
+      self.cv.acquire()
+      while self.blocked:
+        elapsed = time() - self.startTime
+        if elapsed > timeout:
+          self.exception = "Request timed out after %d seconds" % timeout
+          return
+        # Wake at least once a second to re-check the deadline.
+        self.cv.wait(1)
+    finally:
+      self.cv.release()
+
+
+  def signal(self):
+    """ Clear the blocked state and wake a thread waiting in waitForSignal(). """
+    try:
+      self.cv.acquire()
+      self.blocked = None
+      self.cv.notify()
+    finally:
+      self.cv.release()
+
+
+  def processV2Data(self):
+    """
+    Try to decode the entries queued in rawQueryResults, in order.  An entry whose
+    schema is in the schema cache is turned into an Object and removed from the queue.
+    At the first entry whose schema is missing, decoding stops, the context is marked
+    as waiting for schema, and a schema request is sent to the agent.  Decoded objects
+    are delivered to the notifiable if there is one, otherwise appended to queryResults.
+    """
+    schemaId = None
+    decoded = []
+    try:
+      self.cv.acquire()
+      # A schema request is already outstanding; reprocess() will call back in.
+      if self.waitingForSchema:
+        return
+      while len(self.rawQueryResults) > 0:
+        entry = self.rawQueryResults[0]
+        schemaId = self._getSchemaIdforV2ObjectLH(entry)
+        schema = self.schemaCache.getSchema(schemaId)
+        if not schema:
+          self.waitingForSchema = True
+          break
+        decoded.append(Object(self.agent, schema, v2Map=entry, agentName=self.agent.agentBank))
+        self.rawQueryResults.pop(0)
+    finally:
+      self.cv.release()
+
+    if self.waitingForSchema:
+      self.agent._v2SendSchemaRequest(schemaId)
+
+    for item in decoded:
+      if not self.notifiable:
+        self.queryResults.append(item)
+      else:
+        self.notifiable(qmf_object=item)
+
+
+  def reprocess(self):
+    """
+    Called when new schema information has reached the schema cache: drop the
+    'waiting' status and attempt further progress on the raw result queue.
+    """
+    try:
+      self.cv.acquire()
+      self.waitingForSchema = None
+    finally:
+      self.cv.release()
+    self.processV2Data()
+
+
+  def _getSchemaIdforV2ObjectLH(self, data):
+    """
+    Return a ClassKey built from the '_schema_id' entry of a data map, or None when
+    data is not a dict or has no such entry.
+    """
+    if data.__class__ != dict or '_schema_id' not in data:
+      return None
+    return ClassKey(data['_schema_id'])
+
+
+#===================================================================================================
+# Event
+#===================================================================================================
 class Event:
   """ """
   def __init__(self, session, broker, codec):
@@ -2318,6 +2632,10 @@ class Event:
   def getSchema(self):
     return self.schema
 
+
+#===================================================================================================
+# SequenceManager
+#===================================================================================================
 class SequenceManager:
   """ Manage sequence numbers for asynchronous method calls """
   def __init__(self):
@@ -2349,6 +2667,9 @@ class SequenceManager:
     return data
 
 
+#===================================================================================================
+# DebugConsole
+#===================================================================================================
 class DebugConsole(Console):
   """ """
   def brokerConnected(self, broker):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org