You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/31 23:13:14 UTC

svn commit: r929716 [3/4] - in /qpid/trunk/qpid: cpp/bindings/qmf/tests/ cpp/examples/qmf-agent/ cpp/include/qpid/agent/ cpp/include/qpid/framing/ cpp/include/qpid/management/ cpp/managementgen/ cpp/managementgen/qmfgen/ cpp/managementgen/qmfgen/templa...

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Mar 31 21:13:12 2010
@@ -29,20 +29,46 @@
 #include "qpid/sys/Time.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/List.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include <list>
 #include <iostream>
 #include <fstream>
 #include <sstream>
+#include <typeinfo>
 
 using boost::intrusive_ptr;
 using qpid::framing::Uuid;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
 using namespace qpid::framing;
 using namespace qpid::management;
 using namespace qpid::broker;
 using namespace qpid::sys;
+using namespace qpid;
 using namespace std;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+
+
+static Variant::Map mapEncodeSchemaId(const std::string& pname,
+                                      const std::string& cname,
+                                      const std::string& type,
+                                      const uint8_t *md5Sum)
+{
+    Variant::Map map_;
+
+    map_["_package_name"] = pname;
+    map_["_class_name"] = cname;
+    map_["_type"] = type;
+    map_["_hash"] = qpid::types::Uuid(md5Sum);
+    return map_;
+}
+
+
 ManagementAgent::RemoteAgent::~RemoteAgent ()
 {
     QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
@@ -52,10 +78,11 @@ ManagementAgent::RemoteAgent::~RemoteAge
     }
 }
 
-ManagementAgent::ManagementAgent () :
+ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
     threadPoolSize(1), interval(10), broker(0), timer(0),
     startTime(uint64_t(Duration(now()))),
-    suppressed(false)
+    suppressed(false),
+    qmf1Support(qmfV1), qmf2Support(qmfV2)
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -148,6 +175,27 @@ void ManagementAgent::pluginsInitialized
     timer->add(new Periodic(*this, interval));
 }
 
+
+void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
+{
+    attrMap["_vendor"] = vendor;
+    attrMap["_product"] = product;
+    string inst;
+    if (instance.empty()) {
+        if (uuid.isNull())
+        {
+            throw Exception("ManagementAgent::configure() must be called if default name is used.");
+        }
+        inst = uuid.str();
+    } else
+        inst = instance;
+
+   name_address = vendor + ":" + product + ":" + inst;
+   attrMap["_instance"] = inst;
+   attrMap["_name"] = name_address;
+}
+
+
 void ManagementAgent::writeData ()
 {
     string   filename (dataDir + "/.mbrokerdata");
@@ -194,6 +242,7 @@ void ManagementAgent::registerEvent (con
     addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
 }
 
+// Deprecated:  V1 objects
 ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId)
 {
     uint16_t sequence;
@@ -207,8 +256,47 @@ ObjectId ManagementAgent::addObject(Mana
         objectNum = persistId;
     }
 
-    ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
-    objId.setV2Key(*object);
+    ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum);
+    objId.setV2Key(*object);   // let object generate the v2 key
+
+    object->setObjectId(objId);
+
+    {
+        Mutex::ScopedLock lock (addLock);
+        ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+        if (destIter != newManagementObjects.end()) {
+            if (destIter->second->isDeleted()) {
+                newDeletedManagementObjects.push_back(destIter->second);
+                newManagementObjects.erase(destIter);
+            } else {
+                QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+                         " key=" << objId.getV2Key());
+                return objId;
+            }
+        }
+        newManagementObjects[objId] = object;
+    }
+
+    return objId;
+}
+
+
+
+ObjectId ManagementAgent::addObject(ManagementObject* object,
+                                    const std::string& key,
+                                    bool persistent)
+{
+    uint16_t sequence;
+
+    sequence = persistent ? 0 : bootSequence;
+
+    ObjectId objId(0 /*flags*/, sequence, brokerBank);
+    if (key.empty()) {
+        objId.setV2Key(*object);   // let object generate the key
+    } else {
+        objId.setV2Key(key);
+    }
+
     object->setObjectId(objId);
 
     {
@@ -233,21 +321,57 @@ ObjectId ManagementAgent::addObject(Mana
 void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
 {
     Mutex::ScopedLock lock (userLock);
-    Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
     uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
 
-    encodeHeader(outBuffer, 'e');
-    outBuffer.putShortString(event.getPackageName());
-    outBuffer.putShortString(event.getEventName());
-    outBuffer.putBin128(event.getMd5Sum());
-    outBuffer.putLongLong(uint64_t(Duration(now())));
-    outBuffer.putOctet(sev);
-    event.encode(outBuffer);
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    sendBuffer(outBuffer, outLen, mExchange,
-               "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+    if (qmf1Support) {
+        Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+        uint32_t outLen;
+
+        encodeHeader(outBuffer, 'e');
+        outBuffer.putShortString(event.getPackageName());
+        outBuffer.putShortString(event.getEventName());
+        outBuffer.putBin128(event.getMd5Sum());
+        outBuffer.putLongLong(uint64_t(Duration(now())));
+        outBuffer.putOctet(sev);
+        std::string sBuf;
+        event.encode(sBuf);
+        outBuffer.putRawData(sBuf);
+        outLen = MA_BUFFER_SIZE - outBuffer.available();
+        outBuffer.reset();
+        sendBuffer(outBuffer, outLen, mExchange,
+                   "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+        QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+    }
+
+    if (qmf2Support) {
+        Variant::Map map_;
+        Variant::Map schemaId;
+        Variant::Map values;
+        Variant::Map headers;
+
+        map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+                                               event.getEventName(),
+                                               "_event",
+                                               event.getMd5Sum());
+        event.mapEncode(values);
+        map_["_values"] = values;
+        map_["_timestamp"] = uint64_t(Duration(now()));
+        map_["_severity"] = sev;
+
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_data_indication";
+        headers["qmf.content"] = "_event";
+        headers["qmf.agent"] = name_address;
+
+        stringstream key;
+        key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName();
+
+        string content;
+        MapCodec::encode(map_, content);
+        sendBuffer(content, "", headers, v2Topic, key.str());
+        QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+    }
+
 }
 
 ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -355,6 +479,59 @@ void ManagementAgent::sendBuffer(Buffer&
     } catch(exception&) {}
 }
 
+
+void ManagementAgent::sendBuffer(const std::string& data,
+                                 const std::string& cid,
+                                 const Variant::Map& headers,
+                                 qpid::broker::Exchange::shared_ptr exchange,
+                                 const std::string& routingKey)
+{
+    Variant::Map::const_iterator i;
+
+    if (suppressed) {
+        QPID_LOG(trace, "Suppressed management message to " << routingKey);
+        return;
+    }
+    if (exchange.get() == 0) return;
+
+    intrusive_ptr<Message> msg(new Message());
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+    AMQFrame header((AMQHeaderBody()));
+    AMQFrame content((AMQContentBody(data)));
+
+    method.setEof(false);
+    header.setBof(false);
+    header.setEof(false);
+    content.setBof(false);
+
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+
+    MessageProperties* props =
+        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setContentLength(data.length());
+    if (!cid.empty()) {
+        props->setCorrelationId(cid);
+    }
+
+    for (i = headers.begin(); i != headers.end(); ++i) {
+        msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+    }
+    msg->getOrInsertHeaders().setString("app_id", "qmf2");
+
+    DeliveryProperties* dp =
+        msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+    dp->setRoutingKey(routingKey);
+
+    msg->getFrames().append(content);
+
+    DeliverableMessage deliverable (msg);
+    try {
+        exchange->route(deliverable, routingKey, 0);
+    } catch(exception&) {}
+}
+
+
 void ManagementAgent::moveNewObjectsLH()
 {
     Mutex::ScopedLock lock (addLock);
@@ -391,12 +568,13 @@ void ManagementAgent::periodicProcessing
 {
 #define BUFSIZE   65536
 #define HEADROOM  4096
-    QPID_LOG(trace, "Management agent periodic processing")
-        Mutex::ScopedLock lock (userLock);
+    QPID_LOG(trace, "Management agent periodic processing");
+    Mutex::ScopedLock lock (userLock);
     char                msgChars[BUFSIZE];
     uint32_t            contentSize;
     string              routingKey;
     list<pair<ObjectId, ManagementObject*> > deleteList;
+    std::string sBuf;
 
     uint64_t uptime = uint64_t(Duration(now())) - startTime;
     static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
@@ -439,43 +617,90 @@ void ManagementAgent::periodicProcessing
             continue;
 
         Buffer msgBuffer(msgChars, BUFSIZE);
+        Variant::List list_;
+
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second;
+            bool send_stats, send_props;
             if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
                 object->setFlags(1);
                 if (object->getConfigChanged() || object->getInstChanged())
                     object->setUpdateTime();
 
-                if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
+                send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+                if (send_props && qmf1Support) {
                     encodeHeader(msgBuffer, 'c');
-                    object->writeProperties(msgBuffer);
-                    pcount++;
+                    sBuf.clear();
+                    object->writeProperties(sBuf);
+                    msgBuffer.putRawData(sBuf);
                 }
-        
-                if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+
+                if (send_stats && qmf1Support) {
                     encodeHeader(msgBuffer, 'i');
-                    object->writeStatistics(msgBuffer);
-                    scount++;
+                    sBuf.clear();
+                    object->writeStatistics(sBuf);
+                    msgBuffer.putRawData(sBuf);
+                }
+
+                if ((send_stats || send_props) && qmf2Support) {
+                    Variant::Map  map_;
+                    Variant::Map values;
+
+                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                           object->getClassName(),
+                                                           "_data",
+                                                           object->getMd5Sum());
+                    object->mapEncodeValues(values, send_props, send_stats);
+                    map_["_values"] = values;
+                    list_.push_back(map_);
+
                 }
 
+                if (send_props) pcount++;
+                if (send_stats) scount++;
+
                 if (object->isDeleted())
                     deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
                 object->setForcePublish(false);
 
-                if (msgBuffer.available() < HEADROOM)
+                if (qmf1Support && (msgBuffer.available() < HEADROOM))
                     break;
             }
         }
 
-        contentSize = BUFSIZE - msgBuffer.available();
-        if (contentSize > 0) {
-            msgBuffer.reset();
-            stringstream key;
-            key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
-            sendBuffer(msgBuffer, contentSize, mExchange, key.str());
-            QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+        if (pcount || scount) {
+            if (qmf1Support) {
+                contentSize = BUFSIZE - msgBuffer.available();
+                if (contentSize > 0) {
+                    msgBuffer.reset();
+                    stringstream key;
+                    key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+                    sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+                    QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+                }
+            }
+
+            if (qmf2Support) {
+                string content;
+                ListCodec::encode(list_, content);
+                if (content.length()) {
+                    stringstream key;
+                    Variant::Map  headers;
+                    key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName();
+                    // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+                    headers["method"] = "indication";
+                    headers["qmf.opcode"] = "_data_indication";
+                    headers["qmf.content"] = "_data";
+                    headers["qmf.agent"] = name_address;
+
+                    sendBuffer(content, "", headers, v2Topic, key.str());
+                    QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+                }
+            }
         }
     }
 
@@ -492,15 +717,49 @@ void ManagementAgent::periodicProcessing
     for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
          cdIter != deletedManagementObjects.end(); cdIter++) {
         collisionDeletions = true;
-        Buffer msgBuffer(msgChars, BUFSIZE);
-        encodeHeader(msgBuffer, 'c');
-        (*cdIter)->writeProperties(msgBuffer);
-        contentSize = BUFSIZE - msgBuffer.available ();
-        msgBuffer.reset ();
-        stringstream key;
-        key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
-        sendBuffer (msgBuffer, contentSize, mExchange, key.str());
-        QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+        {
+            if (qmf1Support) {
+                Buffer msgBuffer(msgChars, BUFSIZE);
+                encodeHeader(msgBuffer, 'c');
+                sBuf.clear();
+                (*cdIter)->writeProperties(sBuf);
+                msgBuffer.putRawData(sBuf);
+                contentSize = BUFSIZE - msgBuffer.available ();
+                msgBuffer.reset ();
+                stringstream key;
+                key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+                sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+                QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+            }
+
+            if (qmf2Support) {
+                Variant::List list_;
+                Variant::Map  map_;
+                Variant::Map  values;
+                Variant::Map  headers;
+
+                map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
+                                                       (*cdIter)->getClassName(),
+                                                       "_data",
+                                                       (*cdIter)->getMd5Sum());
+                (*cdIter)->mapEncodeValues(values, true, false);
+                map_["_values"] = values;
+                list_.push_back(map_);
+
+                headers["method"] = "indication";
+                headers["qmf.opcode"] = "_data_indication";
+                headers["qmf.content"] = "_data";
+                headers["qmf.agent"] = name_address;
+
+                stringstream key;
+                key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+
+                string content;
+                ListCodec::encode(list_, content);
+                sendBuffer(content, "", headers, v2Topic, key.str());
+                QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+            }
+        }
     }
 
     if (!deleteList.empty() || collisionDeletions) {
@@ -508,7 +767,12 @@ void ManagementAgent::periodicProcessing
         deleteOrphanedAgentsLH();
     }
 
