You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/31 23:13:14 UTC

svn commit: r929716 [2/4] - in /qpid/trunk/qpid: cpp/bindings/qmf/tests/ cpp/examples/qmf-agent/ cpp/include/qpid/agent/ cpp/include/qpid/framing/ cpp/include/qpid/management/ cpp/managementgen/ cpp/managementgen/qmfgen/ cpp/managementgen/qmfgen/templa...

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Mar 31 21:13:12 2010
@@ -22,7 +22,7 @@
 #include "qpid/management/ManagementObject.h"
 #include "qpid/log/Statement.h"
 #include "qpid/agent/ManagementAgentImpl.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include <list>
 #include <string.h>
 #include <stdlib.h>
@@ -41,6 +41,9 @@ using std::ofstream;
 using std::ifstream;
 using std::string;
 using std::endl;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
 
 namespace {
     Mutex lock;
@@ -81,7 +84,7 @@ const string ManagementAgentImpl::storeM
 ManagementAgentImpl::ManagementAgentImpl() :
     interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
     notifyable(0), inCallback(false),
-    initialized(false), connected(false), lastFailure("never connected"),
+    initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
     clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
     assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
     connThreadBody(*this), connThread(connThreadBody),
@@ -117,6 +120,21 @@ ManagementAgentImpl::~ManagementAgentImp
     }
 }
 
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+    attrMap["_vendor"] = vendor;
+    attrMap["_product"] = product;
+    string inst;
+    if (instance.empty()) {
+        inst = qpid::types::Uuid(true).str();
+    } else
+        inst = instance;
+
+   name_address = vendor + ":" + product + ":" + inst;
+   attrMap["_instance"] = inst;
+   attrMap["_name"] = name_address;
+}
+
 void ManagementAgentImpl::init(const string& brokerHost,
                                uint16_t brokerPort,
                                uint16_t intervalSeconds,
@@ -140,7 +158,7 @@ void ManagementAgentImpl::init(const str
 void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
                                uint16_t intervalSeconds,
                                bool useExternalThread,
-                               const std::string& _storeFile)
+                               const string& _storeFile)
 {
     interval     = intervalSeconds;
     extThread    = useExternalThread;
@@ -157,13 +175,16 @@ void ManagementAgentImpl::init(const qpi
         bootSequence = 1;
     storeData(true);
 
+    if (attrMap.empty())
+        setName("vendor", "product");
+
     initialized = true;
 }
 
 void ManagementAgentImpl::registerClass(const string& packageName,
                                         const string& className,
                                         uint8_t*     md5Sum,
-                                        qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+                                        ManagementObject::writeSchemaCall_t schemaCall)
 { 
     Mutex::ScopedLock lock(agentLock);
     PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -173,49 +194,77 @@ void ManagementAgentImpl::registerClass(
 void ManagementAgentImpl::registerEvent(const string& packageName,
                                         const string& eventName,
                                         uint8_t*     md5Sum,
-                                        qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+                                        ManagementObject::writeSchemaCall_t schemaCall)
 { 
     Mutex::ScopedLock lock(agentLock);
     PackageMap::iterator pIter = findOrAddPackage(packageName);
     addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
 }
 
+// old-style add object: 64bit id - deprecated
 ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
                                         uint64_t          persistId)
 {
+    std::string key;
+    if (persistId) {
+        key = boost::lexical_cast<std::string>(persistId);
+    }
+    return addObject(object, key, persistId != 0);
+}
+
+
+// new style add object - use this approach!
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+                                        const std::string& key,
+                                        bool persistent)
+{
     Mutex::ScopedLock lock(addLock);
-    uint16_t sequence  = persistId ? 0 : bootSequence;
-    uint64_t objectNum = persistId ? persistId : nextObjectId++;
 
-    ObjectId objectId(&attachment, 0, sequence, objectNum);
+    uint16_t sequence  = persistent ? 0 : bootSequence;
+
+    ObjectId objectId(&attachment, 0, sequence);
+    if (key.empty())
+        objectId.setV2Key(*object);  // let object generate the key
+    else
+        objectId.setV2Key(key);
 
-    // TODO: fix object-id handling
     object->setObjectId(objectId);
     newManagementObjects[objectId] = object;
     return objectId;
 }
 
+
 void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
 {
     Mutex::ScopedLock lock(agentLock);
     Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
     uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
     stringstream key;
 
     key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
         event.getPackageName() << "." << event.getEventName();
 
-    encodeHeader(outBuffer, 'e');
-    outBuffer.putShortString(event.getPackageName());
-    outBuffer.putShortString(event.getEventName());
-    outBuffer.putBin128(event.getMd5Sum());
-    outBuffer.putLongLong(uint64_t(Duration(now())));
-    outBuffer.putOctet(sev);
-    event.encode(outBuffer);
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str());
+    Variant::Map map_;
+    Variant::Map schemaId;
+    Variant::Map values;
+    Variant::Map headers;
+    string content;
+
+    map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+                                           event.getEventName(),
+                                           event.getMd5Sum());
+    event.mapEncode(values);
+    map_["_values"] = values;
+    map_["_timestamp"] = uint64_t(Duration(now()));
+    map_["_severity"] = sev;
+
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_data_indication";
+    headers["qmf.content"] = "_event";
+    headers["qmf.agent"] = name_address;
+
+    MapCodec::encode(map_, content);
+    connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str());
 }
 
 uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -235,8 +284,7 @@ uint32_t ManagementAgentImpl::pollCallba
         methodQueue.pop_front();
         {
             Mutex::ScopedUnlock unlock(agentLock);
-            Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
-            invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+            invokeMethodRequest(item->body, item->cid, item->replyTo);
             delete item;
         }
     }
