You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/17 18:26:29 UTC

svn commit: r924376 - in /qpid/branches/qmf-devel0.7a/qpid/cpp: examples/qmf-agent/example.cpp include/qpid/agent/ManagementAgent.h src/qpid/acl/Acl.cpp src/qpid/agent/ManagementAgentImpl.cpp src/qpid/agent/ManagementAgentImpl.h

Author: tross
Date: Wed Mar 17 17:26:29 2010
New Revision: 924376

URL: http://svn.apache.org/viewvc?rev=924376&view=rev
Log:
Added agent-naming, agent-heartbeat.

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/acl/Acl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp?rev=924376&r1=924375&r2=924376&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/examples/qmf-agent/example.cpp Wed Mar 17 17:26:29 2010
@@ -178,6 +178,9 @@ int main_int(int argc, char** argv)
     // Register the Qmf_example schema with the agent
     _qmf::Package packageInit(agent);
 
+    // Name the agent.
+    agent->setName("apache.org", "qmf-example");
+
     // Start the agent.  It will attempt to make a connection to the
     // management broker
     agent->init(settings, 5, false, ".magentdata");

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h?rev=924376&r1=924375&r2=924376&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/include/qpid/agent/ManagementAgent.h Wed Mar 17 17:26:29 2010
@@ -52,21 +52,6 @@ class ManagementAgent
         static ManagementAgent* agent;
     };
 
-    class Name {
-    public:
-        QMF_AGENT_EXTERN Name(const std::string &vendor,
-                              const std::string &product,
-                              const std::string &name);
-        QMF_AGENT_EXTERN Name(const std::string &fullName);
-        QMF_AGENT_EXTERN Name();
-        QMF_AGENT_EXTERN operator std::string() const;
-
-    private:
-        std::string vendor;
-        std::string product;
-        std::string name;
-    };
-
     typedef enum {
     SEV_EMERG = 0,
     SEV_ALERT = 1,
@@ -84,6 +69,17 @@ class ManagementAgent
 
     virtual int getMaxThreads() = 0;
 
+    // Set the name of the agent
+    //
+    //   vendor   - Vendor name or domain (i.e. "apache.org")
+    //   product  - Product name (i.e. "qpid")
+    //   instance - A unique identifier for this instance of the agent.
+    //              If empty, the agent will create a GUID for the instance.
+    //
+    virtual void setName(const std::string& vendor,
+                         const std::string& product,
+                         const std::string& instance="") = 0;
+
     // Connect to a management broker
     //
     //   brokerHost        - Hostname or IP address (dotted-quad) of broker.
@@ -119,9 +115,6 @@ class ManagementAgent
                       bool useExternalThread = false,
                       const std::string& storeFile = "") = 0;
 
-    // Extract the unique name for this agent
-    virtual const Name& getName() = 0;
-
     // Register a schema with the management agent.  This is normally called by the
     // package initializer generated by the management code generator.
     //

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/acl/Acl.cpp?rev=924376&r1=924375&r2=924376&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/acl/Acl.cpp Wed Mar 17 17:26:29 2010
@@ -23,6 +23,7 @@
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/log/Logger.h"
+#include "qpid/messaging/Variant.h"
 #include "qmf/org/apache/qpid/acl/Package.h"
 #include "qmf/org/apache/qpid/acl/EventAllow.h"
 #include "qmf/org/apache/qpid/acl/EventDeny.h"
@@ -94,7 +95,7 @@ Acl::Acl (AclValues& av, Broker& b): acl
                    " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name );
           agent->raiseEvent(_qmf::EventAllow(id,  AclHelper::getActionStr(action),
                                              AclHelper::getObjectTypeStr(objType),
-                                             name, framing::FieldTable()));
+                                             name, messaging::Variant::Map()));
 	  case ALLOW:
 	      return true;
 	  case DENY:
@@ -106,7 +107,7 @@ Acl::Acl (AclValues& av, Broker& b): acl
           QPID_LOG(info, "ACL Deny id:" << id << " action:" << AclHelper::getActionStr(action) << " ObjectType:" << AclHelper::getObjectTypeStr(objType) << " Name:" << name);
           agent->raiseEvent(_qmf::EventDeny(id, AclHelper::getActionStr(action),
                                             AclHelper::getObjectTypeStr(objType),
-                                            name, framing::FieldTable()));
+                                            name, messaging::Variant::Map()));
           return false;
 	  }
       return false;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=924376&r1=924375&r2=924376&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Mar 17 17:26:29 2010
@@ -118,6 +118,21 @@ ManagementAgentImpl::~ManagementAgentImp
     }
 }
 
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+    attrMap["_vendor"] = vendor;
+    attrMap["_product"] = product;
+    string inst;
+    if (instance.empty()) {
+        inst = qpid::messaging::Uuid(true).str();
+    } else
+        inst = instance;
+
+   name_address = vendor + ":" + product + ":" + inst;
+   attrMap["_instance"] = inst;
+   attrMap["_name"] = name_address;
+}
+
 void ManagementAgentImpl::init(const string& brokerHost,
                                uint16_t brokerPort,
                                uint16_t intervalSeconds,
@@ -224,7 +239,7 @@ void ManagementAgentImpl::raiseEvent(con
     headers["method"] = "indication";
     headers["qmf.opcode"] = "_data_indication";
     headers["qmf.content"] = "_event";
-    headers["qmf.agent"] = std::string(agentName);
+    headers["qmf.agent"] = name_address;
 
     content.encode();
     connThreadBody.sendBuffer(msg.getContent(), 0,
@@ -288,20 +303,7 @@ void ManagementAgentImpl::setSignalCallb
 
 void ManagementAgentImpl::startProtocol()
 {
-    char    rawbuffer[512];
-    Buffer  buffer(rawbuffer, 512);
-
-    connected = true;
-    encodeHeader(buffer, 'A');
-    buffer.putShortString("RemoteAgent [C++]");
-    systemId.encode (buffer);
-    buffer.putLong(requestedBrokerBank);
-    buffer.putLong(requestedAgentBank);
-    uint32_t length = buffer.getPosition();
-    buffer.reset();
-    connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
-    QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
-             " reqAgent=" << requestedAgentBank);
+    sendHeartbeat();
 }
 
 void ManagementAgentImpl::storeData(bool requested)
@@ -337,6 +339,27 @@ void ManagementAgentImpl::retrieveData()
     }
 }
 
