You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/18 22:14:55 UTC

svn commit: r924995 - in /qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid: agent/ManagementAgentImpl.cpp management/ManagementAgent.cpp management/ManagementAgent.h

Author: kgiusti
Date: Thu Mar 18 21:14:54 2010
New Revision: 924995

URL: http://svn.apache.org/viewvc?rev=924995&view=rev
Log:
agent side method request handling

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=924995&r1=924994&r2=924995&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Mar 18 21:14:54 2010
@@ -472,60 +472,56 @@ void ManagementAgentImpl::handleConsoleA
 
 void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
 {
-#if 1
-    (void)body;
-    (void)sequence;
-    (void)replyTo;
-#else
     string   methodName;
-    string   packageName;
-    string   className;
-    uint8_t  hash[16];
-    Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen;
     qpid::messaging::Message inMsg(body);
     qpid::messaging::MapView inMap(inMsg);
-    qpid::messaging::MapView::const_iterator i;
+    qpid::messaging::MapView::const_iterator oid, mid;
 
-    if ((i = inMap.find("_object_id")) == _map.end()) {
-        // KAG TODO: TBD!!
-    }
-    //ObjectId objId(inBuffer);
-    ObjectId objId(std::string("foobag?"));
+    qpid::messaging::Message outMsg;
+    qpid::messaging::MapContent outMap(outMsg);
 
-    inBuffer.getShortString(packageName);
-    inBuffer.getShortString(className);
-    inBuffer.getBin128(hash);
-    inBuffer.getShortString(methodName);
+    if ((oid = inMap.find("_object_id")) == inMap.end() ||
+        (mid = inMap.find("_method_name")) == inMap.end()) {
+        ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+        ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+    } else {
+        std::string methodName;
+        ObjectId objId;
+        qpid::messaging::Variant::Map inArgs;
 
-    encodeHeader(outBuffer, 'm', sequence);
+        try {
+            // conversions will throw if input is invalid.
+            objId = ObjectId(oid->second.asMap());
+            methodName = mid->second.getString();
+
+            mid = inMap.find("_arguments");
+            if (mid != inMap.end()) {
+                inArgs = (mid->second).asMap();
+            }
 
-    ManagementObjectMap::iterator iter = managementObjects.find(objId);
-    if (iter == managementObjects.end() || iter->second->isDeleted()) {
-        outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
-        outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
-    } else {
-        if ((iter->second->getPackageName() != packageName) ||
-            (iter->second->getClassName()   != className)) {
-            outBuffer.putLong        (Manageable::STATUS_PARAMETER_INVALID);
-            outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
-        }
-        else
-            try {
-                outBuffer.record();
-                //iter->second->doMethod(methodName, inBuffer, outBuffer);
-                assert(false); // TODO: fix above
-            } catch(exception& e) {
-                outBuffer.restore();
-                outBuffer.putLong(Manageable::STATUS_EXCEPTION);
-                outBuffer.putMediumString(e.what());
+            ManagementObjectMap::iterator iter = managementObjects.find(objId);
+            if (iter == managementObjects.end() || iter->second->isDeleted()) {
+                ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+                ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+            } else {
+
+                iter->second->doMethod(methodName, inArgs, outMap.asMap());
             }
+
+        } catch(exception& e) {
+            ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+            ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+        }
     }
 
-    outLen = MA_BUFFER_SIZE - outBuffer.available();
-    outBuffer.reset();
-    connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
-#endif    
+    qpid::messaging::Variant::Map headers;
+    headers["method"] = "response";
+    headers["qmf.opcode"] = "_method_response";
+    headers["qmf.content"] = "_data";
+    headers["qmf.agent"] = name_address;
+
+    outMap.encode();
+    connThreadBody.sendBuffer(outMsg.getContent(), sequence, headers, "qmf.default.direct", replyTo);
 }
 
 void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=924995&r1=924994&r2=924995&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Mar 18 21:14:54 2010
@@ -439,9 +439,7 @@ void ManagementAgent::sendBuffer(const s
         msg->getFrames().getHeaders()->get<MessageProperties>(true);
     props->setContentLength(data.length());
     if (sequence) {
-        std::stringstream seqstr;
-        seqstr << sequence;
-        props->setCorrelationId(seqstr.str());
+        props->setCorrelationId(boost::lexical_cast<std::string>(sequence));
     }
 
     for (i = headers.begin(); i != headers.end(); ++i) {
@@ -754,6 +752,13 @@ bool ManagementAgent::dispatchCommand (D
 void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
                                              uint32_t sequence, const ConnectionToken* connToken)
 {
+    QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
+#if 1
+    (void)inBuffer;
+    (void)replyToKey;
+    (void)sequence;
+    (void)connToken;
+#else
     // @todo KAG use new method req format
     string   methodName;
     string   packageName;
@@ -831,8 +836,111 @@ void ManagementAgent::handleMethodReques
     outBuffer.reset();
     sendBuffer(outBuffer, outLen, dExchange, replyToKey);
     QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+#endif
 }
 
+
+void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo,
+                                             uint32_t sequence, const ConnectionToken* connToken)
+{
+    string   methodName;
+    qpid::messaging::Message inMsg(body);
+    qpid::messaging::MapView inMap(inMsg);
+    qpid::messaging::MapView::const_iterator oid, mid;
+
+    qpid::messaging::Message outMsg;
+    qpid::messaging::MapContent outMap(outMsg);
+    qpid::messaging::Variant::Map headers;
+
+    headers["method"] = "response";
+    headers["qmf.opcode"] = "_method_response";
+    headers["qmf.content"] = "_data";
+    headers["qmf.agent"] = std::string(agentName);
+
+    if ((oid = inMap.find("_object_id")) == inMap.end() ||
+        (mid = inMap.find("_method_name")) == inMap.end())
+    {
+        ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+        ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+        outMap.encode();
+        sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+        return;
+    }
+
+    ObjectId objId;
+    qpid::messaging::Variant::Map inArgs;
+
+    try {
+        // conversions will throw if input is invalid.
+        objId = ObjectId(oid->second.asMap());
+        methodName = mid->second.getString();
+
+        mid = inMap.find("_arguments");
+        if (mid != inMap.end()) {
+            inArgs = (mid->second).asMap();
+        }
+    } catch(exception& e) {
+        ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+        ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+        outMap.encode();
+        sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+        return;
+    }
+
+    ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+    if (iter == managementObjects.end() || iter->second->isDeleted()) {
+        ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+        ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+        outMap.encode();
+        sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+        return;
+    }
+
+    // validate
+    AclModule* acl = broker->getAcl();
+    DisallowedMethods::const_iterator i;
+
+    i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName));
+    if (i != disallowed.end()) {
+        ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+        ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = i->second;
+        outMap.encode();
+        sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+        QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+        return;
+    }
+
+    if (acl != 0) {
+        string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+        map<acl::Property, string> params;
+        params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
+        params[acl::PROP_SCHEMACLASS]   = iter->second->getClassName();
+
+        if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+            ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+            ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+            outMap.encode();
+            sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+            QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+            return;
+        }
+    }
+
+    // invoke the method
+
+    try {
+        iter->second->doMethod(methodName, inArgs, outMap.asMap());
+    } catch(exception& e) {
+        ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+        ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = e.what();
+    }
+
+    outMap.encode();
+    sendBuffer(outMsg.getContent(), sequence, headers, dExchange, replyTo);
+}
+
+
 void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
 {
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -1326,8 +1434,6 @@ bool ManagementAgent::authorizeAgentMess
 
 void ManagementAgent::dispatchAgentCommandLH(Message& msg)
 {
-    Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
-    uint8_t  opcode;
     uint32_t sequence;
     string   replyToKey;
 
@@ -1340,6 +1446,9 @@ void ManagementAgent::dispatchAgentComma
     else
         return;
 
+    Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
+    uint8_t  opcode;
+
     if (msg.encodedSize() > MA_BUFFER_SIZE) {
         QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
                  msg.encodedSize());
@@ -1350,7 +1459,39 @@ void ManagementAgent::dispatchAgentComma
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
-    // KAG TODO: need to handle map style method requests
+    const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+    if (headers && headers->getAsString("app_id") == "qmf2")
+    {
+        std::string opcode = headers->getAsString("qmf.opcode");
+
+        sequence = 0;
+        if (p && p->hasCorrelationId()) {
+            std::string cid = p->getCorrelationId();
+            if (!cid.empty()) {
+                try {
+                    sequence = boost::lexical_cast<uint32_t>(cid);
+                } catch(const boost::bad_lexical_cast&) {
+                    QPID_LOG(warning, "Bad correlation Id for received QMF request.");
+                    return;
+                }
+            }
+        }
+
+        if (opcode == "_method_request") {
+            std::string body;
+            inBuffer.getRawData(body, bufferLen);
+            handleMethodRequestLH(body, replyToKey, sequence, msg.getPublisher());
+            return;
+        }
+
+        QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+        return;
+    }
+
+    // old preV2 binary messages
+
+
     while (inBuffer.getPosition() < bufferLen) {
         if (!checkHeader(inBuffer, &opcode, &sequence))
             return;

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=924995&r1=924994&r2=924995&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Mar 18 21:14:54 2010
@@ -310,6 +310,7 @@ private:
     void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+    void handleMethodRequestLH  (const std::string& body, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
 
     size_t validateSchema(framing::Buffer&, uint8_t kind);
     size_t validateTableSchema(framing::Buffer&);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org