You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/17 15:36:35 UTC

svn commit: r924309 - in /qpid/branches/qmf-devel0.7a/qpid/cpp: include/qpid/agent/ManagementAgent.h src/qpid/agent/ManagementAgentImpl.cpp src/qpid/agent/ManagementAgentImpl.h src/qpid/management/ManagementAgent.cpp src/qpid/management/ManagementAgent.h

Author: kgiusti
Date: Wed Mar 17 14:36:35 2010
New Revision: 924309

URL: http://svn.apache.org/viewvc?rev=924309&view=rev
Log:
checkpoint

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h?rev=924309&r1=924308&r2=924309&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h Wed Mar 17 14:36:35 2010
@@ -54,9 +54,10 @@ class ManagementAgent
 
     class Name {
     public:
-        QMF_AGENT_EXTERN Name(std::string vendor,
-                              std::string product,
-                              std::string name);
+        QMF_AGENT_EXTERN Name(const std::string &vendor,
+                              const std::string &product,
+                              const std::string &name);
+        QMF_AGENT_EXTERN Name(const std::string &fullName);
         QMF_AGENT_EXTERN Name();
         QMF_AGENT_EXTERN operator std::string() const;
 
@@ -118,6 +119,9 @@ class ManagementAgent
                       bool useExternalThread = false,
                       const std::string& storeFile = "") = 0;
 
+    // Extract the unique name for this agent
+    virtual const Name& getName() = 0;
+
     // Register a schema with the management agent.  This is normally called by the
     // package initializer generated by the management code generator.
     //

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=924309&r1=924308&r2=924309&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Mar 17 14:36:35 2010
@@ -23,6 +23,8 @@
 #include "qpid/log/Statement.h"
 #include "qpid/agent/ManagementAgentImpl.h"
 #include "qpid/messaging/Message.h"
+#include "qpid/messaging/ListContent.h"
+#include "qpid/messaging/MapContent.h"
 #include <list>
 #include <string.h>
 #include <stdlib.h>