-    {
+    // heartbeat generation
+
+    if (qmf1Support) {
+#define BUFSIZE   65536
+        uint32_t            contentSize;
+        char                msgChars[BUFSIZE];
         Buffer msgBuffer(msgChars, BUFSIZE);
         encodeHeader(msgBuffer, 'h');
         msgBuffer.putLongLong(uint64_t(Duration(now())));
@@ -519,6 +783,27 @@ void ManagementAgent::periodicProcessing
         sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
     }
+
+    if (qmf2Support) {
+        static const string addr_key("agent.ind.heartbeat");
+
+        Variant::Map map;
+        Variant::Map headers;
+
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_agent_heartbeat_indication";
+        headers["qmf.agent"] = name_address;
+
+        map["_values"] = attrMap;
+        map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+        map["_values"].asMap()["heartbeat_interval"] = interval;
+
+        string content;
+        MapCodec::encode(map, content);
+        sendBuffer(content, "", headers, v2Topic, addr_key);
+
+        QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+    }
     QPID_LOG(debug, "periodic update " << debugSnapshot());
 }
 
@@ -531,19 +816,51 @@ void ManagementAgent::deleteObjectNowLH(
     if (!object->isDeleted())
         return;
 
+    if (qmf1Support) {
 #define DNOW_BUFSIZE 2048
-    char     msgChars[DNOW_BUFSIZE];
-    uint32_t contentSize;
-    Buffer   msgBuffer(msgChars, DNOW_BUFSIZE);
-
-    encodeHeader(msgBuffer, 'c');
-    object->writeProperties(msgBuffer);
-    contentSize = msgBuffer.getPosition();
-    msgBuffer.reset();
-    stringstream key;
-    key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
-    sendBuffer(msgBuffer, contentSize, mExchange, key.str());
-    QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+        char     msgChars[DNOW_BUFSIZE];
+        uint32_t contentSize;
+        Buffer   msgBuffer(msgChars, DNOW_BUFSIZE);
+        std::string sBuf;
+
+        encodeHeader(msgBuffer, 'c');
+        object->writeProperties(sBuf);
+        msgBuffer.putRawData(sBuf);
+        contentSize = msgBuffer.getPosition();
+        msgBuffer.reset();
+        stringstream key;
+        key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+        sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+        QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+    }
+
+    if (qmf2Support) {
+        Variant::List list_;
+        Variant::Map  map_;
+        Variant::Map  values;
+
+        map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                               object->getClassName(),
+                                               "_data",
+                                               object->getMd5Sum());
+        object->mapEncodeValues(values, true, false);
+        map_["_values"] = values;
+        list_.push_back(map_);
+
+        stringstream key;
+        key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName();
+
+        Variant::Map  headers;
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_data_indication";
+        headers["qmf.content"] = "_data";
+        headers["qmf.agent"] = name_address;
+
+        string content;
+        ListCodec::encode(list_, content);
+        sendBuffer(content, "", headers, v2Topic, key.str());
+        QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+    }
 
     managementObjects.erase(oid);
 }