@@ -274,20 +322,7 @@ void ManagementAgentImpl::setSignalCallb
 
 void ManagementAgentImpl::startProtocol()
 {
-    char    rawbuffer[512];
-    Buffer  buffer(rawbuffer, 512);
-
-    connected = true;
-    encodeHeader(buffer, 'A');
-    buffer.putShortString("RemoteAgent [C++]");
-    systemId.encode (buffer);
-    buffer.putLong(requestedBrokerBank);
-    buffer.putLong(requestedAgentBank);
-    uint32_t length = buffer.getPosition();
-    buffer.reset();
-    connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
-    QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
-             " reqAgent=" << requestedAgentBank);
+    sendHeartbeat();
 }
 
 void ManagementAgentImpl::storeData(bool requested)
@@ -323,76 +358,54 @@ void ManagementAgentImpl::retrieveData()
     }
 }
 
-void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
-                                              uint32_t code, string text)
+void ManagementAgentImpl::sendHeartbeat()
 {
-    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
+    static const string addr_exchange("qmf.default.topic");
+    static const string addr_key("agent.ind.heartbeat");
+
+    Variant::Map map;
+    Variant::Map headers;
+    string content;
 
-    encodeHeader(outBuffer, 'z', sequence);
-    outBuffer.putLong(code);
-    outBuffer.putShortString(text);
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
-    QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_agent_heartbeat_indication";
+    headers["qmf.agent"] = name_address;
+
+    map["_values"] = attrMap;
+    map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+    map["_values"].asMap()["heartbeat_interval"] = interval;
+
+    MapCodec::encode(map, content);
+    connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key);
+
+    QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
 }
 
-void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
+void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+                                        const string& text, uint32_t code)
 {
-    Mutex::ScopedLock lock(agentLock);
+    static const string addr_exchange("qmf.default.direct");
 
-    assignedBrokerBank = inBuffer.getLong();
-    assignedAgentBank  = inBuffer.getLong();
+    Variant::Map map;
+    Variant::Map headers;
+    Variant::Map values;
+    string content;
 
-    QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_exception";
+    headers["qmf.agent"] = name_address;
 
-    if ((assignedBrokerBank != requestedBrokerBank) ||
-        (assignedAgentBank  != requestedAgentBank)) {
-        if (requestedAgentBank == 0) {
-            QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
-                     assignedAgentBank);
-        } else {
-            QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
-                     "." << assignedAgentBank);
-        }
-        storeData();
-        requestedBrokerBank = assignedBrokerBank;
-        requestedAgentBank = assignedAgentBank;
-    }
-
-    attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
-    // Bind to qpid.management to receive commands
-    connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
-
-    // Send package indications for all local packages
-    for (PackageMap::iterator pIter = packages.begin();
-         pIter != packages.end();
-         pIter++) {
-        Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-        uint32_t outLen;
+    values["error_code"] = code;
+    values["error_text"] = text;
+    map["_values"] = values;
 
-        encodeHeader(outBuffer, 'p');
-        encodePackageIndication(outBuffer, pIter);
-        outLen = MA_BUFFER_SIZE - outBuffer.available();
-        outBuffer.reset();
-        connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+    MapCodec::encode(map, content);
+    connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey);
 