@@ -198,7 +200,6 @@ void ManagementAgentImpl::raiseEvent(con
 {
     Mutex::ScopedLock lock(agentLock);
     Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
     uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
     stringstream key;
 
@@ -210,8 +211,7 @@ void ManagementAgentImpl::raiseEvent(con
     ::qpid::messaging::VariantMap &map_ = content.asMap();
     ::qpid::messaging::VariantMap schemaId;
     ::qpid::messaging::VariantMap values;
-
-    mapEncodeHeader(map_, 'e');
+    ::qpid::messaging::VariantMap headers;
 
     map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
                                            event.getEventName(),
@@ -221,8 +221,15 @@ void ManagementAgentImpl::raiseEvent(con
     map_["_timestamp"] = uint64_t(Duration(now()));
     map_["_severity"] = sev;
 
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_data_indication";
+    headers["qmf.content"] = "_event";
+    headers["qmf.agent"] = std::string(agentName);
+
     content.encode();
-    connThreadBody.sendBuffer(msg.getContent(), "qpid.management", key.str());
+    connThreadBody.sendBuffer(msg.getContent(), 0,
+                              headers,
+                              "qpid.management", key.str());
 }
 
 uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -416,12 +423,13 @@ void ManagementAgentImpl::handleSchemaRe
         ClassMap& cMap = pIter->second;
         ClassMap::iterator cIter = cMap.find(key);
         if (cIter != cMap.end()) {
-            SchemaClass& schema = cIter->second;
+            //SchemaClass& schema = cIter->second;
             Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
             encodeHeader(outBuffer, 's', sequence);
-            schema.writeSchemaCall(outBuffer);
+            //schema.writeSchemaCall(outBuffer);
+            assert(false); // TODO FIX ABOVE
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
             connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
@@ -448,7 +456,9 @@ void ManagementAgentImpl::invokeMethodRe
     Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
-    ObjectId objId(inBuffer);
+    assert(false);  // TODO FIX OBJ ID!!
+    //ObjectId objId(inBuffer);
+    ObjectId objId(std::string("foobag?"));
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(className);
     inBuffer.getBin128(hash);
@@ -469,7 +479,8 @@ void ManagementAgentImpl::invokeMethodRe
         else
             try {
                 outBuffer.record();
-                iter->second->doMethod(methodName, inBuffer, outBuffer);
+                //iter->second->doMethod(methodName, inBuffer, outBuffer);
+                assert(false); // TODO: fix above
             } catch(exception& e) {
                 outBuffer.restore();
                 outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -508,17 +519,22 @@ void ManagementAgentImpl::handleGetQuery
             ::qpid::messaging::Variant::List &list_ = content.asList();
             ::qpid::messaging::Variant::Map  map_;
             ::qpid::messaging::Variant::Map values;
+            ::qpid::messaging::Variant::Map headers;
 
             if (object->getConfigChanged() || object->getInstChanged())
                 object->setUpdateTime();
 
-            mapEncodeHeader(map_, 'g', sequence);
             object->mapEncodeValues(values, true, true); // write both stats and properties
             map_["_values"] = values;
-            list.push_back(map_);
+            list_.push_back(map_);
+
+            headers["method"] = "response";
+            headers["qmf.opcode"] = "_query_response";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = std::string(agentName);
 
             content.encode();
-            connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo);
+            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
 
             QPID_LOG(trace, "SENT ObjectInd");
         }
@@ -538,17 +554,22 @@ void ManagementAgentImpl::handleGetQuery
             ::qpid::messaging::Variant::List &list_ = content.asList();
             ::qpid::messaging::Variant::Map  map_;
             ::qpid::messaging::Variant::Map values;
+            ::qpid::messaging::Variant::Map headers;
 
             if (object->getConfigChanged() || object->getInstChanged())
                 object->setUpdateTime();
 
-            mapEncodeHeader(map_, 'g', sequence);
             object->mapEncodeValues(values, true, true); // write both stats and properties
             map_["_values"] = values;
-            list.push_back(map_);
+            list_.push_back(map_);
+
+            headers["method"] = "response";
+            headers["qmf.opcode"] = "_query_response";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = std::string(agentName);
 
             content.encode();
-            connThreadBody.sendBuffer(m.getContent(), "amq.direct", replyTo);
+            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
 
             QPID_LOG(trace, "SENT ObjectInd");
         }
@@ -623,22 +644,6 @@ void ManagementAgentImpl::encodeHeader(B
     buf.putLong (seq);
 }
 
-void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq)
-{
-    map_["_version"] = "AM2";
-    map_["_opcode"] = opcode;
-    map_["_sequence"] = seq;
-}
-
-
-void ManagementAgentImpl::mapEncodeHeader(::qpid::messaging::VariantMap &map_, uint8_t opcode, uint32_t seq)
-{
-    map_["_version"] = "AM2";
-    map_["_opcode"] = opcode;
-    map_["_sequence"] = seq;
-}
-
-
 qpid::messaging::Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const std::string& pname,
                                                                      const std::string& cname,
                                                                      const uint8_t *md5Sum)
@@ -648,7 +653,7 @@ qpid::messaging::Variant::Map Management
     map_["_package_name"] = pname;
     map_["_class_name"] = cname;
     map_["_hash_str"] = std::string((const char *)md5Sum,
-                                    qpid::managment::ManagmentObject::MD5_LEN);
+                                    qpid::management::ManagementObject::MD5_LEN);
     return map_;
 }
 
@@ -797,32 +802,25 @@ void ManagementAgentImpl::periodicProces
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second;
+            bool send_stats, send_props;
             if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
                 object->setFlags(1);
                 if (object->getConfigChanged() || object->getInstChanged())
                     object->setUpdateTime();
 
-                if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
-                    ::qpid::messaging::Variant::Map  map_;
-                    ::qpid::messaging::Variant::Map values;
-                    mapEncodeHeader(map_, 'c');
+                send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
 
