You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/29 17:48:34 UTC

svn commit: r928808 - in /qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid: broker/Broker.cpp management/ManagementAgent.cpp management/ManagementAgent.h

Author: kgiusti
Date: Mon Mar 29 15:48:33 2010
New Revision: 928808

URL: http://svn.apache.org/viewvc?rev=928808&view=rev
Log:
broker agent qmf2 heartbeat and naming support

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp?rev=928808&r1=928807&r2=928808&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/broker/Broker.cpp Mon Mar 29 15:48:33 2010
@@ -168,6 +168,7 @@ Broker::Broker(const Broker::Options& co
         QPID_LOG(info, "Management enabled");
         managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
                                    conf.mgmtPubInterval, this, conf.workerThreads + 3);
+        managementAgent->setName("apache.org", "qpidd");
         _qmf::Package packageInitializer(managementAgent.get());
 
         System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=928808&r1=928807&r2=928808&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Mar 29 15:48:33 2010
@@ -81,7 +81,7 @@ ManagementAgent::RemoteAgent::~RemoteAge
 ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
     threadPoolSize(1), interval(10), broker(0), timer(0),
     startTime(uint64_t(Duration(now()))),
-    suppressed(false), agentName(""),
+    suppressed(false),
     qmf1Support(qmfV1), qmf2Support(qmfV2)
 {
     nextObjectId   = 1;
@@ -175,6 +175,27 @@ void ManagementAgent::pluginsInitialized
     timer->add(new Periodic(*this, interval));
 }
 