-        // Send class indications for all local classes
-        ClassMap cMap = pIter->second;
-        for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
-            outBuffer.reset();
-            encodeHeader(outBuffer, 'q');
-            encodeClassIndication(outBuffer, pIter, cIter);
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
-        }
-    }
+    QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
 }
 
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
 {
     Mutex::ScopedLock lock(agentLock);
     string packageName;
@@ -412,12 +425,14 @@ void ManagementAgentImpl::handleSchemaRe
             SchemaClass& schema = cIter->second;
             Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
+            string   body;
 
             encodeHeader(outBuffer, 's', sequence);
-            schema.writeSchemaCall(outBuffer);
+            schema.writeSchemaCall(body);
+            outBuffer.putRawData(body);
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
 
             QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
         }
@@ -432,124 +447,250 @@ void ManagementAgentImpl::handleConsoleA
     QPID_LOG(trace, "RCVD ConsoleAddedInd");
 }
 
-void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo)
 {
-    string   methodName;
-    string   packageName;
-    string   className;
-    uint8_t  hash[16];
-    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
-
-    ObjectId objId(inBuffer);
-    inBuffer.getShortString(packageName);
-    inBuffer.getShortString(className);
-    inBuffer.getBin128(hash);
-    inBuffer.getShortString(methodName);
-
-    encodeHeader(outBuffer, 'm', sequence);
-
-    ManagementObjectMap::iterator iter = managementObjects.find(objId);
-    if (iter == managementObjects.end() || iter->second->isDeleted()) {
-        outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
-        outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
+    string  methodName;
+    bool    failed = false;
+    Variant::Map inMap;
+    Variant::Map outMap;
+    Variant::Map::const_iterator oid, mid;
+    string content;
+
+    MapCodec::decode(body, inMap);
+
+    outMap["_values"] = Variant::Map();
+
+    if ((oid = inMap.find("_object_id")) == inMap.end() ||
+        (mid = inMap.find("_method_name")) == inMap.end()) {
+        (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID;
+        (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+        failed = true;
     } else {
-        if ((iter->second->getPackageName() != packageName) ||
-            (iter->second->getClassName()   != className)) {
-            outBuffer.putLong        (Manageable::STATUS_PARAMETER_INVALID);
-            outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
-        }
-        else
-            try {
-                outBuffer.record();
-                iter->second->doMethod(methodName, inBuffer, outBuffer);
-            } catch(exception& e) {
-                outBuffer.restore();
-                outBuffer.putLong(Manageable::STATUS_EXCEPTION);
-                outBuffer.putMediumString(e.what());
+        string methodName;
+        ObjectId objId;
+        Variant::Map inArgs;
+        Variant::Map callMap;
+
+        try {
+            // conversions will throw if input is invalid.
+            objId = ObjectId(oid->second.asMap());
+            methodName = mid->second.getString();
+
+            mid = inMap.find("_arguments");
+            if (mid != inMap.end()) {
+                inArgs = (mid->second).asMap();
+            }
+
+            ManagementObjectMap::iterator iter = managementObjects.find(objId);
+            if (iter == managementObjects.end() || iter->second->isDeleted()) {
+                (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT;
+                (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+                failed = true;
+            } else {
+                iter->second->doMethod(methodName, inArgs, callMap);
             }
+
+            if (callMap["_status_code"].asUint32() == 0) {
+                outMap["_arguments"] = Variant::Map();
+                for (Variant::Map::const_iterator iter = callMap.begin();
+                     iter != callMap.end(); iter++)
+                    if (iter->first != "_status_code" && iter->first != "_status_text")
+                        outMap["_arguments"].asMap()[iter->first] = iter->second;
+            } else {
+                (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"];
+                (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"];
+                failed = true;
+            }
+
+        } catch(types::InvalidConversion& e) {
+            outMap.clear();
+            outMap["_values"] = Variant::Map();
+            (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION;
+            (outMap["_values"].asMap())["_status_text"] = e.what();
+            failed = true;
+        }
+    }
+
+    Variant::Map headers;
+    headers["method"] = "response";
+    headers["qmf.agent"] = name_address;
+    if (failed) {
+        headers["qmf.opcode"] = "_exception";
+        QPID_LOG(trace, "SENT Exception map=" << outMap);
+    } else {
+        headers["qmf.opcode"] = "_method_response";
+        QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
     }
 
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+    MapCodec::encode(outMap, content);
+    connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo);
 }
 
-void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
 {
-    FieldTable           ft;
-    FieldTable::ValuePtr value;
-
     moveNewObjectsLH();
 
-    ft.decode(inBuffer);
+    Variant::Map inMap;
+    Variant::Map::const_iterator i;
+    Variant::Map headers;
+
+    MapCodec::decode(body, inMap);
+    QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
+
+    headers["method"] = "response";
+    headers["qmf.opcode"] = "_query_response";
+    headers["qmf.content"] = "_data";
+    headers["qmf.agent"] = name_address;
+    headers["partial"] = Variant();
+
+    Variant::List list_;
+    Variant::Map  map_;
+    Variant::Map values;
+    Variant::Map oidMap;
+    string content;
+
+    /*
+     * Unpack the _what element of the query.  Currently we only support OBJECT queries.
+     */
+    i = inMap.find("_what");
+    if (i == inMap.end()) {
+        sendException(replyTo, cid, "_what element missing in Query");
+        return;
+    }
 
-    QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+    if (i->second.getType() != qpid::types::VAR_STRING) {
+        sendException(replyTo, cid, "_what element is not a string");
+        return;
+    }
 
-    value = ft.get("_class");
-    if (value.get() == 0 || !value->convertsTo<string>()) {
-        value = ft.get("_objectid");
-        if (value.get() == 0 || !value->convertsTo<string>())
-            return;
+    if (i->second.asString() != "OBJECT") {
+        sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+        return;
+    }
 
-        ObjectId selector(value->get<string>());
-        ManagementObjectMap::iterator iter = managementObjects.find(selector);
+    string className;
+    string packageName;
+
+    /*
+     * Handle the _schema_id element, if supplied.
+     */
+    i = inMap.find("_schema_id");
+    if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+        const Variant::Map& schemaIdMap(i->second.asMap());
+
+        Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+        if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+            className = s_iter->second.asString();
+
+        s_iter = schemaIdMap.find("_package_name");
+        if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+            packageName = s_iter->second.asString();
+    }
+
+    /*
+     * Unpack the _object_id element of the query if it is present.  If it is present, find that one
+     * object and return it.  If it is not present, send a class-based result.
+     */
+    i = inMap.find("_object_id");
+    if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+        ObjectId objId(i->second.asMap());
+
+        ManagementObjectMap::iterator iter = managementObjects.find(objId);
         if (iter != managementObjects.end()) {
             ManagementObject* object = iter->second;
-            Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
 
             if (object->getConfigChanged() || object->getInstChanged())
                 object->setUpdateTime();
 
-            encodeHeader(outBuffer, 'g', sequence);
-            object->writeProperties(outBuffer);
-            object->writeStatistics(outBuffer, true);
-            outLen = MA_BUFFER_SIZE - outBuffer.available ();
-            outBuffer.reset ();
-            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+            object->mapEncodeValues(values, true, true); // write both stats and properties
+            objId.mapEncode(oidMap);
+            map_["_values"] = values;
+            map_["_object_id"] = oidMap;
+            map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                   object->getClassName(),
+                                                   object->getMd5Sum());
+            list_.push_back(map_);
+            headers.erase("partial");
+
+            ListCodec::encode(list_, content);
+            connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+            QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+            return;
+        }
+    } else {
+        for (ManagementObjectMap::iterator iter = managementObjects.begin();
+             iter != managementObjects.end();
+             iter++) {
+            ManagementObject* object = iter->second;
+            if (object->getClassName() == className &&
+                (packageName.empty() || object->getPackageName() == packageName)) {
+
+                // @todo support multiple object reply per message
+                values.clear();
+                list_.clear();
+                oidMap.clear();
+
+                if (object->getConfigChanged() || object->getInstChanged())
+                    object->setUpdateTime();
 
-            QPID_LOG(trace, "SENT ObjectInd");
+                object->mapEncodeValues(values, true, true); // write both stats and properties
+                iter->first.mapEncode(oidMap);
+                map_["_values"] = values;
+                map_["_object_id"] = oidMap;
+                map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                       object->getClassName(),
+                                                       object->getMd5Sum());
+                list_.push_back(map_);
+
+                ListCodec::encode(list_, content);
+                connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+                QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+            }
         }
-        sendCommandComplete(replyTo, sequence);
-        return;
     }
 
-    string className(value->get<string>());
+    // end empty "non-partial" message to indicate CommandComplete
+    list_.clear();
+    headers.erase("partial");
+    ListCodec::encode(list_, content);
+    connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+    QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+}
 
-    for (ManagementObjectMap::iterator iter = managementObjects.begin();
-         iter != managementObjects.end();
-         iter++) {
-        ManagementObject* object = iter->second;
-        if (object->getClassName() == className) {
-            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+{
+    QPID_LOG(trace, "RCVD AgentLocateRequest");
+    static const string addr_exchange("qmf.default.direct");
 
-            if (object->getConfigChanged() || object->getInstChanged())
-                object->setUpdateTime();
+    Variant::Map map;
+    Variant::Map headers;
+    string content;
 
-            encodeHeader(outBuffer, 'g', sequence);
-            object->writeProperties(outBuffer);
-            object->writeStatistics(outBuffer, true);
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_agent_locate_response";
+    headers["qmf.agent"] = name_address;
 
-            QPID_LOG(trace, "SENT ObjectInd");
-        }
-    }
+    map["_values"] = attrMap;
+    map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+    map["_values"].asMap()["heartbeat_interval"] = interval;
 
-    sendCommandComplete(replyTo, sequence);
+    MapCodec::encode(map, content);
+    connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);
+
+    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+
+    {
+        Mutex::ScopedLock lock(agentLock);
+        clientWasAdded = true;
+    }
 }
 
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo)
 {
     if (extThread) {
         Mutex::ScopedLock lock(agentLock);
-        string body;
 
-        inBuffer.getRawData(body, inBuffer.available());
-        methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+        methodQueue.push_back(new QueuedMethod(cid, replyTo, body));
         if (pipeHandle != 0) {
             pipeHandle->write("X", 1);
         } else if (notifyable != 0) {
@@ -568,7 +709,7 @@ void ManagementAgentImpl::handleMethodRe
             inCallback = false;
         }
     } else {
-        invokeMethodRequest(inBuffer, sequence, replyTo);
+        invokeMethodRequest(body, cid, replyTo);
     }
 
     QPID_LOG(trace, "RCVD MethodRequest");
@@ -576,28 +717,45 @@ void ManagementAgentImpl::handleMethodRe
 
 void ManagementAgentImpl::received(Message& msg)
 {
+    string   replyToKey;
+    framing::MessageProperties mp = msg.getMessageProperties();
+    if (mp.hasReplyTo()) {
+        const framing::ReplyTo& rt = mp.getReplyTo();
+        replyToKey = rt.getRoutingKey();
+    }
+
+    if (mp.hasAppId() && mp.getAppId() == "qmf2")
+    {
+        string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
+        string cid = msg.getMessageProperties().getCorrelationId();
+
+        if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
+        else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToKey);
+        else if (opcode == "_query_request")        handleGetQuery(msg.getData(), cid, replyToKey);
+        else {
+            QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
+        }
+        return;
+    }
+
+    // old preV2 binary messages
+    
+    uint32_t sequence;
     string   data = msg.getData();
     Buffer   inBuffer(const_cast<char*>(data.c_str()), data.size());
     uint8_t  opcode;
-    uint32_t sequence;
-    string   replyToKey;
 
-    framing::MessageProperties p = msg.getMessageProperties();
-    if (p.hasReplyTo()) {
-        const framing::ReplyTo& rt = p.getReplyTo();
-        replyToKey = rt.getRoutingKey();
-    }
 
     if (checkHeader(inBuffer, &opcode, &sequence))
     {
-        if      (opcode == 'a') handleAttachResponse(inBuffer);
-        else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+        if      (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
         else if (opcode == 'x') handleConsoleAddedIndication();
-        else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
-        else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
+        else
+            QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
     }
 }
 
+
 void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
 {
     buf.putOctet('A');
@@ -607,6 +765,19 @@ void ManagementAgentImpl::encodeHeader(B
     buf.putLong (seq);
 }
 
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+                                                    const string& cname,
+                                                    const uint8_t *md5Sum)
+{
+    Variant::Map map_;
+
+    map_["_package_name"] = pname;
+    map_["_class_name"] = cname;
+    map_["_hash"] = types::Uuid(md5Sum);
+    return map_;
+}
+
+
 bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
 {
     if (buf.getSize() < 8)
@@ -661,7 +832,7 @@ void ManagementAgentImpl::addClassLocal(
                                         PackageMap::iterator  pIter,
                                         const string&         className,
                                         uint8_t*              md5Sum,
-                                        qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+                                        ManagementObject::writeSchemaCall_t schemaCall)
 {
     SchemaClassKey key;
     ClassMap&      cMap = pIter->second;
@@ -701,10 +872,7 @@ void ManagementAgentImpl::encodeClassInd
 
 void ManagementAgentImpl::periodicProcessing()
 {
-#define BUFSIZE   65536
     Mutex::ScopedLock lock(agentLock);
-    char                msgChars[BUFSIZE];
-    uint32_t            contentSize;
     list<pair<ObjectId, ManagementObject*> > deleteList;
 
     if (!connected)
@@ -745,42 +913,53 @@ void ManagementAgentImpl::periodicProces
              !baseObject->isDeleted()))
             continue;
 
-        Buffer msgBuffer(msgChars, BUFSIZE);
+        Variant::List list_;
+
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second;
+            bool send_stats, send_props;
             if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
                 object->setFlags(1);
                 if (object->getConfigChanged() || object->getInstChanged())
                     object->setUpdateTime();
 
-                if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
-                    encodeHeader(msgBuffer, 'c');
-                    object->writeProperties(msgBuffer);
-                }
-        
-                if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
-                    encodeHeader(msgBuffer, 'i');
-                    object->writeStatistics(msgBuffer);
+                send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+                if (send_stats || send_props) {
+                    Variant::Map map_;
+                    Variant::Map values;
+                    Variant::Map oid;
+
+                    object->getObjectId().mapEncode(oid);
+                    map_["_object_id"] = oid;
+                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                           object->getClassName(),
+                                                           object->getMd5Sum());
+                    object->mapEncodeValues(values, send_props, send_stats);
+                    map_["_values"] = values;
+                    list_.push_back(map_);
                 }
 
                 if (object->isDeleted())
                     deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
                 object->setForcePublish(false);
-
-                if (msgBuffer.available() < (BUFSIZE / 2))
-                    break;
             }
         }
 
-        contentSize = BUFSIZE - msgBuffer.available();
-        if (contentSize > 0) {
-            msgBuffer.reset();
-            stringstream key;
-            key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
-                baseObject->getPackageName() << "." << baseObject->getClassName();
-            connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
+        string content;
+        ListCodec::encode(list_, content);
+        if (content.length()) {
+            Variant::Map  headers;
+            headers["method"] = "indication";
+            headers["qmf.opcode"] = "_data_indication";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = name_address;
+
+            connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+            QPID_LOG(trace, "SENT DataIndication");
         }
     }
 
@@ -793,18 +972,7 @@ void ManagementAgentImpl::periodicProces
     }
 
     deleteList.clear();