@@ -566,35 +883,68 @@ void ManagementAgent::sendCommandComplet
 
 bool ManagementAgent::dispatchCommand (Deliverable&      deliverable,
                                        const string&     routingKey,
-                                       const FieldTable* /*args*/)
+                                       const FieldTable* /*args*/,
+                                       const bool topic)
 {
     Mutex::ScopedLock lock (userLock);
     Message&  msg = ((DeliverableMessage&) deliverable).getMessage ();
 
-    // Parse the routing key.  This management broker should act as though it
-    // is bound to the exchange to match the following keys:
-    //
-    //    agent.1.0.#
-    //    broker
-    //    schema.#
+    if (qmf1Support && topic) {
 
-    if (routingKey == "broker") {
-        dispatchAgentCommandLH(msg);
-        return false;
-    }
+        // qmf1 is bound only to the topic management exchange.
+        // Parse the routing key.  This management broker should act as though it
+        // is bound to the exchange to match the following keys:
+        //
+        //    agent.1.0.#
+        //    broker
+        //    schema.#
 
-    else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
-        dispatchAgentCommandLH(msg);
-        return false;
-    }
+        if (routingKey == "broker") {
+            dispatchAgentCommandLH(msg);
+            return false;
+        }
+
+        if (routingKey.length() > 6) {
 
-    else if (routingKey.compare(0, 8, "agent.1.") == 0) {
-        return authorizeAgentMessageLH(msg);
+            if (routingKey.compare(0, 9, "agent.1.0") == 0) {
+                dispatchAgentCommandLH(msg);
+                return false;
+            }
+
+            if (routingKey.compare(0, 8, "agent.1.") == 0) {
+                return authorizeAgentMessageLH(msg);
+            }
+
+            if (routingKey.compare(0, 7, "schema.") == 0) {
+                dispatchAgentCommandLH(msg);
+                return true;
+            }
+        }
     }
 
-    else if (routingKey.compare(0, 7, "schema.") == 0) {
-        dispatchAgentCommandLH(msg);
-        return true;
+    if (qmf2Support) {
+
+        if (topic) {
+
+            // Intercept messages bound to:
+            //  "console.ind.locate.# - process these messages, and also allow them to be forwarded.
+
+            if (routingKey.compare(0, 18, "console.ind.locate") == 0) {
+                dispatchAgentCommandLH(msg);
+                return true;
+            }
+
+        } else { // direct exchange
+
+            // Intercept messages bound to:
+            //  "broker" - generic alias for the local broker
+            //  "<name_address>" - the broker agent's proper name
+            // and do not forward them futher
+            if (routingKey == "broker" || routingKey == name_address) {
+                dispatchAgentCommandLH(msg);
+                return false;
+            }
+        }
     }
 
     return true;
@@ -610,14 +960,19 @@ void ManagementAgent::handleMethodReques
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
     AclModule* acl = broker->getAcl();
+    std::string inArgs;
 
-    ObjectId objId(inBuffer);
+    std::string sBuf;
+    inBuffer.getRawData(sBuf, 16);
+    ObjectId objId;
+    objId.decode(sBuf);
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(className);
     inBuffer.getBin128(hash);
     inBuffer.getShortString(methodName);
+    inBuffer.getRawData(inArgs, inBuffer.available());
 
-    QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+    QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
              methodName << " replyTo=" << replyToKey);
 
     encodeHeader(outBuffer, 'm', sequence);
@@ -629,8 +984,8 @@ void ManagementAgent::handleMethodReques
         outLen = MA_BUFFER_SIZE - outBuffer.available();
         outBuffer.reset();
         sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
-            return;
+        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+        return;
     }
 
     if (acl != 0) {
@@ -645,8 +1000,8 @@ void ManagementAgent::handleMethodReques
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
             sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
-                return;
+            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+            return;
         }
     }
 
@@ -664,7 +1019,9 @@ void ManagementAgent::handleMethodReques
             try {
                 outBuffer.record();
                 Mutex::ScopedUnlock u(userLock);
-                iter->second->doMethod(methodName, inBuffer, outBuffer);
+                std::string outBuf;
+                iter->second->doMethod(methodName, inArgs, outBuf);
+                outBuffer.putRawData(outBuf);
             } catch(exception& e) {
                 outBuffer.restore();
                 outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -675,9 +1032,135 @@ void ManagementAgent::handleMethodReques
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
     sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-    QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+    QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
 }
 
