You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/02/02 19:11:08 UTC

svn commit: r1066557 - in /qpid/trunk/qpid/cpp/src/qpid: agent/ManagementAgentImpl.cpp agent/ManagementAgentImpl.h management/ManagementAgent.cpp management/ManagementAgent.h

Author: tross
Date: Wed Feb  2 18:11:07 2011
New Revision: 1066557

URL: http://svn.apache.org/viewvc?rev=1066557&view=rev
Log:
QPID-3032 - Bug-fix: The broker management agent and the remote C++ agent hard-coded the
assumed value of the "exchange" component of reply-to headers to either "amq.direct" or
"qmf.default.direct", depending on the circumstance.

This commit fixes the problem: message replies are now sent to the exchange/routing-key
pair supplied in the reply-to header.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Feb  2 18:11:07 2011
@@ -362,7 +362,7 @@ uint32_t ManagementAgentImpl::pollCallba
         methodQueue.pop_front();
         {
             sys::Mutex::ScopedUnlock unlock(agentLock);
-            invokeMethodRequest(item->body, item->cid, item->replyTo, item->userId);
+            invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId);
             delete item;
         }
     }
@@ -497,7 +497,7 @@ void ManagementAgentImpl::sendHeartbeat(
     QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
 }
 
-void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid,
                                         const string& text, uint32_t code)
 {
     Variant::Map map;
@@ -514,12 +514,12 @@ void ManagementAgentImpl::sendException(
     map["_values"] = values;
 
     MapCodec::encode(map, content);
-    connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey);
+    connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
 
     QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
 }
 
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
 {
     sys::Mutex::ScopedLock lock(agentLock);
     string packageName;
@@ -546,7 +546,7 @@ void ManagementAgentImpl::handleSchemaRe
             outBuffer.putRawData(body);
             outLen = MA_BUFFER_SIZE - outBuffer.available();
             outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+            connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
 
             QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
         }
@@ -561,7 +561,7 @@ void ManagementAgentImpl::handleConsoleA
     QPID_LOG(trace, "RCVD ConsoleAddedInd");
 }
 
-void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
 {
     string  methodName;
     bool    failed = false;
@@ -572,11 +572,9 @@ void ManagementAgentImpl::invokeMethodRe
 
     MapCodec::decode(body, inMap);
 
-    outMap["_values"] = Variant::Map();
-
     if ((oid = inMap.find("_object_id")) == inMap.end() ||
         (mid = inMap.find("_method_name")) == inMap.end()) {
-        sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+        sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
                       Manageable::STATUS_PARAMETER_INVALID);
         failed = true;
     } else {
@@ -595,6 +593,8 @@ void ManagementAgentImpl::invokeMethodRe
                 inArgs = (mid->second).asMap();
             }
 
+            QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs);
+
             boost::shared_ptr<ManagementObject> oPtr;
             {
                 sys::Mutex::ScopedLock lock(agentLock);
@@ -604,7 +604,7 @@ void ManagementAgentImpl::invokeMethodRe
             }
 
             if (oPtr.get() == 0) {
-                sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
+                sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
                               Manageable::STATUS_UNKNOWN_OBJECT);
                 failed = true;
             } else {
@@ -617,13 +617,13 @@ void ManagementAgentImpl::invokeMethodRe
                         if (iter->first != "_status_code" && iter->first != "_status_text")
                             outMap["_arguments"].asMap()[iter->first] = iter->second;
                 } else {
-                    sendException(replyTo, cid, callMap["_status_text"], callMap["_status_code"]);
+                    sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]);
                     failed = true;
                 }
             }
 
         } catch(types::InvalidConversion& e) {
-            sendException(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION);
+            sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION);
             failed = true;
         }
     }
@@ -635,11 +635,11 @@ void ManagementAgentImpl::invokeMethodRe
         headers["qmf.opcode"] = "_method_response";
         QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
         MapCodec::encode(outMap, content);
-        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+        connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
     }
 }
 
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
 {
     moveNewObjectsLH();
 
@@ -666,12 +666,12 @@ void ManagementAgentImpl::handleGetQuery
      */
     i = inMap.find("_what");
     if (i == inMap.end()) {
-        sendException(replyTo, cid, "_what element missing in Query");
+        sendException(rte, rtk, cid, "_what element missing in Query");
         return;
     }
 
     if (i->second.getType() != qpid::types::VAR_STRING) {
-        sendException(replyTo, cid, "_what element is not a string");
+        sendException(rte, rtk, cid, "_what element is not a string");
         return;
     }
 
@@ -709,8 +709,8 @@ void ManagementAgentImpl::handleGetQuery
                 headers.erase("partial");
 
                 ListCodec::encode(list_, content);
-                connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-                QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+                connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+                QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
                 return;
             }
         } else { // match using schema_id, if supplied
@@ -771,8 +771,8 @@ void ManagementAgentImpl::handleGetQuery
                         if (++objCount >= maxV2ReplyObjs) {
                             objCount = 0;
                             ListCodec::encode(list_, content);
-                            connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-                            QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+                            connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+                            QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk);
                             content.clear();
                             list_.clear();
                         }