-
-    {
-        Buffer msgBuffer(msgChars, BUFSIZE);
-        encodeHeader(msgBuffer, 'h');
-        msgBuffer.putLongLong(uint64_t(Duration(now())));
-        stringstream key;
-        key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
-        contentSize = BUFSIZE - msgBuffer.available();
-        msgBuffer.reset();
-        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
-    }
+    sendHeartbeat();
 }
 
 void ManagementAgentImpl::ConnectionThread::run()
@@ -831,6 +999,10 @@ void ManagementAgentImpl::ConnectionThre
                                      arg::exclusive=true);
                 session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
                                      arg::bindingKey=queueName.str());
+                session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+                                     arg::bindingKey=agent.name_address);
+                session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+                                     arg::bindingKey="console.#");
 
                 subscriptions->subscribe(agent, queueName.str(), dest);
                 QPID_LOG(info, "Connection established with broker");
@@ -839,6 +1011,7 @@ void ManagementAgentImpl::ConnectionThre
                     if (shutdown)
                         return;
                     operational = true;
+                    agent.connected = true;
                     agent.startProtocol();
                     try {
                         Mutex::ScopedUnlock _unlock(connLock);
@@ -892,6 +1065,48 @@ void ManagementAgentImpl::ConnectionThre
                                                        const string& exchange,
                                                        const string& routingKey)
 {
+    Message msg;
+    string  data;
+
+    buf.getRawData(data, length);
+    msg.setData(data);
+    sendMessage(msg, exchange, routingKey);
+}
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+                                                       const string& cid,
+                                                       const Variant::Map headers,
+                                                       const string& exchange,
+                                                       const string& routingKey,
+                                                       const string& contentType)
+{
+    Message msg;
+    Variant::Map::const_iterator i;
+
+    if (!cid.empty())
+        msg.getMessageProperties().setCorrelationId(cid);
+
+    if (!contentType.empty())
+        msg.getMessageProperties().setContentType(contentType);
+    for (i = headers.begin(); i != headers.end(); ++i) {
+        msg.getHeaders().setString(i->first, i->second.asString());
+    }
+    msg.getHeaders().setString("app_id", "qmf2");
+
+    msg.setData(data);
+    sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+                                                        const string& exchange,
+                                                        const string& routingKey)
+{
     ConnectionThread::shared_ptr s;
     {
         Mutex::ScopedLock _lock(connLock);
@@ -900,23 +1115,21 @@ void ManagementAgentImpl::ConnectionThre
         s = subscriptions;
     }
 
-    Message msg;
-    string  data;
-
-    buf.getRawData(data, length);
     msg.getDeliveryProperties().setRoutingKey(routingKey);
     msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
-    msg.setData(data);
+    msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
     try {
         session.messageTransfer(arg::content=msg, arg::destination=exchange);
     } catch(exception& e) {
-        QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+        QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
         // Bounce the connection
         if (s)
             s->stop();
     }
 }
 