+
+void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)  // builds name_address as "vendor:product:instance" and records the parts in attrMap
+{
+    attrMap["_vendor"] = vendor;
+    attrMap["_product"] = product;
+    string inst;
+    if (instance.empty()) {  // no explicit instance given: fall back to the agent's uuid string
+        if (uuid.isNull())
+        {
+            throw Exception("ManagementAgent::configure() must be called if default name is used.");  // NOTE(review): presumably configure() is what populates uuid - confirm
+        }
+        inst = uuid.str();
+    } else
+        inst = instance;
+
+   name_address = vendor + ":" + product + ":" + inst;  // same value is sent as the "qmf.agent" header on outgoing QMFv2 messages
+   attrMap["_instance"] = inst;
+   attrMap["_name"] = name_address;  // attrMap is later copied into the "_values" map of heartbeat and locate-response bodies
+}
+
+
 void ManagementAgent::writeData ()
 {
     string   filename (dataDir + "/.mbrokerdata");
@@ -342,10 +363,10 @@ void ManagementAgent::raiseEvent(const M
         headers["method"] = "indication";
         headers["qmf.opcode"] = "_data_indication";
         headers["qmf.content"] = "_event";
-        headers["qmf.agent"] = std::string(agentName);
+        headers["qmf.agent"] = name_address;
 
         stringstream key;
-        key << "agent.ind.event." << sev << "." << std::string(agentName) << "." << event.getEventName();
+        key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName();
 
         content.encode();
         sendBuffer(msg.getContent(), "", headers, v2Topic, key.str());
@@ -464,7 +485,7 @@ void ManagementAgent::sendBuffer(const s
                                  const std::string& cid,
                                  const qpid::messaging::VariantMap& headers,
                                  qpid::broker::Exchange::shared_ptr exchange,
-                                 string   routingKey)
+                                 const std::string& routingKey)
 {
     qpid::messaging::VariantMap::const_iterator i;
 
@@ -677,7 +698,7 @@ void ManagementAgent::periodicProcessing
                     headers["method"] = "indication";
                     headers["qmf.opcode"] = "_data_indication";
                     headers["qmf.content"] = "_data";
-                    headers["qmf.agent"] = std::string(agentName);
+                    headers["qmf.agent"] = name_address;
 
                     sendBuffer(body, "", headers, v2Topic, key.str());
                     QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
@@ -733,7 +754,7 @@ void ManagementAgent::periodicProcessing
                 headers["method"] = "indication";
                 headers["qmf.opcode"] = "_data_indication";
                 headers["qmf.content"] = "_data";
-                headers["qmf.agent"] = std::string(agentName);
+                headers["qmf.agent"] = name_address;
 
                 stringstream key;
                 key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
@@ -750,7 +771,9 @@ void ManagementAgent::periodicProcessing
         deleteOrphanedAgentsLH();
     }
 
-    {
+    // heartbeat generation
+
+    if (qmf1Support) {
 #define BUFSIZE   65536
         uint32_t            contentSize;
         char                msgChars[BUFSIZE];
@@ -764,6 +787,27 @@ void ManagementAgent::periodicProcessing
         sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
         QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
     }
+
+    if (qmf2Support) {
+        static const string addr_key("agent.ind.heartbeat");
+
+        messaging::Message msg;
+        messaging::MapContent content(msg);
+        messaging::Variant::Map& map(content.asMap());
+        messaging::Variant::Map headers;
+
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_agent_heartbeat_indication";
+        headers["qmf.agent"] = name_address;
+
+        map["_values"] = attrMap;
+        map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+        map["_values"].asMap()["heartbeat_interval"] = interval;
+        content.encode();
+        sendBuffer(msg.getContent(), "", headers, v2Topic, addr_key);
+
+        QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+    }
     QPID_LOG(debug, "periodic update " << debugSnapshot());
 }
 
@@ -816,7 +860,7 @@ void ManagementAgent::deleteObjectNowLH(
         headers["method"] = "indication";
         headers["qmf.opcode"] = "_data_indication";
         headers["qmf.content"] = "_data";
-        headers["qmf.agent"] = std::string(agentName);
+        headers["qmf.agent"] = name_address;
 
         content.encode();
         sendBuffer(m.getContent(), "", headers, v2Topic, key.str());
@@ -978,7 +1022,7 @@ void ManagementAgent::handleMethodReques
 
     headers["method"] = "response";
     headers["qmf.opcode"] = "_method_response";
-    headers["qmf.agent"] = std::string(agentName);
+    headers["qmf.agent"] = name_address;
 
     if ((oid = inMap.find("_object_id")) == inMap.end() ||
         (mid = inMap.find("_method_name")) == inMap.end())
@@ -1489,7 +1533,7 @@ void ManagementAgent::handleGetQueryLH (
 }
 
 
-void ManagementAgent::handleGetQueryLH(const std::string& body, std::string& replyTo, const std::string& cid, const std::string& contentType)
+void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType)
 {
     FieldTable           ft;
     FieldTable::ValuePtr value;
@@ -1511,7 +1555,7 @@ void ManagementAgent::handleGetQueryLH(c
     headers["method"] = "response";
     headers["qmf.opcode"] = "_query_response";
     headers["qmf.content"] = "_data";
-    headers["qmf.agent"] = std::string(agentName);
+    headers["qmf.agent"] = name_address;
     headers["partial"];
 
     ::qpid::messaging::Message outMsg;
@@ -1592,6 +1636,31 @@ void ManagementAgent::handleGetQueryLH(c
     QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
 }
 
+
+void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,  // answers "_agent_locate_request"; the first (body) argument is unnamed and unused
+                                            const string& cid)
+{
+    QPID_LOG(trace, "RCVD AgentLocateRequest");
+
+    messaging::Message msg;
+    messaging::MapContent content(msg);
+    messaging::Variant::Map& map(content.asMap());  // reference into content, so writes to map become the message body
+    messaging::Variant::Map headers;
+
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_agent_locate_response";
+    headers["qmf.agent"] = name_address;
+
+    map["_values"] = attrMap;  // start from the naming attributes stored by setName()
+    map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+    map["_values"].asMap()["heartbeat_interval"] = interval;
+    content.encode();
+    sendBuffer(msg.getContent(), cid, headers, v2Direct, replyTo);  // sent on the v2Direct exchange with replyTo as routing key and cid passed through
+
+    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+
 bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
 {
     Buffer   inBuffer (inputBuffer, MA_BUFFER_SIZE);
@@ -1720,7 +1789,7 @@ bool ManagementAgent::authorizeAgentMess
 
                 headers["method"] = "response";
                 headers["qmf.opcode"] = "_method_response";
-                headers["qmf.agent"] = std::string(agentName);
+                headers["qmf.agent"] = name_address;
 
                 ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
                 ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
@@ -1794,6 +1863,8 @@ void ManagementAgent::dispatchAgentComma
             return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher());
         else if (opcode == "_query_request")
             return handleGetQueryLH(body, replyToKey, cid, contentType);
+        else if (opcode == "_agent_locate_request")
+            return handleLocateRequestLH(body, replyToKey, cid);
 
         QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
         return;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=928808&r1=928807&r2=928808&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Mar 29 15:48:33 2010
@@ -76,6 +76,9 @@ public:
     /** Called by cluster to suppress management output during update. */
     void suppress(bool s) { suppressed = s; }
 
+    void setName(const std::string& vendor,
+                 const std::string& product,
+                 const std::string& instance="");
     void setInterval(uint16_t _interval) { interval = _interval; }
     void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
                      qpid::broker::Exchange::shared_ptr directExchange);
@@ -277,7 +280,12 @@ private:
     typedef std::pair<std::string,std::string> MethodName;
     typedef std::map<MethodName, std::string> DisallowedMethods;
     DisallowedMethods disallowed;
-    std::string agentName;  // KAG TODO FIX
+
+    // Agent name and address
+    qpid::messaging::Variant::Map attrMap;
+    std::string       name_address;
+
+    // supported management protocol
     bool qmf1Support;
     bool qmf2Support;
 
@@ -300,7 +308,7 @@ private:
                     const std::string&     cid,
                     const qpid::messaging::VariantMap& headers,
                     qpid::broker::Exchange::shared_ptr exchange,
-                    std::string routingKey);
+                    const std::string& routingKey);
     void moveNewObjectsLH();
 
     bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -333,8 +341,10 @@ private:
     void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
-    void handleGetQueryLH       (const std::string& body, std::string& replyToKey, const std::string& cid, const std::string& contentType);
+    void handleGetQueryLH       (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType);
     void handleMethodRequestLH  (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken);
+    void handleLocateRequestLH  (const std::string& body, const std::string &replyToKey, const std::string& cid);
+
 
     size_t validateSchema(framing::Buffer&, uint8_t kind);
     size_t validateTableSchema(framing::Buffer&);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org