@@ -784,8 +784,8 @@ void ManagementAgentImpl::handleGetQuery
         // Send last "non-partial" message to indicate CommandComplete
         headers.erase("partial");
         ListCodec::encode(list_, content);
-        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-        QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+        connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+        QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk);
 
     } else if (i->second.asString() == "SCHEMA_ID") {
         headers["qmf.content"] = "_schema_id";
@@ -806,16 +806,16 @@ void ManagementAgentImpl::handleGetQuery
 
         headers.erase("partial");
         ListCodec::encode(list_, content);
-        connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
-        QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo);
+        connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+        QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk);
 
     } else {
         // Unknown query target
-        sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+        sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
     }
 }
 
-void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk)
 {
     QPID_LOG(trace, "RCVD AgentLocateRequest");
 
@@ -829,9 +829,9 @@ void ManagementAgentImpl::handleLocateRe
 
     getHeartbeatContent(map);
     MapCodec::encode(map, content);
-    connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+    connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
 
-    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+    QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
 
     {
         sys::Mutex::ScopedLock lock(agentLock);
@@ -839,12 +839,12 @@ void ManagementAgentImpl::handleLocateRe
     }
 }
 
-void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
 {
     if (extThread) {
         sys::Mutex::ScopedLock lock(agentLock);
 
-        methodQueue.push_back(new QueuedMethod(cid, replyTo, body, userId));
+        methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId));
         if (pipeHandle != 0) {
             pipeHandle->write("X", 1);
         } else if (notifyable != 0) {
@@ -863,7 +863,7 @@ void ManagementAgentImpl::handleMethodRe
             inCallback = false;
         }
     } else {
-        invokeMethodRequest(body, cid, replyTo, userId);
+        invokeMethodRequest(body, cid, rte, rtk, userId);
     }
 
     QPID_LOG(trace, "RCVD MethodRequest");
@@ -871,10 +871,12 @@ void ManagementAgentImpl::handleMethodRe
 
 void ManagementAgentImpl::received(Message& msg)
 {
+    string   replyToExchange;
     string   replyToKey;
     framing::MessageProperties mp = msg.getMessageProperties();
     if (mp.hasReplyTo()) {
         const framing::ReplyTo& rt = mp.getReplyTo();
+        replyToExchange = rt.getExchange();
         replyToKey = rt.getRoutingKey();
     }
 
@@ -887,9 +889,9 @@ void ManagementAgentImpl::received(Messa
         string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
         string cid = msg.getMessageProperties().getCorrelationId();
 
-        if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
-        else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToKey, userId);
-        else if (opcode == "_query_request")        handleGetQuery(msg.getData(), cid, replyToKey);
+        if      (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey);
+        else if (opcode == "_method_request")       handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId);
+        else if (opcode == "_query_request")        handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey);
         else {
             QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
         }
@@ -906,7 +908,7 @@ void ManagementAgentImpl::received(Messa
 
     if (checkHeader(inBuffer, &opcode, &sequence))
     {
-        if      (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+        if      (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
         else if (opcode == 'x') handleConsoleAddedIndication();
         else
             QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Feb  2 18:11:07 2011
@@ -128,11 +128,12 @@ class ManagementAgentImpl : public Manag
     };
 
     struct QueuedMethod {
-    QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body, const std::string& _uid) :
-        cid(_cid), replyTo(_reply), body(_body), userId(_uid) {}
+    QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) :
+        cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {}
 
         std::string cid;
-        std::string replyTo;
+        std::string replyToExchange;
+        std::string replyToKey;
         std::string body;
         std::string userId;
     };
@@ -278,16 +279,16 @@ class ManagementAgentImpl : public Manag
                                                 uint8_t type=ManagementItem::CLASS_KIND_TABLE);
     bool checkHeader  (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
     void sendHeartbeat();
-    void sendException(const std::string& replyToKey, const std::string& cid,
+    void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid,
                        const std::string& text, uint32_t code=1);
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
-    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
-    void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& replyTo, const std::string& userId);
+    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk);
+    void invokeMethodRequest  (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId);
 