+
+
 void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
 {
     stringstream key;

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Mar 31 21:13:12 2010
@@ -51,6 +51,9 @@ class ManagementAgentImpl : public Manag
     // Methods from ManagementAgent
     //
     int getMaxThreads() { return 1; }
+    void setName(const std::string& vendor,
+                 const std::string& product,
+                 const std::string& instance="");
     void init(const std::string& brokerHost = "localhost",
               uint16_t brokerPort = 5672,
               uint16_t intervalSeconds = 10,
@@ -75,6 +78,8 @@ class ManagementAgentImpl : public Manag
                        uint8_t*     md5Sum,
                        management::ManagementObject::writeSchemaCall_t schemaCall);
     ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
+    ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key,
+                       bool persistent);
     void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
     uint32_t pollCallbacks(uint32_t callLimit = 0);
     int getSignalFd();
@@ -120,10 +125,10 @@ class ManagementAgentImpl : public Manag
     };
 
     struct QueuedMethod {
-        QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
-            sequence(_seq), replyTo(_reply), body(_body) {}
+    QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) :
+        cid(_cid), replyTo(_reply), body(_body) {}
 
-        uint32_t    sequence;
+        std::string cid;
         std::string replyTo;
         std::string body;
     };
@@ -140,6 +145,8 @@ class ManagementAgentImpl : public Manag
 
     void received (client::Message& msg);
 