+void ManagementAgentImpl::sendHeartbeat()
+{
+    static const string addr_exchange("qmf.default.topic");
+    static const string addr_key("agent.ind.heartbeat");
+
+    messaging::Message msg;
+    messaging::MapContent content(msg);
+    messaging::Variant::Map& map(content.asMap());
+    messaging::Variant::Map headers;
+
+    headers["method"] = "indication";
+    headers["qmf.opcode"] = "_agent_heartbeat_indication";
+    headers["qmf.agent"] = name_address;
+
+    map["_values"] = attrMap;
+    content.encode();
+    connThreadBody.sendBuffer(msg.getContent(), 0, headers, addr_exchange, addr_key);
+
+    QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+}
+
 void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
                                               uint32_t code, string text)
 {
@@ -531,7 +554,7 @@ void ManagementAgentImpl::handleGetQuery
             headers["method"] = "response";
             headers["qmf.opcode"] = "_query_response";
             headers["qmf.content"] = "_data";
-            headers["qmf.agent"] = std::string(agentName);
+            headers["qmf.agent"] = name_address;
 
             content.encode();
             connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
@@ -566,7 +589,7 @@ void ManagementAgentImpl::handleGetQuery
             headers["method"] = "response";
             headers["qmf.opcode"] = "_query_response";
             headers["qmf.content"] = "_data";
-            headers["qmf.agent"] = std::string(agentName);
+            headers["qmf.agent"] = name_address;
 
             content.encode();
             connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
@@ -753,7 +776,6 @@ void ManagementAgentImpl::encodeClassInd
 void ManagementAgentImpl::periodicProcessing()
 {
     Mutex::ScopedLock lock(agentLock);
-    uint32_t            contentSize;
     list<pair<ObjectId, ManagementObject*> > deleteList;
 
     if (!connected)
@@ -839,7 +861,7 @@ void ManagementAgentImpl::periodicProces
             headers["method"] = "indication";
             headers["qmf.opcode"] = "_data_indication";
             headers["qmf.content"] = "_data";
-            headers["qmf.agent"] = std::string(agentName);
+            headers["qmf.agent"] = name_address;
 
             connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str());
         }
@@ -854,20 +876,7 @@ void ManagementAgentImpl::periodicProces
     }
 
     deleteList.clear();
-
-    {
-#define BUFSIZE   65536
-        char msgChars[BUFSIZE];
-        Buffer msgBuffer(msgChars, BUFSIZE);
-        encodeHeader(msgBuffer, 'h');
-        msgBuffer.putLongLong(uint64_t(Duration(now())));
-        stringstream key;
-        key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
-        contentSize = BUFSIZE - msgBuffer.available();
-        msgBuffer.reset();
-        connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
-    }
+    sendHeartbeat();
 }
 
 void ManagementAgentImpl::ConnectionThread::run()
@@ -902,6 +911,7 @@ void ManagementAgentImpl::ConnectionThre
                     if (shutdown)
                         return;
                     operational = true;
+                    agent.connected = true;
                     agent.startProtocol();
                     try {
                         Mutex::ScopedUnlock _unlock(connLock);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=924376&r1=924375&r2=924376&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Mar 17 17:26:29 2010
@@ -51,6 +51,9 @@ class ManagementAgentImpl : public Manag
     // Methods from ManagementAgent
     //
     int getMaxThreads() { return 1; }
+    void setName(const std::string& vendor,
+                 const std::string& product,
+                 const std::string& instance="");
     void init(const std::string& brokerHost = "localhost",
               uint16_t brokerPort = 5672,
               uint16_t intervalSeconds = 10,
@@ -64,7 +67,6 @@ class ManagementAgentImpl : public Manag
               uint16_t intervalSeconds = 10,
               bool useExternalThread = false,
               const std::string& storeFile = "");
-    const Name& getName();
     bool isConnected() { return connected; }
     std::string& getLastFailure() { return lastFailure; }
     void registerClass(const std::string& packageName,
@@ -141,6 +143,8 @@ class ManagementAgentImpl : public Manag
 
     void received (client::Message& msg);
 
+    qpid::messaging::Variant::Map attrMap;
+    std::string       name_address;
     uint16_t          interval;
     bool              extThread;
     sys::PipeHandle*  pipeHandle;
@@ -231,8 +235,6 @@ class ManagementAgentImpl : public Manag
 
     static const std::string storeMagicNumber;
 
-    Name agentName;
-
     void startProtocol();
     void storeData(bool requested=false);
     void retrieveData();
@@ -253,6 +255,7 @@ class ManagementAgentImpl : public Manag
                                                     const std::string& cname,
                                                     const uint8_t *md5Sum);
     bool checkHeader  (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+    void sendHeartbeat();
     void sendCommandComplete  (std::string replyToKey, uint32_t sequence,
                                uint32_t code = 0, std::string text = std::string("OK"));
     void handleAttachResponse (qpid::framing::Buffer& inBuffer);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org