+
+void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo,
+                                             const std::string& cid, const ConnectionToken* connToken)
+{
+    string   methodName;
+    Variant::Map inMap;
+    MapCodec::decode(body, inMap);
+    Variant::Map::const_iterator oid, mid;
+    string content;
+
+    Variant::Map outMap;
+    Variant::Map headers;
+
+    headers["method"] = "response";
+    headers["qmf.opcode"] = "_method_response";
+    headers["qmf.agent"] = name_address;
+
+    if ((oid = inMap.find("_object_id")) == inMap.end() ||
+        (mid = inMap.find("_method_name")) == inMap.end())
+    {
+        headers["qmf.opcode"] = "_exception";
+        (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+        (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+
+        MapCodec::encode(outMap, content);
+        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
+        return;
+    }
+
+    ObjectId objId;
+    Variant::Map inArgs;
+
+    try {
+        // coversions will throw if input is invalid.
+        objId = ObjectId(oid->second.asMap());
+        methodName = mid->second.getString();
+
+        mid = inMap.find("_arguments");
+        if (mid != inMap.end()) {
+            inArgs = (mid->second).asMap();
+        }
+    } catch(exception& e) {
+        headers["qmf.opcode"] = "_exception";
+        (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+        (outMap["_values"].asMap())["_status_text"] = e.what();
+
+        MapCodec::encode(outMap, content);
+        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
+        return;
+    }
+
+    ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+    if (iter == managementObjects.end() || iter->second->isDeleted()) {
+        headers["qmf.opcode"] = "_exception";
+        (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+        (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+
+        MapCodec::encode(outMap, content);
+        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
+        return;
+    }
+
+    // validate
+    AclModule* acl = broker->getAcl();
+    DisallowedMethods::const_iterator i;
+
+    i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName));
+    if (i != disallowed.end()) {
+        headers["qmf.opcode"] = "_exception";
+        (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+        (outMap["_values"].asMap())["_status_text"] = i->second;
+
+        MapCodec::encode(outMap, content);
+        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
+        return;
+    }
+
+    if (acl != 0) {
+        string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+        map<acl::Property, string> params;
+        params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
+        params[acl::PROP_SCHEMACLASS]   = iter->second->getClassName();
+
+        if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+            headers["qmf.opcode"] = "_exception";
+            (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+            (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+
+            MapCodec::encode(outMap, content);
+            sendBuffer(content, cid, headers, v2Direct, replyTo);
+            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
+            return;
+        }
+    }
+
+    // invoke the method
+
+    QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+             << ":" << iter->second->getClassName() << " method=" <<
+             methodName << " replyTo=" << replyTo);
+
+    try {
+        iter->second->doMethod(methodName, inArgs, outMap);
+    } catch(exception& e) {
+        outMap.clear();
+        headers["qmf.opcode"] = "_exception";
+        (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+        (outMap["_values"].asMap())["_status_text"] = e.what();
+
+        MapCodec::encode(outMap, content);
+        sendBuffer(content, cid, headers, v2Direct, replyTo);
+        QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
+        return;
+    }
+
+    MapCodec::encode(outMap, content);
+    sendBuffer(content, cid, headers, v2Direct, replyTo);
+    QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid);
+}
+
+
 void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
 {
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -781,6 +1264,7 @@ void ManagementAgent::handleClassIndLH (
         uint32_t outLen;
         uint32_t sequence = nextRequestSequence++;
 
+        // Schema Request
         encodeHeader (outBuffer, 'S', sequence);
         outBuffer.putShortString(packageName);
         key.encode(outBuffer);
@@ -803,9 +1287,11 @@ void ManagementAgent::SchemaClass::appen
     // linked in via plug-in), call the schema handler directly.  If the package
     // is from a remote management agent, send the stored schema information.
 
-    if (writeSchemaCall != 0)
-        writeSchemaCall(buf);
-    else
+    if (writeSchemaCall != 0) {
+        std::string schema;
+        writeSchemaCall(schema);
+        buf.putRawData(schema);
+    } else
         buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
 }
 
@@ -981,7 +1467,7 @@ void ManagementAgent::handleAttachReques
     agent->mgmtObject->set_connectionRef(agent->connectionRef);
     agent->mgmtObject->set_label        (label);
     agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
-    agent->mgmtObject->set_systemId     (systemId);
+    agent->mgmtObject->set_systemId     ((const unsigned char*)systemId.data());
     agent->mgmtObject->set_brokerBank   (brokerBank);
     agent->mgmtObject->set_agentBank    (assignedBank);
     addObject (agent->mgmtObject, 0);
@@ -1012,7 +1498,7 @@ void ManagementAgent::handleGetQueryLH (
 
     ft.decode(inBuffer);
 
-    QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
+    QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
 
     value = ft.get("_class");
     if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1031,13 +1517,17 @@ void ManagementAgent::handleGetQueryLH (
                 object->setUpdateTime();
 
             if (!object->isDeleted()) {
+                std::string sBuf;
                 encodeHeader(outBuffer, 'g', sequence);
-                object->writeProperties(outBuffer);
-                object->writeStatistics(outBuffer, true);
+                object->writeProperties(sBuf);
+                outBuffer.putRawData(sBuf);
+                sBuf.clear();
+                object->writeStatistics(sBuf, true);
+                outBuffer.putRawData(sBuf);
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
                 sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
             }
         }
         sendCommandComplete(replyToKey, sequence);
@@ -1058,13 +1548,17 @@ void ManagementAgent::handleGetQueryLH (
                 object->setUpdateTime();
 
             if (!object->isDeleted()) {
+                std::string sBuf;
                 encodeHeader(outBuffer, 'g', sequence);
-                object->writeProperties(outBuffer);
-                object->writeStatistics(outBuffer, true);
+                object->writeProperties(sBuf);
+                outBuffer.putRawData(sBuf);
+                sBuf.clear();
+                object->writeStatistics(sBuf, true);
+                outBuffer.putRawData(sBuf);
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
                 outBuffer.reset ();
                 sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+                QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
             }
         }
     }
@@ -1072,64 +1566,285 @@ void ManagementAgent::handleGetQueryLH (
     sendCommandComplete(replyToKey, sequence);
 }
 
+
+void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType)
+{
+    FieldTable           ft;
+    FieldTable::ValuePtr value;
+
+    moveNewObjectsLH();
+
+    if (contentType != "_query_v1") {
+        QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
+        return;
+    }
+
+    Variant::Map inMap;
+    MapCodec::decode(body, inMap);
+    Variant::Map::const_iterator i;
+    Variant::Map headers;
+
+    QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+
+    headers["method"] = "response";
+    headers["qmf.opcode"] = "_query_response";
+    headers["qmf.content"] = "_data";
+    headers["qmf.agent"] = name_address;
+    headers["partial"];
+
+    Variant::List list_;
+    Variant::Map  map_;
+    Variant::Map values;
+    string className;
+    string content;
+
+    i = inMap.find("_class");
+    if (i != inMap.end())
+        try {
+            className = i->second.asString();
+        } catch(exception& e) {
+            className.clear();
+            QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
+        }
+
+    if (className.empty()) {
+        ObjectId objId;
+        i = inMap.find("_object_id");
+        if (i != inMap.end()) {
+
+            try {
+                objId = ObjectId(i->second.asMap());
+            } catch (exception &e) {
+                objId = ObjectId();   // empty object id - won't find a match (I hope).
+                QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
+            }
+
+            ManagementObjectMap::iterator iter = managementObjects.find(objId);
+            if (iter != managementObjects.end()) {
+                ManagementObject* object = iter->second;
+
+                if (object->getConfigChanged() || object->getInstChanged())
+                    object->setUpdateTime();
+
+                if (!object->isDeleted()) {
+                    object->mapEncodeValues(values, true, true); // write both stats and properties
+                    map_["_values"] = values;
+                    list_.push_back(map_);
+
+                    ListCodec::encode(list_, content);
+                    sendBuffer(content, cid, headers, v2Direct, replyTo);
+                }
+            }
+        }
+    } else {
+        for (ManagementObjectMap::iterator iter = managementObjects.begin();
+             iter != managementObjects.end();
+             iter++) {
+            ManagementObject* object = iter->second;
+            if (object->getClassName () == className) {
+
+                // @todo: support multiple objects per message reply
+                values.clear();
+                list_.clear();
+                if (object->getConfigChanged() || object->getInstChanged())
+                    object->setUpdateTime();
+
+                if (!object->isDeleted()) {
+                    object->mapEncodeValues(values, true, true); // write both stats and properties
+                map_["_values"] = values;
+                list_.push_back(map_);
+
+                ListCodec::encode(list_, content);
+                sendBuffer(content, cid, headers, v2Direct, replyTo);
+                }
+            }
+        }
+    }
+
+    // end empty "non-partial" message to indicate CommandComplete
+    list_.clear();
+    headers.erase("partial");
+    ListCodec::encode(list_, content);
+    sendBuffer(content, cid, headers, v2Direct, replyTo);
+    QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
+}
+
+
+void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
+                                            const string& cid)
+{
+    QPID_LOG(trace, "RCVD AgentLocateRequest");
+
+    Variant::Map map;
+    Variant::Map headers;
+
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_agent_locate_response";
+    headers["qmf.agent"] = name_address;
+
+    map["_values"] = attrMap;
+    map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+    map["_values"].asMap()["heartbeat_interval"] = interval;
+
+    string content;
+    MapCodec::encode(map, content);
+    sendBuffer(content, cid, headers, v2Direct, replyTo);
+
+    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+
 bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
 {
     Buffer   inBuffer (inputBuffer, MA_BUFFER_SIZE);
-    uint8_t  opcode;
-    uint32_t sequence;
-    string   replyToKey;
+    uint32_t sequence = 0;
+    bool methodReq = false;
+    bool mapMsg = false;
+    string  packageName;
+    string  className;
+    string  methodName;
+    std::string cid;
 
     if (msg.encodedSize() > MA_BUFFER_SIZE)
         return false;
 
     msg.encodeContent(inBuffer);
+    uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    if (!checkHeader(inBuffer, &opcode, &sequence))
-        return false;
+    const framing::MessageProperties* p =
+      msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+    const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+    if (headers && headers->getAsString("app_id") == "qmf2")
+    {
+        mapMsg = true;
+
+        if (p && p->hasCorrelationId()) {
+            cid = p->getCorrelationId();
+        }
+
+        if (headers->getAsString("qmf.opcode") == "_method_request")
+        {
+            methodReq = true;
+
+            // extract object id and method name
+
+            std::string body;
+            inBuffer.getRawData(body, bufferLen);
+            Variant::Map inMap;
+            MapCodec::decode(body, inMap);
+            Variant::Map::const_iterator oid, mid;
+
+            ObjectId objId;
+
+            if ((oid = inMap.find("_object_id")) == inMap.end() ||
+                (mid = inMap.find("_method_name")) == inMap.end()) {
+                QPID_LOG(warning,
+                         "Missing fields in QMF authorize req received.");
+                return false;
+            }
+
+            try {
+                // coversions will throw if input is invalid.
+                objId = ObjectId(oid->second.asMap());
+                methodName = mid->second.getString();
+            } catch(exception& e) {
+                QPID_LOG(warning,
+                         "Badly formatted QMF authorize req received.");
+                return false;
+            }
+
+            // look up schema for object to get package and class name
+
+            ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+            if (iter == managementObjects.end() || iter->second->isDeleted()) {
+                QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
+                         objId);
+                return false;
+            }
 
-    if (opcode == 'M') {
+            packageName = iter->second->getPackageName();
+            className = iter->second->getClassName();
+        }
+    } else {    // old style binary message format
+
+        uint8_t  opcode;
+
+        if (!checkHeader(inBuffer, &opcode, &sequence))
+            return false;
+
+        if (opcode == 'M') {
+            methodReq = true;
+
+            // extract method name & schema package and class name
+
+            uint8_t hash[16];
+            inBuffer.getLongLong(); // skip over object id
+            inBuffer.getLongLong();
+            inBuffer.getShortString(packageName);
+            inBuffer.getShortString(className);
+            inBuffer.getBin128(hash);
+            inBuffer.getShortString(methodName);
+
+        }
+    }
+
+    if (methodReq) {
         // TODO: check method call against ACL list.
+        map<acl::Property, string> params;
         AclModule* acl = broker->getAcl();
         if (acl == 0)
             return true;
 
         string  userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
-        string  packageName;
-        string  className;
-        uint8_t hash[16];
-        string  methodName;
-
-        map<acl::Property, string> params;
-        ObjectId objId(inBuffer);
-        inBuffer.getShortString(packageName);
-        inBuffer.getShortString(className);
-        inBuffer.getBin128(hash);
-        inBuffer.getShortString(methodName);
-
         params[acl::PROP_SCHEMAPACKAGE] = packageName;
         params[acl::PROP_SCHEMACLASS]   = className;
 
         if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params))
             return true;
 
+        // authorization failed, send reply if replyTo present
+
         const framing::MessageProperties* p =
             msg.getFrames().getHeaders()->get<framing::MessageProperties>();
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
-            replyToKey = rt.getRoutingKey();
+            string replyToKey = rt.getRoutingKey();
 
-            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
+            if (mapMsg) {
 
-            encodeHeader(outBuffer, 'm', sequence);
-            outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
-            outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            sendBuffer(outBuffer, outLen, dExchange, replyToKey);
-            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
-                }
+                Variant::Map outMap;
+                Variant::Map headers;
+
+                headers["method"] = "response";
+                headers["qmf.opcode"] = "_method_response";
+                headers["qmf.agent"] = name_address;
+
+                ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+                ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+
+                string content;
+                MapCodec::encode(outMap, content);
+                sendBuffer(content, cid, headers, v2Direct, replyToKey);
+
+            } else {
+
+                Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
+                uint32_t outLen;
+
+                encodeHeader(outBuffer, 'm', sequence);
+                outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+                outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+                outLen = MA_BUFFER_SIZE - outBuffer.available();
+                outBuffer.reset();
+                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+            }
+
+            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+        }
 
         return false;
     }
@@ -1139,9 +1854,6 @@ bool ManagementAgent::authorizeAgentMess
 
 void ManagementAgent::dispatchAgentCommandLH(Message& msg)
 {
-    Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
-    uint8_t  opcode;
-    uint32_t sequence;
     string   replyToKey;
 
     const framing::MessageProperties* p =
@@ -1153,6 +1865,9 @@ void ManagementAgent::dispatchAgentComma
     else
         return;
 
+    Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
+    uint8_t  opcode;
+
     if (msg.encodedSize() > MA_BUFFER_SIZE) {
         QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
                  msg.encodedSize());
@@ -1163,7 +1878,36 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
+    const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+    if (headers && headers->getAsString("app_id") == "qmf2")
+    {
+        std::string opcode = headers->getAsString("qmf.opcode");
+        std::string contentType = headers->getAsString("qmf.content");
+        std::string body;
+        std::string cid;
+
+        inBuffer.getRawData(body, bufferLen);
+
+        if (p && p->hasCorrelationId()) {
+            cid = p->getCorrelationId();
+        }
+
+        if (opcode == "_method_request")
+            return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher());
+        else if (opcode == "_query_request")
+            return handleGetQueryLH(body, replyToKey, cid, contentType);
+        else if (opcode == "_agent_locate_request")
+            return handleLocateRequestLH(body, replyToKey, cid);
+
+        QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+        return;
+    }
+
+    // old preV2 binary messages
+
     while (inBuffer.getPosition() < bufferLen) {
+        uint32_t sequence;
         if (!checkHeader(inBuffer, &opcode, &sequence))
             return;
 
@@ -1359,7 +2103,6 @@ ManagementObjectMap::iterator Management
     return iter;
 }
 
-
 void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
 {
     Mutex::ScopedLock lock (userLock);
@@ -1377,42 +2120,64 @@ void ManagementAgent::disallow(const std
     disallowed[std::make_pair(className, methodName)] = message;
 }
 
+void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const {
+    _map["_cname"] = name;
+    _map["_hash"] = qpid::types::Uuid(hash);
+}
+
+void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) {
+    Variant::Map::const_iterator i;
+
+    if ((i = _map.find("_cname")) != _map.end()) {
+        name = i->second.asString();
+    }
+
+    if ((i = _map.find("_hash")) != _map.end()) {
+        const qpid::types::Uuid& uuid = i->second.asUuid();
+        memcpy(hash, uuid.data(), uuid.size());
+    }
+}
+
 void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const {
-    buffer.checkAvailable(encodedSize());
+    buffer.checkAvailable(encodedBufSize());
     buffer.putShortString(name);
     buffer.putBin128(hash);
 }
 
 void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) {
-    buffer.checkAvailable(encodedSize());
+    buffer.checkAvailable(encodedBufSize());
     buffer.getShortString(name);
     buffer.getBin128(hash);
 }
 