+    qpid::types::Variant::Map attrMap;
+    std::string       name_address;
     uint16_t          interval;
     bool              extThread;
     sys::PipeHandle*  pipeHandle;
@@ -155,6 +162,7 @@ class ManagementAgentImpl : public Manag
     client::ConnectionSettings connectionSettings;
     bool              initialized;
     bool              connected;
+    bool              useMapMsg;
     std::string       lastFailure;
 
     bool              clientWasAdded;
@@ -198,6 +206,15 @@ class ManagementAgentImpl : public Manag
                         uint32_t               length,
                         const std::string&     exchange,
                         const std::string&     routingKey);
+        void sendBuffer(const std::string&     data,
+                        const std::string&     cid,
+                        const qpid::types::Variant::Map headers,
+                        const std::string&     exchange,
+                        const std::string&     routingKey,
+                        const std::string&     contentType="amqp/map");
+        void sendMessage(qpid::client::Message msg,
+                         const std::string&     exchange,
+                         const std::string&     routingKey);
         void bindToBank(uint32_t brokerBank, uint32_t agentBank);
         void close();
         bool isSleeping() const;
@@ -237,16 +254,21 @@ class ManagementAgentImpl : public Manag
                                 PackageMap::iterator   pIter,
                                 ClassMap::iterator     cIter);
     void encodeHeader (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
+    qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
+                                                const std::string& cname,
+                                                const uint8_t *md5Sum);
     bool checkHeader  (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
-    void sendCommandComplete  (std::string replyToKey, uint32_t sequence,
-                               uint32_t code = 0, std::string text = std::string("OK"));
-    void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+    void sendHeartbeat();
+    void sendException(const std::string& replyToKey, const std::string& cid,
+                       const std::string& text, uint32_t code=1);
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
-    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
-    void invokeMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
-    void handleGetQuery       (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
-    void handleMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
+    void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& replyTo);
+
+    void handleGetQuery       (const std::string& body, const std::string& cid, const std::string& replyTo);
+    void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
+    void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
     void handleConsoleAddedIndication();
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Mar 31 21:13:12 2010
@@ -93,7 +93,8 @@ Broker::Options::Options(const std::stri
     tcpNoDelay(false),
     requireEncrypted(false),
     maxSessionRate(0),
-    asyncQueueEvents(false)     // Must be false in a cluster.
+    asyncQueueEvents(false),     // Must be false in a cluster.
+    qmf2Support(false)
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;
@@ -114,6 +115,7 @@ Broker::Options::Options(const std::stri
         ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
         ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+        ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
         ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
          "Interval between attempts to purge any expired messages from queues")
@@ -138,7 +140,9 @@ const std::string knownHostsNone("none")
 Broker::Broker(const Broker::Options& conf) :
     poller(new Poller),
     config(conf),
-    managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
+    managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+                                                          conf.qmf2Support)
+                                    : 0),
     store(new NullMessageStore),
     acl(0),
     dataDir(conf.noDataDir ? std::string() : conf.dataDir),
