You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/18 16:58:57 UTC

svn commit: r924873 - in /qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent: ManagementAgentImpl.cpp ManagementAgentImpl.h

Author: kgiusti
Date: Thu Mar 18 15:58:57 2010
New Revision: 924873

URL: http://svn.apache.org/viewvc?rev=924873&view=rev
Log:
checkpoint prior to schema revert

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=924873&r1=924872&r2=924873&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Mar 18 15:58:57 2010
@@ -244,7 +244,7 @@ void ManagementAgentImpl::raiseEvent(con
     content.encode();
     connThreadBody.sendBuffer(msg.getContent(), 0,
                               headers,
-                              "qpid.management", key.str());
+                              "qmf.default.topic", key.str());
 }
 
 uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -264,8 +264,7 @@ uint32_t ManagementAgentImpl::pollCallba
         methodQueue.pop_front();
         {
             Mutex::ScopedUnlock unlock(agentLock);
-            Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
-            invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+            invokeMethodRequest(item->body, item->sequence, item->replyTo);
             delete item;
         }
     }
@@ -446,17 +445,20 @@ void ManagementAgentImpl::handleSchemaRe
         ClassMap& cMap = pIter->second;
         ClassMap::iterator cIter = cMap.find(key);
         if (cIter != cMap.end()) {
-            //SchemaClass& schema = cIter->second;
-            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
-
-            encodeHeader(outBuffer, 's', sequence);
-            //schema.writeSchemaCall(outBuffer);
-            assert(false); // TODO FIX ABOVE
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+            SchemaClass& schema = cIter->second;
+            ::qpid::messaging::Message m;
+            ::qpid::messaging::MapContent content(m);
 
+            schema.writeSchemaCall(content.asMap());
+            
+            ::qpid::messaging::VariantMap headers;
+            headers["method"] = "response";
+            headers["qmf.opcode"] = "_query_response";
+            headers["qmf.content"] = "_schema_class";
+            headers["qmf.agent"] = name_address;
+
+            content.encode();
+            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qpid.management", "broker");
             QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
         }
     }
@@ -470,18 +472,29 @@ void ManagementAgentImpl::handleConsoleA
     QPID_LOG(trace, "RCVD ConsoleAddedInd");
 }
 
-void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
 {
+#if 1
+    (void)body;
+    (void)sequence;
+    (void)replyTo;
+#else
     string   methodName;
     string   packageName;
     string   className;
     uint8_t  hash[16];
     Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
+    qpid::messaging::Message inMsg(body);
+    qpid::messaging::MapView inMap(inMsg);
+    qpid::messaging::MapView::const_iterator i;
 
-    assert(false);  // TODO FIX OBJ ID!!
+    if ((i = inMap.find("_object_id")) == _map.end()) {
+        // KAG TODO: TBD!!
+    }
     //ObjectId objId(inBuffer);
     ObjectId objId(std::string("foobag?"));
+
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(className);
     inBuffer.getBin128(hash);
@@ -514,6 +527,7 @@ void ManagementAgentImpl::invokeMethodRe
     outLen = MA_BUFFER_SIZE - outBuffer.available();
     outBuffer.reset();
     connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+#endif    
 }
 
 void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
@@ -557,7 +571,7 @@ void ManagementAgentImpl::handleGetQuery
             headers["qmf.agent"] = name_address;
 
             content.encode();
-            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
+            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
 
             QPID_LOG(trace, "SENT ObjectInd");
         }
@@ -592,7 +606,7 @@ void ManagementAgentImpl::handleGetQuery
             headers["qmf.agent"] = name_address;
 
             content.encode();
-            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "amq.direct", replyTo);
+            connThreadBody.sendBuffer(m.getContent(), sequence, headers, "qmf.default.direct", replyTo);
 
             QPID_LOG(trace, "SENT ObjectInd");
         }
@@ -601,13 +615,11 @@ void ManagementAgentImpl::handleGetQuery
     sendCommandComplete(replyTo, sequence);
 }
 
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleMethodRequest(const std::string& body, uint32_t sequence, string replyTo)
 {
     if (extThread) {
         Mutex::ScopedLock lock(agentLock);
-        string body;
 
-        inBuffer.getRawData(body, inBuffer.available());
         methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
         if (pipeHandle != 0) {
             pipeHandle->write("X", 1);
@@ -627,7 +639,7 @@ void ManagementAgentImpl::handleMethodRe
             inCallback = false;
         }
     } else {
-        invokeMethodRequest(inBuffer, sequence, replyTo);
+        invokeMethodRequest(body, sequence, replyTo);
     }
 
     QPID_LOG(trace, "RCVD MethodRequest");
@@ -635,25 +647,52 @@ void ManagementAgentImpl::handleMethodRe
 
 void ManagementAgentImpl::received(Message& msg)
 {
-    string   data = msg.getData();
-    Buffer   inBuffer(const_cast<char*>(data.c_str()), data.size());
-    uint8_t  opcode;
-    uint32_t sequence;
     string   replyToKey;
-
     framing::MessageProperties p = msg.getMessageProperties();
     if (p.hasReplyTo()) {
         const framing::ReplyTo& rt = p.getReplyTo();
         replyToKey = rt.getRoutingKey();
     }
 
+    if (msg.getHeaders().getAsString("app_id") == "qmf2")
+    {
+        uint32_t sequence = 0;
+        std::string opcode = msg.getHeaders().getAsString("qmf.opcode");
+        std::string cid = msg.getMessageProperties().getCorrelationId();
+        if (!cid.empty()) {
+            try {
+                sequence = boost::lexical_cast<uint32_t>(cid);
+            } catch(const boost::bad_lexical_cast&) {
+                QPID_LOG(warning, "Bad correlation Id for received QMF request.");
+                return;
+            }
+        }
+
+        if (opcode == "_method_request") {
+            handleMethodRequest(msg.getData(), sequence, replyToKey);
+            return;
+        }
+
+        QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+        return;
+    }
+
+    // old preV2 binary messages
+    
+    uint32_t sequence;
+    string   data = msg.getData();
+    Buffer   inBuffer(const_cast<char*>(data.c_str()), data.size());
+    uint8_t  opcode;
+
+
     if (checkHeader(inBuffer, &opcode, &sequence))
     {
         if      (opcode == 'a') handleAttachResponse(inBuffer);
         else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
         else if (opcode == 'x') handleConsoleAddedIndication();
         else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
-        else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
+        else if (opcode == 'M')
+            QPID_LOG(warning, "Ignoring old-format QMF Method Request!!!");
     }
 }
 
@@ -862,7 +901,7 @@ void ManagementAgentImpl::periodicProces
             headers["qmf.content"] = "_data";
             headers["qmf.agent"] = name_address;
 
-            connThreadBody.sendBuffer(str, 0, headers, "qpid.management", key.str(), "amqp/list");
+            connThreadBody.sendBuffer(str, 0, headers, "qmf.default.topic", key.str(), "amqp/list");
             QPID_LOG(trace, "SENT DataIndication key=" << key.str());
         }
     }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=924873&r1=924872&r2=924873&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Mar 18 15:58:57 2010
@@ -263,9 +263,9 @@ class ManagementAgentImpl : public Manag
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
     void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
-    void invokeMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+    void invokeMethodRequest  (const std::string& body, uint32_t sequence, std::string replyTo);
     void handleGetQuery       (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
-    void handleMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+    void handleMethodRequest  (const std::string& body, uint32_t sequence, std::string replyTo);
     void handleConsoleAddedIndication();
 };
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org