-                    object->getPackageName();
-                    object->getClassName();
-                    (object->getMd5Sum(), MD5_LEN);
-                    
-                    object->mapEncodeValues(values, true, false);  // encode properties only
-                    map_["_values"] = values;
-                    list.push_back(map_);
-                }
-
-                if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+                if (send_stats || send_props) {
                     ::qpid::messaging::Variant::Map  map_;
                     ::qpid::messaging::Variant::Map values;
-                    mapEncodeHeader(map_, 'i');
-                    object->mapEncodeValues(values, false, true);  // encode statistics only
+
+                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                           object->getClassName(),
+                                                           object->getMd5Sum());
+                    object->mapEncodeValues(values, send_props, send_stats);
                     map_["_values"] = values;
-                    list.push_back(map_);
+                    list_.push_back(map_);
                 }
 
                 if (object->isDeleted())
@@ -835,9 +833,15 @@ void ManagementAgentImpl::periodicProces
         const std::string &str = m.getContent();
         if (str.length()) {
             stringstream key;
+            ::qpid::messaging::Variant::Map  headers;
             key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
                 baseObject->getPackageName() << "." << baseObject->getClassName();
-            connThreadBody.sendBuffer(str, "qpid.management", key.str());
+            headers["method"] = "indication";
+            headers["qmf.opcode"] = "_data_indication";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = std::string(agentName);
+
+            connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str());
         }
     }
 
@@ -951,18 +955,47 @@ void ManagementAgentImpl::ConnectionThre
                                                        const string& exchange,
                                                        const string& routingKey)
 {
+    Message msg;
     string  data;
 
     buf.getRawData(data, length);
-    sendBuffer(data, exchange, routingKey);
+    msg.setData(data);
+    sendMessage(msg, exchange, routingKey);
 }
 
 
 
 void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+                                                       uint32_t sequence,
+                                                       const qpid::messaging::VariantMap headers,
                                                        const string& exchange,
                                                        const string& routingKey)
 {
+    Message msg;
+    qpid::messaging::VariantMap::const_iterator i;
+
+    if (sequence) {
+        std::stringstream seqstr;
+        seqstr << sequence;
+        msg.getMessageProperties().setCorrelationId(seqstr.str());
+    }
+    for (i = headers.begin(); i != headers.end(); ++i) {
+        msg.getHeaders().setString(i->first, i->second.asString());
+    }
+    msg.getHeaders().setString("app_id", "qmf2");
+
+    msg.setData(data);
+    sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+                                                        const string& exchange,
+                                                        const string& routingKey)
+{
     ConnectionThread::shared_ptr s;
     {
         Mutex::ScopedLock _lock(connLock);
@@ -971,15 +1004,12 @@ void ManagementAgentImpl::ConnectionThre
         s = subscriptions;
     }
 
-    Message msg;
-
     msg.getDeliveryProperties().setRoutingKey(routingKey);
     msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
-    msg.setData(data);
     try {
         session.messageTransfer(arg::content=msg, arg::destination=exchange);
     } catch(exception& e) {
-        QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+        QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
         // Bounce the connection
         if (s)
             s->stop();

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=924309&r1=924308&r2=924309&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Mar 17 14:36:35 2010
@@ -64,6 +64,7 @@ class ManagementAgentImpl : public Manag
               uint16_t intervalSeconds = 10,
               bool useExternalThread = false,
               const std::string& storeFile = "");
+    const Name& getName();
     bool isConnected() { return connected; }
     std::string& getLastFailure() { return lastFailure; }
     void registerClass(const std::string& packageName,
@@ -200,8 +201,13 @@ class ManagementAgentImpl : public Manag
                         const std::string&     exchange,
                         const std::string&     routingKey);
         void sendBuffer(const std::string&     data,
+                        const uint32_t sequence,
+                        const qpid::messaging::VariantMap headers,
                         const std::string&     exchange,
                         const std::string&     routingKey);
+        void sendMessage(qpid::client::Message msg,
+                         const std::string&     exchange,
+                         const std::string&     routingKey);
         void bindToBank(uint32_t brokerBank, uint32_t agentBank);
         void close();
         bool isSleeping() const;
@@ -225,6 +231,8 @@ class ManagementAgentImpl : public Manag
 
     static const std::string storeMagicNumber;
 
+    Name agentName;
+
     void startProtocol();
     void storeData(bool requested=false);
     void retrieveData();
@@ -241,7 +249,6 @@ class ManagementAgentImpl : public Manag
                                 PackageMap::iterator   pIter,
                                 ClassMap::iterator     cIter);
     void encodeHeader (framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
-    void mapEncodeHeader (::qpid::messaging::VariantMap& map_, uint8_t  opcode, uint32_t  seq = 0);
     qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname,
                                                     const std::string& cname,
                                                     const uint8_t *md5Sum);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=924309&r1=924308&r2=924309&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Mar 17 14:36:35 2010