-uint32_t ManagementAgent::SchemaClassKey::encodedSize() const {
+uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const {
     return 1 + name.size() + 16 /* bin128 */;
 }
 
-void ManagementAgent::SchemaClass::encode(qpid::framing::Buffer& outBuf) const {
-    outBuf.checkAvailable(encodedSize());
-    outBuf.putOctet(kind);
-    outBuf.putLong(pendingSequence);
-    outBuf.putLongString(data);
-}
-
-void ManagementAgent::SchemaClass::decode(qpid::framing::Buffer& inBuf) {
-    inBuf.checkAvailable(encodedSize());
-    kind = inBuf.getOctet();
-    pendingSequence = inBuf.getLong();
-    inBuf.getLongString(data);
+void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const {
+    _map["_type"] = kind;
+    _map["_pending_sequence"] = pendingSequence;
+    _map["_data"] = data;
 }
 
-uint32_t ManagementAgent::SchemaClass::encodedSize() const {
-    return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size();
+void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) {
+    Variant::Map::const_iterator i;
+
+    if ((i = _map.find("_type")) != _map.end()) {
+        kind = i->second;
+    }
+    if ((i = _map.find("_pending_sequence")) != _map.end()) {
+        pendingSequence = i->second;
+    }
+    if ((i = _map.find("_data")) != _map.end()) {
+        data = i->second.asString();
+    }
 }
 
 void ManagementAgent::exportSchemas(std::string& out) {
-    out.clear();
+    Variant::List list_;
+    Variant::Map map_, kmap, cmap;
+
     for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
         string name = i->first;
         const ClassMap& classes = i ->second;
@@ -1421,90 +2186,143 @@ void ManagementAgent::exportSchemas(std:
             const SchemaClass& klass = j->second;
             if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
                 // Encode name, schema-key, schema-class
-                size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize();
-                size_t end = out.size();
-                out.resize(end + encodedSize);
-                framing::Buffer outBuf(&out[end], encodedSize);
-                outBuf.putShortString(name);
-                key.encode(outBuf);
-                klass.encode(outBuf);
+
+                map_.clear();
+                kmap.clear();
+                cmap.clear();
+
+                key.mapEncode(kmap);
+                klass.mapEncode(cmap);
+
+                map_["_pname"] = name;
+                map_["_key"] = kmap;
+                map_["_class"] = cmap;
+                list_.push_back(map_);
             }
         }
     }
+
+    ListCodec::encode(list_, out);
 }
 
 void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
-    while (inBuf.available()) {
+
+    string buf(inBuf.getPointer(), inBuf.available());
+    Variant::List content;
+    ListCodec::decode(buf, content);
+    Variant::List::const_iterator l;
+
+
+    for (l = content.begin(); l != content.end(); l++) {
         string package;
         SchemaClassKey key;
         SchemaClass klass;
-        inBuf.getShortString(package);
-        key.decode(inBuf);
-        klass.decode(inBuf);
-        packages[package][key] = klass;
+        Variant::Map map_, kmap, cmap;
+        Variant::Map::const_iterator i;
+        
+        map_ = l->asMap();
+
+        if ((i = map_.find("_pname")) != map_.end()) {
+            package = i->second.asString();
+
+            if ((i = map_.find("_key")) != map_.end()) {
+                key.mapDecode(i->second.asMap());
+
+                if ((i = map_.find("_class")) != map_.end()) {
+                    klass.mapDecode(i->second.asMap());
+
+                    packages[package][key] = klass;
+                }
+            }
+        }
     }
 }
 
-void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const {
-    outBuf.checkAvailable(encodedSize());
-    outBuf.putLong(brokerBank);
-    outBuf.putLong(agentBank);
-    outBuf.putShortString(routingKey);
-    // TODO aconway 2010-03-04: we send the v2Key instead of the
-    // ObjectId because that has the same meaning on different
-    // brokers. ObjectId::encode doesn't currently encode the v2Key,
-    // this can be cleaned up when it does.
-    outBuf.putMediumString(connectionRef.getV2Key());
-    mgmtObject->writeProperties(outBuf);
-}
-
-void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) {
-    brokerBank = inBuf.getLong();
-    agentBank = inBuf.getLong();
-    inBuf.getShortString(routingKey);
-
-    // TODO aconway 2010-03-04: see comment in encode()
-    string connectionKey;
-    inBuf.getMediumString(connectionKey);
-    connectionRef = ObjectId(); // Clear out any existing value.
-    connectionRef.setV2Key(connectionKey);
+void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
+    Variant::Map _objId, _values;
+
+    map_["_brokerBank"] = brokerBank;
+    map_["_agentBank"] = agentBank;
+    map_["_routingKey"] = routingKey;
+
+    connectionRef.mapEncode(_objId);
+    map_["_object_id"] = _objId;
+
+    mgmtObject->mapEncodeValues(_values, true, false);
+    map_["_values"] = _values;
+}
+
+void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
+    Variant::Map::const_iterator i;
+
+    if ((i = map_.find("_brokerBank")) != map_.end()) {
+        brokerBank = i->second;
+    }
+
+    if ((i = map_.find("_agentBank")) != map_.end()) {
+        agentBank = i->second;
+    }
+
+    if ((i = map_.find("_routingKey")) != map_.end()) {
+        routingKey = i->second.getString();
+    }
+
+    if ((i = map_.find("_object_id")) != map_.end()) {
+        connectionRef.mapDecode(i->second.asMap());
+    }
 
     mgmtObject = new _qmf::Agent(&agent, this);
-    mgmtObject->readProperties(inBuf);
+
+    if ((i = map_.find("_values")) != map_.end()) {
+        mgmtObject->mapDecodeValues(i->second.asMap());
+    }
+
     // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
     mgmtObject->set_connectionRef(connectionRef);
 }
 