-    void handleGetQuery       (const std::string& body, const std::string& cid, const std::string& replyTo);
-    void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo);
-    void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& replyTo, const std::string& userId);
+    void handleGetQuery       (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk);
+    void handleLocateRequest  (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk);
+    void handleMethodRequest  (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId);
     void handleConsoleAddedIndication();
     void getHeartbeatContent  (qpid::types::Variant::Map& map);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Feb  2 18:11:07 2011
@@ -503,7 +503,7 @@ bool ManagementAgent::checkHeader (Buffe
 void ManagementAgent::sendBufferLH(Buffer&  buf,
                                    uint32_t length,
                                    qpid::broker::Exchange::shared_ptr exchange,
-                                   string   routingKey)
+                                   const string&  routingKey)
 {
     if (suppressed) {
         QPID_LOG(debug, "Suppressing management message to " << routingKey);
@@ -548,6 +548,17 @@ void ManagementAgent::sendBufferLH(Buffe
 }
 
 
+void ManagementAgent::sendBufferLH(Buffer&  buf,
+                                   uint32_t length,
+                                   const string& exchange,
+                                   const string& routingKey)
+{
+    qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+    if (ex.get() != 0)
+        sendBufferLH(buf, length, ex, routingKey);
+}
+
+
 // NOTE WELL: assumes userLock is held by caller (LH)
 // NOTE EVEN WELLER: drops this lock when delivering the message!!!
 void ManagementAgent::sendBufferLH(const string& data,
@@ -612,6 +623,20 @@ void ManagementAgent::sendBufferLH(const
 }
 
 
+void ManagementAgent::sendBufferLH(const string& data,
+                                   const string& cid,
+                                   const Variant::Map& headers,
+                                   const string& content_type,
+                                   const string& exchange,
+                                   const string& routingKey,
+                                   uint64_t ttl_msec)
+{
+    qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+    if (ex.get() != 0)
+        sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+}
+
+
 /** Objects that have been added since the last periodic poll are temporarily
  * saved in the newManagementObjects list.  This allows objects to be
  * added without needing to block on the userLock (addLock is used instead).
@@ -1106,7 +1131,7 @@ void ManagementAgent::sendCommandComplet
              replyToKey << " seq=" << sequence);
 }
 
-void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& cid,
+void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
                                       const string& text, uint32_t code, bool viaLocal)
 {
     static const string addr_exchange("qmf.default.direct");
@@ -1125,7 +1150,7 @@ void ManagementAgent::sendExceptionLH(co
     map["_values"] = values;
 
     MapCodec::encode(map, content);
-    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
+    sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
 
     QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
 }
@@ -1295,7 +1320,7 @@ void ManagementAgent::handleMethodReques
 }
 
 
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& replyTo,
+void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
                                              const string& cid, const ConnectionToken* connToken, bool viaLocal)
 {
     moveNewObjectsLH();
@@ -1317,7 +1342,7 @@ void ManagementAgent::handleMethodReques
 
     if ((oid = inMap.find("_object_id")) == inMap.end() ||
         (mid = inMap.find("_method_name")) == inMap.end()) {
-        sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+        sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
                         Manageable::STATUS_PARAMETER_INVALID, viaLocal);
         return;
     }
@@ -1336,7 +1361,7 @@ void ManagementAgent::handleMethodReques
             inArgs = (mid->second).asMap();
         }
     } catch(exception& e) {
-        sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+        sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
         return;
     }
 
@@ -1345,7 +1370,7 @@ void ManagementAgent::handleMethodReques
     if (iter == managementObjects.end() || iter->second->isDeleted()) {
         stringstream estr;
         estr << "No object found with ID=" << objId;
-        sendExceptionLH(replyTo, cid, estr.str(), 1, viaLocal);
+        sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
         return;
     }
 
@@ -1355,7 +1380,7 @@ void ManagementAgent::handleMethodReques
 
     i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
     if (i != disallowed.end()) {
-        sendExceptionLH(replyTo, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+        sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
         return;
     }
 
@@ -1366,7 +1391,7 @@ void ManagementAgent::handleMethodReques
         params[acl::PROP_SCHEMACLASS]   = iter->second->getClassName();
 
         if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
-            sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+            sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
                             Manageable::STATUS_FORBIDDEN, viaLocal);
             return;
         }
@@ -1376,7 +1401,7 @@ void ManagementAgent::handleMethodReques
 
     QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
              << ":" << iter->second->getClassName() << " method=" <<
-             methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
+             methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
 
     try {
         sys::Mutex::ScopedUnlock u(userLock);
@@ -1391,18 +1416,18 @@ void ManagementAgent::handleMethodReques
         } else
             error = callMap["_status_text"].asString();
     } catch(exception& e) {
-        sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+        sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
         return;
     }
 
     if (errorCode != 0) {
-        sendExceptionLH(replyTo, cid, error, errorCode, viaLocal);
+        sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
         return;
     }
 
     MapCodec::encode(outMap, content);
-    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
-    QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+    sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+    QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
 }
 
 
@@ -1549,7 +1574,7 @@ void ManagementAgent::SchemaClass::appen
         buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
 }
 
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
 {
     string         packageName;
     SchemaClassKey key;
@@ -1558,7 +1583,7 @@ void ManagementAgent::handleSchemaReques
     key.decode(inBuffer);
 
     QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
-             "), replyTo=" << replyToKey << " seq=" << sequence);
+             "), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
 
     PackageMap::iterator pIter = packages.find(packageName);
     if (pIter != packages.end()) {
@@ -1574,17 +1599,17 @@ void ManagementAgent::handleSchemaReques
                 classInfo.appendSchema(outBuffer);
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
-                QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+                sendBufferLH(outBuffer, outLen, rte, rtk);
+                QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
             }
             else
-                sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
+                sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
         }
         else
-            sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found");
+            sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
     }
     else
-        sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found");
+        sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
 }
 
 void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
@@ -1844,7 +1869,7 @@ void ManagementAgent::handleGetQueryLH(B
 }
 
 
-void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
 {
     moveNewObjectsLH();
 
@@ -1865,17 +1890,17 @@ void ManagementAgent::handleGetQueryLH(c
      */
     i = inMap.find("_what");
     if (i == inMap.end()) {
-        sendExceptionLH(replyTo, cid, "_what element missing in Query");
+        sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
         return;
     }
 
     if (i->second.getType() != qpid::types::VAR_STRING) {
-        sendExceptionLH(replyTo, cid, "_what element is not a string");
+        sendExceptionLH(rte, rtk, cid, "_what element is not a string");
         return;
     }
 
     if (i->second.asString() != "OBJECT") {
-        sendExceptionLH(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+        sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
         return;
     }
 
@@ -1934,8 +1959,8 @@ void ManagementAgent::handleGetQueryLH(c
             string content;
 
             ListCodec::encode(list_, content);
-            sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
+            sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+            QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
             return;
         }
     } else {
@@ -1987,27 +2012,26 @@ void ManagementAgent::handleGetQueryLH(c
         string content;
         while (_list.size() > 1) {
             ListCodec::encode(_list.front().asList(), content);
-            sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
+            sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
             _list.pop_front();
-            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+            QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
         }
         headers.erase("partial");
         ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
-        sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+        sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+        QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
         return;
     }
 
     // Unrecognized query - Send empty message to indicate CommandComplete
     string content;
     ListCodec::encode(Variant::List(), content);
-    sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
-    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
+    sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+    QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
 }
 
 
-void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
-                                            const string& cid)
+void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
 {
     QPID_LOG(debug, "RCVD AgentLocateRequest");
 
@@ -2025,10 +2049,10 @@ void ManagementAgent::handleLocateReques
 
     string content;
     MapCodec::encode(map, content);
-    sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
+    sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
     clientWasAdded = true;
 
-    QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
+    QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
 }
 
 
@@ -2155,13 +2179,14 @@ bool ManagementAgent::authorizeAgentMess
             msg.getFrames().getHeaders()->get<framing::MessageProperties>();
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
-            string replyToKey = rt.getRoutingKey();
+            string rte = rt.getExchange();
+            string rtk = rt.getRoutingKey();
             string cid;
             if (p && p->hasCorrelationId())
                 cid = p->getCorrelationId();
 
             if (mapMsg) {
-                sendExceptionLH(replyToKey, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+                sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
                                 Manageable::STATUS_FORBIDDEN, false);
             } else {
 
@@ -2173,7 +2198,7 @@ bool ManagementAgent::authorizeAgentMess
                 outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
                 outLen = MA_BUFFER_SIZE - outBuffer.available();
                 outBuffer.reset();
-                sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+                sendBufferLH(outBuffer, outLen, rte, rtk);
             }
 
             QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -2187,12 +2212,14 @@ bool ManagementAgent::authorizeAgentMess
 
 void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
 {
-    string   replyToKey;
+    string   rte;
+    string   rtk;
     const framing::MessageProperties* p =
         msg.getFrames().getHeaders()->get<framing::MessageProperties>();
     if (p && p->hasReplyTo()) {
         const framing::ReplyTo& rt = p->getReplyTo();
-        replyToKey = rt.getRoutingKey();
+        rte = rt.getExchange();
+        rtk = rt.getRoutingKey();
     }
     else
         return;
@@ -2224,11 +2251,11 @@ void ManagementAgent::dispatchAgentComma
         }
 
         if (opcode == "_method_request")
-            return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher(), viaLocal);
+            return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
         else if (opcode == "_query_request")
-            return handleGetQueryLH(body, replyToKey, cid, viaLocal);
+            return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
         else if (opcode == "_agent_locate_request")
-            return handleLocateRequestLH(body, replyToKey, cid);
+            return handleLocateRequestLH(body, rte, rtk, cid);
 
         QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
         return;
@@ -2241,16 +2268,16 @@ void ManagementAgent::dispatchAgentComma
         if (!checkHeader(inBuffer, &opcode, &sequence))
             return;
 
-        if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, replyToKey, sequence);
-        else if (opcode == 'P') handlePackageQueryLH   (inBuffer, replyToKey, sequence);
-        else if (opcode == 'p') handlePackageIndLH     (inBuffer, replyToKey, sequence);
-        else if (opcode == 'Q') handleClassQueryLH     (inBuffer, replyToKey, sequence);
-        else if (opcode == 'q') handleClassIndLH       (inBuffer, replyToKey, sequence);
-        else if (opcode == 'S') handleSchemaRequestLH  (inBuffer, replyToKey, sequence);
-        else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
-        else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
-        else if (opcode == 'G') handleGetQueryLH       (inBuffer, replyToKey, sequence);
-        else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence, msg.getPublisher());
+        if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, rtk, sequence);
+        else if (opcode == 'P') handlePackageQueryLH   (inBuffer, rtk, sequence);
+        else if (opcode == 'p') handlePackageIndLH     (inBuffer, rtk, sequence);
+        else if (opcode == 'Q') handleClassQueryLH     (inBuffer, rtk, sequence);
+        else if (opcode == 'q') handleClassIndLH       (inBuffer, rtk, sequence);
+        else if (opcode == 'S') handleSchemaRequestLH  (inBuffer, rte, rtk, sequence);
+        else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
+        else if (opcode == 'A') handleAttachRequestLH  (inBuffer, rtk, sequence, msg.getPublisher());
+        else if (opcode == 'G') handleGetQueryLH       (inBuffer, rtk, sequence);
+        else if (opcode == 'M') handleMethodRequestLH  (inBuffer, rtk, sequence, msg.getPublisher());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Feb  2 18:11:07 2011
@@ -354,7 +354,11 @@ private:
     void sendBufferLH(framing::Buffer&             buf,
                       uint32_t                     length,
                       qpid::broker::Exchange::shared_ptr exchange,
-                      std::string                  routingKey);
+                      const std::string&           routingKey);
+    void sendBufferLH(framing::Buffer&             buf,
+                      uint32_t                     length,
+                      const std::string&           exchange,
+                      const std::string&           routingKey);
     void sendBufferLH(const std::string&     data,
                       const std::string&     cid,
                       const qpid::types::Variant::Map& headers,
@@ -362,6 +366,13 @@ private:
                       qpid::broker::Exchange::shared_ptr exchange,
                       const std::string& routingKey,
                       uint64_t ttl_msec = 0);