@@ -31,6 +31,8 @@
 #include "qpid/broker/AclModule.h"
 #include "qpid/messaging/Variant.h"
 #include "qpid/messaging/Uuid.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/ListContent.h"
 #include <list>
 #include <iostream>
 #include <fstream>
@@ -47,6 +49,22 @@ using namespace qpid::sys;
 using namespace std;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+
+
+static qpid::messaging::Variant::Map mapEncodeSchemaId(const std::string& pname,
+                                                       const std::string& cname,
+                                                       const uint8_t *md5Sum)
+{
+    qpid::messaging::Variant::Map map_;
+
+    map_["_package_name"] = pname;
+    map_["_class_name"] = cname;
+    map_["_hash_str"] = std::string((const char *)md5Sum,
+                                    qpid::management::ManagementObject::MD5_LEN);
+    return map_;
+}
+
+
 ManagementAgent::RemoteAgent::~RemoteAgent ()
 {
     QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
@@ -59,7 +77,7 @@ ManagementAgent::RemoteAgent::~RemoteAge
 ManagementAgent::ManagementAgent () :
     threadPoolSize(1), interval(10), broker(0), timer(0),
     startTime(uint64_t(Duration(now()))),
-    suppressed(false)
+    suppressed(false), agentName("")
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -223,17 +241,29 @@ ObjectId ManagementAgent::addObject(Mana
     newManagementObjects[objId] = object;
 
     if (publishNow) {
-#define IMM_BUFSIZE 65536
-        char rawBuf[IMM_BUFSIZE];
-        Buffer msgBuffer(rawBuf, IMM_BUFSIZE);
-
-        encodeHeader(msgBuffer, 'c');
-        object->writeProperties(msgBuffer);
-        uint32_t contentSize = msgBuffer.getPosition();
+        ::qpid::messaging::Message m;
+        ::qpid::messaging::ListContent content(m);
+        ::qpid::messaging::Variant::List &list_ = content.asList();
+        ::qpid::messaging::Variant::Map  map_;
+        ::qpid::messaging::Variant::Map values;
+        ::qpid::messaging::Variant::Map  headers;
+
+        map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                               object->getClassName(),
+                                               object->getMd5Sum());
+        object->mapEncodeValues(values, true, false);  // send props only
+        map_["_values"] = values;
+        list_.push_back(map_);
+
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_data_indication";
+        headers["qmf.content"] = "_data";
+        headers["qmf.agent"] = std::string(agentName);
+
+        content.encode();
         stringstream key;
         key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
-        msgBuffer.reset();
-        sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+        sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
         QPID_LOG(trace, "SEND Immediate ContentInd to=" << key.str());
     }
 
@@ -243,20 +273,31 @@ ObjectId ManagementAgent::addObject(Mana
 void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
 {
     Mutex::ScopedLock lock (userLock);
-    Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
     uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
 
-    encodeHeader(outBuffer, 'e');
-    outBuffer.putShortString(event.getPackageName());
-    outBuffer.putShortString(event.getEventName());
-    outBuffer.putBin128(event.getMd5Sum());
-    outBuffer.putLongLong(uint64_t(Duration(now())));
-    outBuffer.putOctet(sev);
-    event.encode(outBuffer);
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    sendBuffer(outBuffer, outLen, mExchange,
+    ::qpid::messaging::Message msg;
+    ::qpid::messaging::MapContent content(msg);
+    ::qpid::messaging::VariantMap &map_ = content.asMap();
+    ::qpid::messaging::VariantMap schemaId;
+    ::qpid::messaging::VariantMap values;
+    ::qpid::messaging::VariantMap headers;
+
+    map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+                                           event.getEventName(),
+                                           event.getMd5Sum());
+    event.mapEncode(values);
+    map_["_values"] = values;
+    map_["_timestamp"] = uint64_t(Duration(now()));
+    map_["_severity"] = sev;
+
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_data_indication";
+    headers["qmf.content"] = "_event";
+    headers["qmf.agent"] = std::string(agentName);
+
+    content.encode();
+
+    sendBuffer(msg.getContent(), 0, headers, mExchange,
                "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
 }
 
@@ -365,6 +406,61 @@ void ManagementAgent::sendBuffer(Buffer&
     } catch(exception&) {}
 }
 
+
+void ManagementAgent::sendBuffer(const std::string& data,
+                                 const uint32_t sequence,
+                                 const qpid::messaging::VariantMap headers,
+                                 qpid::broker::Exchange::shared_ptr exchange,
+                                 string   routingKey)
+{
+    qpid::messaging::VariantMap::const_iterator i;
+
+    if (suppressed) {
+        QPID_LOG(trace, "Suppressed management message to " << routingKey);
+        return;
+    }
+    if (exchange.get() == 0) return;
+
+    intrusive_ptr<Message> msg(new Message());
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+    AMQFrame header((AMQHeaderBody()));
+    AMQFrame content((AMQContentBody(data)));
+
+    method.setEof(false);
+    header.setBof(false);
+    header.setEof(false);
+    content.setBof(false);
+
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+
+    MessageProperties* props =
+        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setContentLength(data.length());
+    if (sequence) {
+        std::stringstream seqstr;
+        seqstr << sequence;
+        props->setCorrelationId(seqstr.str());
+    }
+
+    for (i = headers.begin(); i != headers.end(); ++i) {
+        msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+    }
+    msg->getOrInsertHeaders().setString("app_id", "qmf2");
+
+    DeliveryProperties* dp =
+        msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+    dp->setRoutingKey(routingKey);
+
+    msg->getFrames().append(content);
+
+    DeliverableMessage deliverable (msg);
+    try {
+        exchange->route(deliverable, routingKey, 0);
+    } catch(exception&) {}
+}
+
+
 void ManagementAgent::moveNewObjectsLH()
 {
     Mutex::ScopedLock lock (addLock);
@@ -399,12 +495,8 @@ void ManagementAgent::moveNewObjectsLH()
 
 void ManagementAgent::periodicProcessing (void)
 {
-#define BUFSIZE   65536
-#define HEADROOM  4096
-    QPID_LOG(trace, "Management agent periodic processing")
-        Mutex::ScopedLock lock (userLock);
-    char                msgChars[BUFSIZE];
-    uint32_t            contentSize;
+    QPID_LOG(trace, "Management agent periodic processing");
+    Mutex::ScopedLock lock (userLock);
     string              routingKey;
     list<pair<ObjectId, ManagementObject*> > deleteList;
 
@@ -448,43 +540,57 @@ void ManagementAgent::periodicProcessing
              !baseObject->isDeleted()))
             continue;
 
-        Buffer msgBuffer(msgChars, BUFSIZE);
+        ::qpid::messaging::Message m;
+        ::qpid::messaging::ListContent content(m);
+        ::qpid::messaging::Variant::List &list_ = content.asList();
+
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second;
+            bool send_stats, send_props;
             if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
                 object->setFlags(1);
                 if (object->getConfigChanged() || object->getInstChanged())
                     object->setUpdateTime();
 
-                if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
-                    encodeHeader(msgBuffer, 'c');
-                    object->writeProperties(msgBuffer);
-                    pcount++;
-                }
-        
-                if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
-                    encodeHeader(msgBuffer, 'i');
-                    object->writeStatistics(msgBuffer);
-                    scount++;
+                send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+                if (send_stats || send_props) {
+                    ::qpid::messaging::Variant::Map  map_;
+                    ::qpid::messaging::Variant::Map values;
+
+                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                           object->getClassName(),
+                                                           object->getMd5Sum());
+                    object->mapEncodeValues(values, send_props, send_stats);
+                    map_["_values"] = values;
+                    list_.push_back(map_);
+
+                    if (send_props) pcount++;
+                    if (send_stats) scount++;
                 }
 
                 if (object->isDeleted())
                     deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
                 object->setForcePublish(false);