-uint32_t ManagementAgent::RemoteAgent::encodedSize() const {
-    // TODO aconway 2010-03-04: see comment in encode()
-    return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long
-        + routingKey.size() + sizeof(uint8_t) // ShortString
-        + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string
-        + mgmtObject->writePropertiesSize();
-}
-
 void ManagementAgent::exportAgents(std::string& out) {
-    out.clear();
+    Variant::List list_;
+    Variant::Map map_, omap, amap;
+
     for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
          i != remoteAgents.end();
          ++i)
     {
         // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
         RemoteAgent* agent = i->second;
-        size_t encodedSize = agent->encodedSize();
-        size_t end = out.size();
-        out.resize(end + encodedSize);
-        framing::Buffer outBuf(&out[end], encodedSize);
-        agent->encode(outBuf);
+
+        map_.clear();
+        amap.clear();
+
+        agent->mapEncode(amap);
+        map_["_remote_agent"] = amap;
+        list_.push_back(map_);
     }
+
+    ListCodec::encode(list_, out);
 }
 
 void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
-    while (inBuf.available()) {
+    string buf(inBuf.getPointer(), inBuf.available());
+    Variant::List content;
+    ListCodec::decode(buf, content);
+    Variant::List::const_iterator l;
+
+    for (l = content.begin(); l != content.end(); l++) {
         std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));
-        agent->decode(inBuf);
-        addObject(agent->mgmtObject, 0);
-        remoteAgents[agent->connectionRef] = agent.release();
+        Variant::Map map_;
+        Variant::Map::const_iterator i;
+
+        map_ = l->asMap();
+
+        if ((i = map_.find("_remote_agent")) != map_.end()) {
+
+            agent->mapDecode(i->second.asMap());
+
+            addObject (agent->mgmtObject, 0, false);
+            remoteAgents[agent->connectionRef] = agent.release();
+        }
     }
 }
 
@@ -1519,3 +2337,198 @@ std::string ManagementAgent::debugSnapsh
     msg << " new objects: " << newManagementObjects.size();
     return msg.str();
 }
+
+Variant::Map ManagementAgent::toMap(const FieldTable& from)
+{
+    Variant::Map map;
+
+    for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+        const string& key(iter->first);
+        const FieldTable::ValuePtr& val(iter->second);
+
+        map[key] = toVariant(val);
+    }
+
+    return map;
+}
+
+Variant::List ManagementAgent::toList(const List& from)
+{
+    Variant::List _list;
+
+    for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+        const List::ValuePtr& val(*iter);
+
+        _list.push_back(toVariant(val));
+    }
+
+    return _list;
+}
+
+qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
+{
+    qpid::framing::FieldTable ft;
+
+    for (Variant::Map::const_iterator iter = from.begin();
+         iter != from.end();
+         iter++) {
+        const string& key(iter->first);
+        const Variant& val(iter->second);
+
+        ft.set(key, toFieldValue(val));
+    }
+
+    return ft;
+}
+
+
+List ManagementAgent::fromList(const Variant::List& from)
+{
+    List fa;
+
+    for (Variant::List::const_iterator iter = from.begin();
+         iter != from.end();
+         iter++) {
+        const Variant& val(*iter);
+
+        fa.push_back(toFieldValue(val));
+    }
+
+    return fa;
+}
+
+
+boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
+{
+
+    switch(in.getType()) {
+
+    case types::VAR_VOID:   return boost::shared_ptr<FieldValue>(new VoidValue());
+    case types::VAR_BOOL:   return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
+    case types::VAR_UINT8:  return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
+    case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
+    case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
+    case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
+    case types::VAR_INT8:   return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
+    case types::VAR_INT16:  return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
+    case types::VAR_INT32:  return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
+    case types::VAR_INT64:  return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
+    case types::VAR_FLOAT:  return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
+    case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
+    case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
+    case types::VAR_UUID:   return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
+    case types::VAR_MAP:    return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
+    case types::VAR_LIST:   return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList())));
+    }
+
+    QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
+    return boost::shared_ptr<FieldValue>(new VoidValue());
+}
+
+// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
+Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
+{
+    const std::string iso885915("iso-8859-15");
+    const std::string utf8("utf8");
+    const std::string utf16("utf16");
+    //const std::string binary("binary");
+    const std::string amqp0_10_binary("amqp0-10:binary");
+    //const std::string amqp0_10_bit("amqp0-10:bit");
+    const std::string amqp0_10_datetime("amqp0-10:datetime");
+    const std::string amqp0_10_struct("amqp0-10:struct");
+    Variant out;
+
+    //based on AMQP 0-10 typecode, pick most appropriate variant type
+    switch (in->getType()) {
+        //Fixed Width types:
+    case 0x00: //bin8
+    case 0x01: out.setEncoding(amqp0_10_binary); // int8
+    case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;  //uint8
+    case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;  // 
+        // case 0x04: break; //TODO: iso-8859-15 char  // char
+    case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break;  // bool int8
+
+    case 0x10: out.setEncoding(amqp0_10_binary);  // bin16
+    case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;  // int16
+    case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;  //uint16
+
+    case 0x20: out.setEncoding(amqp0_10_binary);   // bin32
+    case 0x21: out = in->getIntegerValue<int32_t, 4>(); break;  // int32
+    case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
+
+    case 0x23: out = in->get<float>(); break;  // float(32)
+
+        // case 0x27: break; //TODO: utf-32 char
+
+    case 0x30: out.setEncoding(amqp0_10_binary); // bin64
+    case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
+
+    case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
+    case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break;  //uint64
+    case 0x33: out = in->get<double>(); break;  // double
+
+    case 0x48: // uuid
+        {
+            unsigned char data[16];
+            in->getFixedWidthValue<16>(data);
+            out = qpid::types::Uuid(data);
+        } break;
+
+        //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+    case 0xf0: break;//void, which is the default value for Variant
+        // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+        //Variable Width types:
+        //strings:
+    case 0x80: // str8
+    case 0x90: // str16
+    case 0xa0: // str32
+        out = in->get<std::string>();
+        out.setEncoding(amqp0_10_binary);
+        break;
+
+    case 0x84: // str8
+    case 0x94: // str16
+        out = in->get<std::string>();
+        out.setEncoding(iso885915);
+        break;
+
+    case 0x85: // str8
+    case 0x95: // str16
+        out = in->get<std::string>();
+        out.setEncoding(utf8);
+        break;
+
+    case 0x86: // str8
+    case 0x96: // str16
+        out = in->get<std::string>();
+        out.setEncoding(utf16);
+        break;
+
+    case 0xab:  // str32
+        out = in->get<std::string>();
+        out.setEncoding(amqp0_10_struct);
+        break;
+
+    case 0xa8:  // map
+        out = ManagementAgent::toMap(in->get<FieldTable>());
+        break;
+
+    case 0xa9: // list of variant types
+        out = ManagementAgent::toList(in->get<List>());
+        break;
+        //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
+        // out = Variant::List();
+        // translate<Array>(in, out.asList(), &toVariant);
+        // break;
+
+      default:
+          //error?
+          QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
+          break;
+    }
+
+    return out;
+}
+

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Mar 31 21:13:12 2010
@@ -32,7 +32,9 @@
 #include "qpid/management/ManagementEvent.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qpid/types/Variant.h"
 #include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/FieldValue.h>
 #include <memory>
 #include <string>
 #include <map>
@@ -62,7 +64,7 @@ public:
     } severity_t;
 
 
-    ManagementAgent ();
+    ManagementAgent (const bool qmfV1, const bool qmfV2);
     virtual ~ManagementAgent ();
 
     /** Called before plugins are initialized */
@@ -74,6 +76,9 @@ public:
     /** Called by cluster to suppress management output during update. */
     void suppress(bool s) { suppressed = s; }
 
+    void setName(const std::string& vendor,
+                 const std::string& product,
+                 const std::string& instance="");
     void setInterval(uint16_t _interval) { interval = _interval; }
     void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
                      qpid::broker::Exchange::shared_ptr directExchange);
@@ -91,6 +96,9 @@ public:
                                              ManagementObject::writeSchemaCall_t schemaCall);
     QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject* object,
                                              uint64_t          persistId = 0);
+    QPID_BROKER_EXTERN ObjectId addObject   (ManagementObject*  object,
+                                             const std::string& key,
+                                             bool               persistent = true);
     QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
                                        severity_t severity = SEV_DEFAULT);
     QPID_BROKER_EXTERN void clientAdded     (const std::string& routingKey);
@@ -99,7 +107,8 @@ public:
 
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
-                          const framing::FieldTable* args);
+                          const framing::FieldTable* args,
+                          const bool topic);
 
     const framing::Uuid& getUuid() const { return uuid; }
 
@@ -128,6 +137,15 @@ public:
     uint16_t getBootSequence(void) { return bootSequence; }
     void setBootSequence(uint16_t b) { bootSequence = b; }
 