@@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& co
         QPID_LOG(info, "Management enabled");
         managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
                                    conf.mgmtPubInterval, this, conf.workerThreads + 3);
+        managementAgent->setName("apache.org", "qpidd");
         _qmf::Package packageInitializer(managementAgent.get());
 
         System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Mar 31 21:13:12 2010
@@ -113,6 +113,7 @@ public:
         std::string knownHosts;
         uint32_t maxSessionRate;
         bool asyncQueueEvents;
+        bool qmf2Support;
 
       private:
         std::string getHome();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Mar 31 21:13:12 2010
@@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name, 
             mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
             mgmtExchange->set_durable(durable);
             mgmtExchange->set_autoDelete(false);
-            mgmtExchange->set_arguments(args);
+            mgmtExchange->set_arguments(ManagementAgent::toMap(args));
             if (!durable) {
                 if (name.empty()) {
                     agent->addObject (mgmtExchange, 0x1000000000000004LL);  // Special default exchange ID
@@ -336,7 +336,7 @@ void Exchange::Binding::startManagement(
                         {
                             management::ObjectId queueId = mo->getObjectId();
                             mgmtBinding = new _qmf::Binding
-                                (agent, this, (Manageable*) parent, queueId, key, args);
+                                (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
                             if (!origin.empty())
                                 mgmtBinding->set_origin(origin);
                             agent->addObject (mgmtBinding, agent->allocateId(this));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Mar 31 21:13:12 2010
@@ -873,7 +873,7 @@ void Queue::configure(const FieldTable& 
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
     if (mgmtObject != 0)
-        mgmtObject->set_arguments (_settings);
+        mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
 
     if ( isDurable() && ! getPersistenceId() && ! recovering )
       store->create(*this, _settings);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Mar 31 21:13:12 2010
@@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImp
         if (agent != 0)
         {
             mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
-                !acquire, ackExpected, exclusive ,arguments);
+                                                !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
             agent->addObject (mgmtObject, agent->allocateId(this));
             mgmtObject->set_creditMode("WINDOW");
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Mar 31 21:13:12 2010
@@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl
             ManagementAgent* agent = getBroker().getManagementAgent();
             if (agent)
                 agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
-                                                             alternateExchange, durable, false, args,
+                                                             alternateExchange, durable, false, ManagementAgent::toMap(args),
                                                              response.second ? "created" : "existing"));
 
         }catch(UnknownExchangeTypeException& /*e*/){
@@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl
 
             ManagementAgent* agent = getBroker().getManagementAgent();
             if (agent)
-                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
+                agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
+                                                  queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
         }
     }else{
         throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
@@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::d
         ManagementAgent* agent = getBroker().getManagementAgent();
         if (agent)
             agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                      name, durable, exclusive, autoDelete, arguments,
+                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
                                                       queue_created.second ? "created" : "existing"));
     }
 
@@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subs
     ManagementAgent* agent = getBroker().getManagementAgent();
     if (agent)
         agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
-                                               queueName, destination, exclusive, arguments));
+                                               queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
 }
 
 void

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp Wed Mar 31 21:13:12 2010
@@ -22,6 +22,7 @@
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
 #include <iostream>
 #include <fstream>
 
@@ -64,7 +65,7 @@ System::System (string _dataDir, Broker*
             }
         }
 
-        mgmtObject = new _qmf::System (agent, this, systemId);
+        mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
         std::string sysname, nodename, release, version, machine;
         qpid::sys::SystemInfo::getSystemId (sysname,
                                             nodename,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org