-
-                if (msgBuffer.available() < HEADROOM)
-                    break;
             }
         }
 
-        contentSize = BUFSIZE - msgBuffer.available();
-        if (contentSize > 0) {
-            msgBuffer.reset();
+        content.encode();
+        const std::string &str = m.getContent();
+        if (str.length()) {
             stringstream key;
+            ::qpid::messaging::Variant::Map  headers;
             key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
-            sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+
+            headers["method"] = "indication";
+            headers["qmf.opcode"] = "_data_indication";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = std::string(agentName);
+
+            sendBuffer(str, 0, headers, mExchange, key.str());
             QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
         }
     }
@@ -502,15 +608,33 @@ void ManagementAgent::periodicProcessing
     for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
          cdIter != deletedManagementObjects.end(); cdIter++) {
         collisionDeletions = true;
-        Buffer msgBuffer(msgChars, BUFSIZE);
-        encodeHeader(msgBuffer, 'c');
-        (*cdIter)->writeProperties(msgBuffer);
-        contentSize = BUFSIZE - msgBuffer.available ();
-        msgBuffer.reset ();
-        stringstream key;
-        key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
-        sendBuffer (msgBuffer, contentSize, mExchange, key.str());
-        QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+        {
+            ::qpid::messaging::Message m;
+            ::qpid::messaging::ListContent content(m);
+            ::qpid::messaging::Variant::List &list_ = content.asList();
+            ::qpid::messaging::Variant::Map  map_;
+            ::qpid::messaging::Variant::Map values;
+            ::qpid::messaging::Variant::Map  headers;
+
+            map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
+                                                   (*cdIter)->getClassName(),
+                                                   (*cdIter)->getMd5Sum());
+            (*cdIter)->mapEncodeValues(values, true, false);
+            map_["_values"] = values;
+            list_.push_back(map_);
+
+            headers["method"] = "indication";
+            headers["qmf.opcode"] = "_data_indication";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = std::string(agentName);
+
+            content.encode();
+
+            stringstream key;
+            key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+            sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
+            QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+        }
     }
 
     if (!deleteList.empty() || collisionDeletions) {
@@ -519,6 +643,9 @@ void ManagementAgent::periodicProcessing
     }
 
     {
+#define BUFSIZE   65536
+        uint32_t            contentSize;
+        char                msgChars[BUFSIZE];
         Buffer msgBuffer(msgChars, BUFSIZE);
         encodeHeader(msgBuffer, 'h');
         msgBuffer.putLongLong(uint64_t(Duration(now())));
@@ -541,18 +668,31 @@ void ManagementAgent::deleteObjectNowLH(
     if (!object->isDeleted())
         return;
 
-#define DNOW_BUFSIZE 2048
-    char     msgChars[DNOW_BUFSIZE];
-    uint32_t contentSize;
-    Buffer   msgBuffer(msgChars, DNOW_BUFSIZE);
-
-    encodeHeader(msgBuffer, 'c');
-    object->writeProperties(msgBuffer);
-    contentSize = msgBuffer.getPosition();
-    msgBuffer.reset();
+    ::qpid::messaging::Message m;
+    ::qpid::messaging::ListContent content(m);
+    ::qpid::messaging::Variant::List &list_ = content.asList();
+    ::qpid::messaging::Variant::Map  map_;
+    ::qpid::messaging::Variant::Map values;
+
+    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                           object->getClassName(),
+                                           object->getMd5Sum());
+    object->mapEncodeValues(values, true, false);
+    map_["_values"] = values;
+    list_.push_back(map_);
+
     stringstream key;
     key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
-    sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+
+    content.encode();
+
+    ::qpid::messaging::Variant::Map  headers;
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_data_indication";
+    headers["qmf.content"] = "_data";
+    headers["qmf.agent"] = std::string(agentName);
+
+    sendBuffer(m.getContent(), 0, headers, mExchange, key.str());
     QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
 
     managementObjects.erase(oid);
@@ -621,7 +761,9 @@ void ManagementAgent::handleMethodReques
     uint32_t outLen;
     AclModule* acl = broker->getAcl();
 
-    ObjectId objId(inBuffer);
+    //ObjectId objId(inBuffer);
+    assert(false); // KAG TODO FIXME
+    ObjectId objId(std::string("fleabag???"));
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(className);
     inBuffer.getBin128(hash);
@@ -674,7 +816,8 @@ void ManagementAgent::handleMethodReques
             try {
                 outBuffer.record();
                 Mutex::ScopedUnlock u(userLock);
-                iter->second->doMethod(methodName, inBuffer, outBuffer);
+                //iter->second->doMethod(methodName, inBuffer, outBuffer);
+                assert(false); // KAG TODO FIX
             } catch(exception& e) {
                 outBuffer.restore();
                 outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -814,7 +957,8 @@ void ManagementAgent::SchemaClass::appen
     // is from a remote management agent, send the stored schema information.
 
     if (writeSchemaCall != 0)
-        writeSchemaCall(buf);
+        //writeSchemaCall(buf);
+        assert(false); // KAG TODO FIX
     else
         buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
 }
@@ -991,7 +1135,7 @@ void ManagementAgent::handleAttachReques
     agent->mgmtObject->set_connectionRef(agent->connectionRef);
     agent->mgmtObject->set_label        (label);
     agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
-    agent->mgmtObject->set_systemId     (systemId);
+    agent->mgmtObject->set_systemId     ((const unsigned char*)systemId.data());
     agent->mgmtObject->set_brokerBank   (brokerBank);
     agent->mgmtObject->set_agentBank    (assignedBank);
     addObject (agent->mgmtObject, 0, true);
@@ -1034,19 +1178,29 @@ void ManagementAgent::handleGetQueryLH (
         ManagementObjectMap::iterator iter = numericFind(selector);
         if (iter != managementObjects.end()) {
             ManagementObject* object = iter->second;
-            Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
+            ::qpid::messaging::Message m;
+            ::qpid::messaging::ListContent content(m);
+            ::qpid::messaging::Variant::List &list_ = content.asList();
+            ::qpid::messaging::Variant::Map  map_;
+            ::qpid::messaging::Variant::Map values;
+            ::qpid::messaging::Variant::Map headers;
 
             if (object->getConfigChanged() || object->getInstChanged())
                 object->setUpdateTime();
 
             if (!object->isDeleted()) {
-                encodeHeader(outBuffer, 'g', sequence);
-                object->writeProperties(outBuffer);
-                object->writeStatistics(outBuffer, true);
-                outLen = MA_BUFFER_SIZE - outBuffer.available ();
-                outBuffer.reset ();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                object->mapEncodeValues(values, true, true); // write both stats and properties
+                map_["_values"] = values;
+                list_.push_back(map_);
+
+                headers["method"] = "response";
+                headers["qmf.opcode"] = "_query_response";
+                headers["qmf.content"] = "_data";
+                headers["qmf.agent"] = std::string(agentName);
+
+                content.encode();
+
+                sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey);
                 QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
             }
         }
@@ -1061,19 +1215,29 @@ void ManagementAgent::handleGetQueryLH (
          iter++) {
         ManagementObject* object = iter->second;
         if (object->getClassName () == className) {
-            Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
+            ::qpid::messaging::Message m;
+            ::qpid::messaging::ListContent content(m);
+            ::qpid::messaging::Variant::List &list_ = content.asList();
+            ::qpid::messaging::Variant::Map  map_;
+            ::qpid::messaging::Variant::Map values;
+            ::qpid::messaging::Variant::Map headers;
 
             if (object->getConfigChanged() || object->getInstChanged())
                 object->setUpdateTime();
 
             if (!object->isDeleted()) {
-                encodeHeader(outBuffer, 'g', sequence);
-                object->writeProperties(outBuffer);
-                object->writeStatistics(outBuffer, true);
-                outLen = MA_BUFFER_SIZE - outBuffer.available ();
-                outBuffer.reset ();
-                sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+                object->mapEncodeValues(values, true, true); // write both stats and properties
+                map_["_values"] = values;
+                list_.push_back(map_);
+
+                headers["method"] = "response";
+                headers["qmf.opcode"] = "_query_response";
+                headers["qmf.content"] = "_data";
+                headers["qmf.agent"] = std::string(agentName);
+
+                content.encode();
+
+                sendBuffer(m.getContent(), sequence, headers, dExchange, replyToKey);
                 QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
             }
         }
@@ -1096,7 +1260,9 @@ bool ManagementAgent::authorizeAgentMess
     inBuffer.reset();
 
     if (!checkHeader(inBuffer, &opcode, &sequence))
-        return false;
+        // KAG TODO: handle new map style messages also!
+        //return false;
+        assert(false);
 
     if (opcode == 'M') {
         // TODO: check method call against ACL list.
@@ -1111,7 +1277,9 @@ bool ManagementAgent::authorizeAgentMess
         string  methodName;
 
         map<acl::Property, string> params;
-        ObjectId objId(inBuffer);
+        //ObjectId objId(inBuffer);
+        inBuffer.getLongLong();
+        inBuffer.getLongLong();
         inBuffer.getShortString(packageName);
         inBuffer.getShortString(className);
         inBuffer.getBin128(hash);
@@ -1132,6 +1300,7 @@ bool ManagementAgent::authorizeAgentMess
             Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
+            // KAG TODO: old-style response
             encodeHeader(outBuffer, 'm', sequence);
             outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
             outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
@@ -1173,6 +1342,7 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
+    // KAG TODO: need to handle map style method requests
     while (inBuffer.getPosition() < bufferLen) {
         if (!checkHeader(inBuffer, &opcode, &sequence))
             return;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=924309&r1=924308&r2=924309&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Mar 17 14:36:35 2010
@@ -256,6 +256,7 @@ private:
     typedef std::pair<std::string,std::string> MethodName;
     typedef std::map<MethodName, std::string> DisallowedMethods;
     DisallowedMethods disallowed;
+    std::string agentName;  // KAG TODO FIX
 
 
 #   define MA_BUFFER_SIZE 65536
@@ -272,6 +273,11 @@ private:
                              uint32_t                     length,
                              qpid::broker::Exchange::shared_ptr exchange,
                              std::string                  routingKey);
+    void sendBuffer(const std::string&     data,
+                    const uint32_t sequence,
+                    const qpid::messaging::VariantMap headers,
+                    qpid::broker::Exchange::shared_ptr exchange,
+                    std::string routingKey);
     void moveNewObjectsLH();
 
     bool authorizeAgentMessageLH(qpid::broker::Message& msg);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org