+    // TODO: remove these when Variant API moved into common library.
+    static types::Variant::Map toMap(const framing::FieldTable& from);
+    static framing::FieldTable fromMap(const types::Variant::Map& from);
+    static types::Variant::List toList(const framing::List& from);
+    static framing::List fromList(const types::Variant::List& from);
+    static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
+    static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
+
+
 private:
     struct Periodic : public qpid::sys::TimerTask
     {
@@ -153,9 +171,8 @@ private:
         ManagementObject* GetManagementObject (void) const { return mgmtObject; }
 
         virtual ~RemoteAgent ();
-        void encode(framing::Buffer& buffer) const;
-        void decode(framing::Buffer& buffer);
-        uint32_t encodedSize() const;
+        void mapEncode(qpid::types::Variant::Map& _map) const;
+        void mapDecode(const qpid::types::Variant::Map& _map);
     };
 
     // TODO: Eventually replace string with entire reply-to structure.  reply-to
@@ -175,9 +192,11 @@ private:
         std::string name;
         uint8_t     hash[16];
 
+        void mapEncode(qpid::types::Variant::Map& _map) const;
+        void mapDecode(const qpid::types::Variant::Map& _map);
         void encode(framing::Buffer& buffer) const;
         void decode(framing::Buffer& buffer);
-        uint32_t encodedSize() const;
+        uint32_t encodedBufSize() const;
     };
 
     struct SchemaClassKeyComp
@@ -209,9 +228,8 @@ private:
         bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
         void appendSchema (framing::Buffer& buf);
 
-        void encode(framing::Buffer& buffer) const;
-        void decode(framing::Buffer& buffer);
-        uint32_t encodedSize() const;
+        void mapEncode(qpid::types::Variant::Map& _map) const;
+        void mapDecode(const qpid::types::Variant::Map& _map);
     };
 
     typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
@@ -264,6 +282,14 @@ private:
     typedef std::map<MethodName, std::string> DisallowedMethods;
     DisallowedMethods disallowed;
 
+    // Agent name and address
+    qpid::types::Variant::Map attrMap;
+    std::string       name_address;
+
+    // supported management protocol
+    bool qmf1Support;
+    bool qmf2Support;
+
 
 #   define MA_BUFFER_SIZE 65536
     char inputBuffer[MA_BUFFER_SIZE];
@@ -279,6 +305,11 @@ private:
                              uint32_t                     length,
                              qpid::broker::Exchange::shared_ptr exchange,
                              std::string                  routingKey);
+    void sendBuffer(const std::string&     data,
+                    const std::string&     cid,
+                    const qpid::types::Variant::Map& headers,
+                    qpid::broker::Exchange::shared_ptr exchange,
+                    const std::string& routingKey);
     void moveNewObjectsLH();
 
     bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -311,6 +342,10 @@ private:
     void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+    void handleGetQueryLH       (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType);
+    void handleMethodRequestLH  (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken);
+    void handleLocateRequestLH  (const std::string& body, const std::string &replyToKey, const std::string& cid);
+
 
     size_t validateSchema(framing::Buffer&, uint8_t kind);
     size_t validateTableSchema(framing::Buffer&);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Wed Mar 31 21:13:12 2010
@@ -29,13 +29,16 @@ using namespace qpid::framing;
 using namespace qpid::sys;
 
 ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
-    Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {}
+    Exchange (_name, _parent, b),
+    DirectExchange(_name, _parent, b),
+    managementAgent(0) {}
 ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
                                                    bool               _durable,
                                                    const FieldTable&  _args,
                                                    Manageable*        _parent, Broker* b) :
     Exchange (_name, _durable, _args, _parent, b), 
-    DirectExchange(_name, _durable, _args, _parent, b) {}
+    DirectExchange(_name, _durable, _args, _parent, b),
+    managementAgent(0) {}
 
 void ManagementDirectExchange::route(Deliverable&      msg,
                                      const string&     routingKey,
@@ -43,7 +46,8 @@ void ManagementDirectExchange::route(Del
 {
     bool routeIt = true;
 
-    // TODO: Intercept messages directed to the embedded agent and send them to the management agent.
+    if (managementAgent)
+        routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false /*direct*/);
 
     if (routeIt)
         DirectExchange::route(msg, routingKey, args);

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Wed Mar 31 21:13:12 2010
@@ -22,7 +22,10 @@
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Buffer.h"
 #include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
 
 #include <stdlib.h>
 
@@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t 
         ((uint64_t) (bank   & 0x0fffffff));
 }
 
-ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object)
-    : agent(0)
+// Deprecated
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object)
+    : agent(0), agentEpoch(seq)
 {
     first =
         ((uint64_t) (flags  &       0x0f)) << 60 |
         ((uint64_t) (seq    &     0x0fff)) << 48 |
-        ((uint64_t) (broker & 0x000fffff)) << 28 |
-        ((uint64_t) (bank   & 0x0fffffff));
+      ((uint64_t) (broker & 0x000fffff)) << 28;
     second = object;
 }
 
-ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object)
-    : agent(_agent)
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker)
+    : agent(0), second(0), agentEpoch(seq)
 {
     first =
+        ((uint64_t) (flags  &       0x0f)) << 60 |
+        ((uint64_t) (seq    &     0x0fff)) << 48 |
+        ((uint64_t) (broker & 0x000fffff)) << 28;
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq)
+    : agent(_agent), second(0), agentEpoch(seq)
+{
+
+    first =
         ((uint64_t) (flags &   0x0f)) << 60 |
         ((uint64_t) (seq   & 0x0fff)) << 48;
-    second = object;
 }
 