+    void sendBufferLH(const std::string& data,
+                      const std::string& cid,
+                      const qpid::types::Variant::Map& headers,
+                      const std::string& content_type,
+                      const std::string& exchange,
+                      const std::string& routingKey,
+                      uint64_t ttl_msec = 0);
     void moveNewObjectsLH();
     bool moveDeletedObjectsLH();
 
@@ -386,20 +397,20 @@ private:
     void deleteOrphanedAgentsLH();
     void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
                               uint32_t code = 0, const std::string& text = "OK");
-    void sendExceptionLH(const std::string& replyToKey, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+    void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
     void handleBrokerRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handlePackageQueryLH   (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handlePackageIndLH     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleClassQueryLH     (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleClassIndLH       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
-    void handleSchemaRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+    void handleSchemaRequestLH  (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
     void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleAttachRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
     void handleGetQueryLH       (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
     void handleMethodRequestLH  (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
-    void handleGetQueryLH       (const std::string& body, const std::string& replyToKey, const std::string& cid, bool viaLocal);
-    void handleMethodRequestLH  (const std::string& body, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
-    void handleLocateRequestLH  (const std::string& body, const std::string &replyToKey, const std::string& cid);
+    void handleGetQueryLH       (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+    void handleMethodRequestLH  (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+    void handleLocateRequestLH  (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
 
 
     size_t validateSchema(framing::Buffer&, uint8_t kind);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org