+
 ObjectId::ObjectId(std::istream& in) : agent(0)
 {
     std::string text;
@@ -75,6 +89,10 @@ void ObjectId::fromString(const std::str
 #  define atoll(X) _atoi64(X)
 #endif
 
+    // format:
+    // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id>
+    // V2: Not used
+
     std::string copy(text.c_str());
     char* cText;
     char* field[FIELDS];
@@ -99,10 +117,13 @@ void ObjectId::fromString(const std::str
     if (idx != FIELDS)
         throw Exception("Invalid ObjectId format");
 
+    agentEpoch = atoll(field[1]);
+
     first = (atoll(field[0]) << 60) +
         (atoll(field[1]) << 48) +
-        (atoll(field[2]) << 28) +
-        atoll(field[3]);
+        (atoll(field[2]) << 28);
+
+    agentName = std::string(field[3]);
     second = atoll(field[4]);
 }
 
@@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &o
     return first == otherFirst && second == other.second;
 }
 
-void ObjectId::encode(framing::Buffer& buffer) const
+// encode as V1-format binary
+void ObjectId::encode(std::string& buffer) const
 {
+    const uint32_t len = 16;
+    char _data[len];
+    qpid::framing::Buffer body(_data, len);
+
     if (agent == 0)
-        buffer.putLongLong(first);
+        body.putLongLong(first);
     else
-        buffer.putLongLong(first | agent->first);
-    buffer.putLongLong(second);
+        body.putLongLong(first | agent->first);
+    body.putLongLong(second);
+
+    body.reset();
+    body.getRawData(buffer, len);
 }
 
-void ObjectId::decode(framing::Buffer& buffer)
+// decode as V1-format binary
+void ObjectId::decode(const std::string& buffer)
 {
-    first  = buffer.getLongLong();
-    second = buffer.getLongLong();
+    const uint32_t len = 16;
+    char _data[len];
+    qpid::framing::Buffer body(_data, len);
+
+    body.checkAvailable(buffer.length());
+    body.putRawData(buffer);
+    body.reset();
+    first  = body.getLongLong();
+    second = body.getLongLong();
+    v2Key = boost::lexical_cast<std::string>(second);
 }
 
+// generate the V2 key from the index fields defined
+// in the schema.
 void ObjectId::setV2Key(const ManagementObject& object)
 {
     std::stringstream oname;
@@ -145,6 +185,42 @@ void ObjectId::setV2Key(const Management
     v2Key = oname.str();
 }
 
+// encode as V2-format map
+void ObjectId::mapEncode(types::Variant::Map& map) const
+{
+    map["_object_name"] = v2Key;
+    if (!agentName.empty())
+        map["_agent_name"] = agentName;
+    if (agentEpoch)
+        map["_agent_epoch"] = agentEpoch;
+}
+
+// decode as v2-format map
+void ObjectId::mapDecode(const types::Variant::Map& map)
+{
+    types::Variant::Map::const_iterator i;
+
+    if ((i = map.find("_object_name")) != map.end())
+        v2Key = i->second.asString();
+    else
+        throw Exception("Required _object_name field missing.");
+
+    if ((i = map.find("_agent_name")) != map.end())
+        agentName = i->second.asString();
+
+    if ((i = map.find("_agent_epoch")) != map.end())
+        agentEpoch = i->second.asInt64();
+}
+
+
+ObjectId::operator types::Variant::Map() const
+{
+    types::Variant::Map m;
+    mapEncode(m);
+    return m;
+}
+
+
 
 namespace qpid {
 namespace management {
@@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& o
     out << ((virtFirst & 0xF000000000000000LL) >> 60) <<
         "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) <<
         "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) <<
-        "-" <<  (virtFirst & 0x000000000FFFFFFFLL) <<
+        "-" << i.agentName <<
         "-" << i.second;
     return out;
 }
@@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& o
 int ManagementObject::maxThreads = 1;
 int ManagementObject::nextThreadIndex = 0;
 
-void ManagementObject::writeTimestamps (framing::Buffer& buf) const
+void ManagementObject::writeTimestamps (std::string& buf) const
 {
-    buf.putShortString (getPackageName ());
-    buf.putShortString (getClassName ());
-    buf.putBin128      (getMd5Sum ());
-    buf.putLongLong    (updateTime);
-    buf.putLongLong    (createTime);
-    buf.putLongLong    (destroyTime);
-    objectId.encode(buf);
+    char _data[4000];
+    qpid::framing::Buffer body(_data, 4000);
+
+    body.putShortString (getPackageName ());
+    body.putShortString (getClassName ());
+    body.putBin128      (getMd5Sum ());
+    body.putLongLong    (updateTime);
+    body.putLongLong    (createTime);
+    body.putLongLong    (destroyTime);
+
+    uint32_t len = body.getPosition();
+    body.reset();
+    body.getRawData(buf, len);
+
+    std::string oid;
+    objectId.encode(oid);
+    buf += oid;
 }
 
-void ManagementObject::readTimestamps (framing::Buffer& buf)
+void ManagementObject::readTimestamps (const std::string& buf)
 {
+    char _data[4000];
+    qpid::framing::Buffer body(_data, 4000);
     std::string unused;
     uint8_t unusedUuid[16];
-    ObjectId unusedObjectId;
 
-    buf.getShortString(unused);
-    buf.getShortString(unused);
-    buf.getBin128(unusedUuid);
-    updateTime = buf.getLongLong();
-    createTime = buf.getLongLong();
-    destroyTime = buf.getLongLong();
-    unusedObjectId.decode(buf);
+    body.checkAvailable(buf.length());
+    body.putRawData(buf);
+    body.reset();
+
+    body.getShortString(unused);
+    body.getShortString(unused);
+    body.getBin128(unusedUuid);
+    updateTime = body.getLongLong();
+    createTime = body.getLongLong();
+    destroyTime = body.getLongLong();
 }
 
 uint32_t ManagementObject::writeTimestampsSize() const
 {
     return 1 + getPackageName().length() +  // str8
-        1 + getClassName().length() +       // str8
-        16 +                                // bin128
-        8 +                                 // uint64
-        8 +                                 // uint64
-        8 +                                 // uint64
-        objectId.encodedSize();             // objectId
+      1 + getClassName().length() +       // str8
+      16 +                                // bin128
+      8 +                                 // uint64
+      8 +                                 // uint64
+      8 +                                 // uint64
+      objectId.encodedSize();             // objectId
 }
 
+
+void ManagementObject::writeTimestamps (types::Variant::Map& map) const
+{
+    types::Variant::Map oid, sid;
+
+    sid["_package_name"] = getPackageName();
+    sid["_class_name"] = getClassName();
+    sid["_hash"] = qpid::types::Uuid(getMd5Sum());
+    map["_schema_id"] = sid;
+
+    objectId.mapEncode(oid);
+    map["_object_id"] = oid;
+
+    map["_update_ts"] = updateTime;
+    map["_create_ts"] = createTime;
+    map["_delete_ts"] = destroyTime;
+}
+
+void ManagementObject::readTimestamps (const types::Variant::Map& map)
+{
+    types::Variant::Map::const_iterator i;
+
+    if ((i = map.find("_update_ts")) != map.end())
+        updateTime = i->second.asUint64();
+    if ((i = map.find("_create_ts")) != map.end())
+        createTime = i->second.asUint64();
+    if ((i = map.find("_delete_ts")) != map.end())
+        destroyTime = i->second.asUint64();
+}
+
+
 void ManagementObject::setReference(ObjectId) {}
 
 int ManagementObject::getThreadIndex() {
@@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() {
     }
     return thisIndex;
 }
+
+
+void ManagementObject::mapEncode(types::Variant::Map& map,
+                                 bool includeProperties,
+                                 bool includeStatistics)
+{
+    types::Variant::Map values;
+
+    writeTimestamps(map);
+
+    mapEncodeValues(values, includeProperties, includeStatistics);
+    map["_values"] = values;
+}
+
+void ManagementObject::mapDecode(const types::Variant::Map& map)
+{
+    types::Variant::Map::const_iterator i;
+
+    readTimestamps(map);
+
+    if ((i = map.find("_values")) != map.end())
+        mapDecodeValues(i->second.asMap());
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Wed Mar 31 21:13:12 2010
@@ -28,13 +28,16 @@ using namespace qpid::framing;
 using namespace qpid::sys;
 
 ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
-    Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
+    Exchange (_name, _parent, b),
+    TopicExchange(_name, _parent, b),
+    managementAgent(0) {}
 ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
                                                  bool               _durable,
                                                  const FieldTable&  _args,
                                                  Manageable*        _parent, Broker* b) :
     Exchange (_name, _durable, _args, _parent, b), 
-    TopicExchange(_name, _durable, _args, _parent, b) {}
+    TopicExchange(_name, _durable, _args, _parent, b),
+    managementAgent(0) {}
 
 void ManagementTopicExchange::route(Deliverable&      msg,
                                     const string&     routingKey,
@@ -43,12 +46,8 @@ void ManagementTopicExchange::route(Deli
     bool routeIt = true;
 
     // Intercept management agent commands
-    if (qmfVersion == 1) {
-        if ((routingKey.length() > 6 &&
-             routingKey.substr(0, 6).compare("agent.") == 0) ||
-            (routingKey == "broker"))
-            routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
-    }
+    if (managementAgent)
+        routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true /* topic */);
 
     if (routeIt)
         TopicExchange::route(msg, routingKey, args);

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Mar 31 21:13:12 2010
@@ -267,15 +267,15 @@ txjob_LDADD=$(lib_client) 
 
 check_PROGRAMS+=PollerTest
 PollerTest_SOURCES=PollerTest.cpp
-PollerTest_LDADD=$(lib_common) $(SOCKLIBS)
+PollerTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS)
 
 check_PROGRAMS+=DispatcherTest
 DispatcherTest_SOURCES=DispatcherTest.cpp
-DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS)
+DispatcherTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS)
 
 check_PROGRAMS+=datagen
 datagen_SOURCES=datagen.cpp
-datagen_LDADD=$(lib_common)
+datagen_LDADD=$(lib_common) $(lib_client)
 
 check_PROGRAMS+=qrsh_server
 qrsh_server_SOURCES=qrsh_server.cpp

Modified: qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp Wed Mar 31 21:13:12 2010
@@ -56,32 +56,34 @@ QPID_AUTO_TEST_CASE(testObjectIdSerializ
 }
 
 QPID_AUTO_TEST_CASE(testObjectIdEncode) {
-    char buffer[100];
-    Buffer msgBuf(buffer, 100);
-    msgBuf.putLongLong(0x1002000030000004LL);
-    msgBuf.putLongLong(0x0000000000000005LL);
-    msgBuf.reset();
+    qpid::types::Variant::Map oidMap;
 
-    ObjectId oid(msgBuf);
+    ObjectId oid(1, 2, 3, 9999);
+    oid.setV2Key("testkey");
+    oid.setAgentName("myAgent");
 
     std::stringstream out1;
     out1 << oid;
 
-    BOOST_CHECK_EQUAL(out1.str(), "1-2-3-4-5");
+    BOOST_CHECK_EQUAL(out1.str(), "1-2-3-myAgent-9999");
 }
 
 QPID_AUTO_TEST_CASE(testObjectIdAttach) {
     AgentAttachment   agent;
-    ObjectId          oid(&agent, 10, 20, 50);
+    ObjectId          oid(&agent, 10, 20);
+    oid.setV2Key("GabbaGabbaHey");
+    oid.setAgentName("MrSmith");
 
     std::stringstream out1;
     out1 << oid;
-    BOOST_CHECK_EQUAL(out1.str(), "10-20-0-0-50");
+
+    BOOST_CHECK_EQUAL(out1.str(), "10-20-0-MrSmith-0");
 
     agent.setBanks(30, 40);
     std::stringstream out2;
     out2 << oid;
-    BOOST_CHECK_EQUAL(out2.str(), "10-20-30-40-50");
+
+    BOOST_CHECK_EQUAL(out2.str(), "10-20-30-MrSmith-0");
 }
 
 QPID_AUTO_TEST_CASE(testConsoleObjectId) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org