You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/02/02 19:11:08 UTC
svn commit: r1066557 - in /qpid/trunk/qpid/cpp/src/qpid:
agent/ManagementAgentImpl.cpp agent/ManagementAgentImpl.h
management/ManagementAgent.cpp management/ManagementAgent.h
Author: tross
Date: Wed Feb 2 18:11:07 2011
New Revision: 1066557
URL: http://svn.apache.org/viewvc?rev=1066557&view=rev
Log:
QPID-3032 - Bug-fix: The broker management agent and the remote C++ agent hard-coded the
assumed value of the "exchange" component of reply-to headers to either "amq.direct" or
"qmf.default.direct", depending on the circumstance.
This commit fixes this such that message replies are sent to the exchange/key pair supplied
in the reply-to header.
Modified:
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Feb 2 18:11:07 2011
@@ -362,7 +362,7 @@ uint32_t ManagementAgentImpl::pollCallba
methodQueue.pop_front();
{
sys::Mutex::ScopedUnlock unlock(agentLock);
- invokeMethodRequest(item->body, item->cid, item->replyTo, item->userId);
+ invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId);
delete item;
}
}
@@ -497,7 +497,7 @@ void ManagementAgentImpl::sendHeartbeat(
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
-void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid,
const string& text, uint32_t code)
{
Variant::Map map;
@@ -514,12 +514,12 @@ void ManagementAgentImpl::sendException(
map["_values"] = values;
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
{
sys::Mutex::ScopedLock lock(agentLock);
string packageName;
@@ -546,7 +546,7 @@ void ManagementAgentImpl::handleSchemaRe
outBuffer.putRawData(body);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
@@ -561,7 +561,7 @@ void ManagementAgentImpl::handleConsoleA
QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
{
string methodName;
bool failed = false;
@@ -572,11 +572,9 @@ void ManagementAgentImpl::invokeMethodRe
MapCodec::decode(body, inMap);
- outMap["_values"] = Variant::Map();
-
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
Manageable::STATUS_PARAMETER_INVALID);
failed = true;
} else {
@@ -595,6 +593,8 @@ void ManagementAgentImpl::invokeMethodRe
inArgs = (mid->second).asMap();
}
+ QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs);
+
boost::shared_ptr<ManagementObject> oPtr;
{
sys::Mutex::ScopedLock lock(agentLock);
@@ -604,7 +604,7 @@ void ManagementAgentImpl::invokeMethodRe
}
if (oPtr.get() == 0) {
- sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
Manageable::STATUS_UNKNOWN_OBJECT);
failed = true;
} else {
@@ -617,13 +617,13 @@ void ManagementAgentImpl::invokeMethodRe
if (iter->first != "_status_code" && iter->first != "_status_text")
outMap["_arguments"].asMap()[iter->first] = iter->second;
} else {
- sendException(replyTo, cid, callMap["_status_text"], callMap["_status_code"]);
+ sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]);
failed = true;
}
}
} catch(types::InvalidConversion& e) {
- sendException(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION);
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION);
failed = true;
}
}
@@ -635,11 +635,11 @@ void ManagementAgentImpl::invokeMethodRe
headers["qmf.opcode"] = "_method_response";
QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
MapCodec::encode(outMap, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
}
}
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
{
moveNewObjectsLH();
@@ -666,12 +666,12 @@ void ManagementAgentImpl::handleGetQuery
*/
i = inMap.find("_what");
if (i == inMap.end()) {
- sendException(replyTo, cid, "_what element missing in Query");
+ sendException(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
- sendException(replyTo, cid, "_what element is not a string");
+ sendException(rte, rtk, cid, "_what element is not a string");
return;
}
@@ -709,8 +709,8 @@ void ManagementAgentImpl::handleGetQuery
headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
} else { // match using schema_id, if supplied
@@ -771,8 +771,8 @@ void ManagementAgentImpl::handleGetQuery
if (++objCount >= maxV2ReplyObjs) {
objCount = 0;
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk);
content.clear();
list_.clear();
}
@@ -784,8 +784,8 @@ void ManagementAgentImpl::handleGetQuery
// Send last "non-partial" message to indicate CommandComplete
headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk);
} else if (i->second.asString() == "SCHEMA_ID") {
headers["qmf.content"] = "_schema_id";
@@ -806,16 +806,16 @@ void ManagementAgentImpl::handleGetQuery
headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk);
} else {
// Unknown query target
- sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
}
}
-void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk)
{
QPID_LOG(trace, "RCVD AgentLocateRequest");
@@ -829,9 +829,9 @@ void ManagementAgentImpl::handleLocateRe
getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
- QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
{
sys::Mutex::ScopedLock lock(agentLock);
@@ -839,12 +839,12 @@ void ManagementAgentImpl::handleLocateRe
}
}
-void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
{
if (extThread) {
sys::Mutex::ScopedLock lock(agentLock);
- methodQueue.push_back(new QueuedMethod(cid, replyTo, body, userId));
+ methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId));
if (pipeHandle != 0) {
pipeHandle->write("X", 1);
} else if (notifyable != 0) {
@@ -863,7 +863,7 @@ void ManagementAgentImpl::handleMethodRe
inCallback = false;
}
} else {
- invokeMethodRequest(body, cid, replyTo, userId);
+ invokeMethodRequest(body, cid, rte, rtk, userId);
}
QPID_LOG(trace, "RCVD MethodRequest");
@@ -871,10 +871,12 @@ void ManagementAgentImpl::handleMethodRe
void ManagementAgentImpl::received(Message& msg)
{
+ string replyToExchange;
string replyToKey;
framing::MessageProperties mp = msg.getMessageProperties();
if (mp.hasReplyTo()) {
const framing::ReplyTo& rt = mp.getReplyTo();
+ replyToExchange = rt.getExchange();
replyToKey = rt.getRoutingKey();
}
@@ -887,9 +889,9 @@ void ManagementAgentImpl::received(Messa
string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
string cid = msg.getMessageProperties().getCorrelationId();
- if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
- else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey, userId);
- else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey);
+ if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey);
+ else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey);
else {
QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
}
@@ -906,7 +908,7 @@ void ManagementAgentImpl::received(Messa
if (checkHeader(inBuffer, &opcode, &sequence))
{
- if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
else
QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Feb 2 18:11:07 2011
@@ -128,11 +128,12 @@ class ManagementAgentImpl : public Manag
};
struct QueuedMethod {
- QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body, const std::string& _uid) :
- cid(_cid), replyTo(_reply), body(_body), userId(_uid) {}
+ QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) :
+ cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {}
std::string cid;
- std::string replyTo;
+ std::string replyToExchange;
+ std::string replyToKey;
std::string body;
std::string userId;
};
@@ -278,16 +279,16 @@ class ManagementAgentImpl : public Manag
uint8_t type=ManagementItem::CLASS_KIND_TABLE);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendHeartbeat();
- void sendException(const std::string& replyToKey, const std::string& cid,
+ void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid,
const std::string& text, uint32_t code=1);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
- void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo, const std::string& userId);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk);
+ void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId);
- void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo);
- void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
- void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo, const std::string& userId);
+ void handleGetQuery (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk);
+ void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk);
+ void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId);
void handleConsoleAddedIndication();
void getHeartbeatContent (qpid::types::Variant::Map& map);
};
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Feb 2 18:11:07 2011
@@ -503,7 +503,7 @@ bool ManagementAgent::checkHeader (Buffe
void ManagementAgent::sendBufferLH(Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
- string routingKey)
+ const string& routingKey)
{
if (suppressed) {
QPID_LOG(debug, "Suppressing management message to " << routingKey);
@@ -548,6 +548,17 @@ void ManagementAgent::sendBufferLH(Buffe
}
+void ManagementAgent::sendBufferLH(Buffer& buf,
+ uint32_t length,
+ const string& exchange,
+ const string& routingKey)
+{
+ qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+ if (ex.get() != 0)
+ sendBufferLH(buf, length, ex, routingKey);
+}
+
+
// NOTE WELL: assumes userLock is held by caller (LH)
// NOTE EVEN WELLER: drops this lock when delivering the message!!!
void ManagementAgent::sendBufferLH(const string& data,
@@ -612,6 +623,20 @@ void ManagementAgent::sendBufferLH(const
}
+void ManagementAgent::sendBufferLH(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ const string& exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
+{
+ qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
+ if (ex.get() != 0)
+ sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+}
+
+
/** Objects that have been added since the last periodic poll are temporarily
* saved in the newManagementObjects list. This allows objects to be
* added without needing to block on the userLock (addLock is used instead).
@@ -1106,7 +1131,7 @@ void ManagementAgent::sendCommandComplet
replyToKey << " seq=" << sequence);
}
-void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& cid,
+void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
const string& text, uint32_t code, bool viaLocal)
{
static const string addr_exchange("qmf.default.direct");
@@ -1125,7 +1150,7 @@ void ManagementAgent::sendExceptionLH(co
map["_values"] = values;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey);
+ sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
}
@@ -1295,7 +1320,7 @@ void ManagementAgent::handleMethodReques
}
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& replyTo,
+void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
const string& cid, const ConnectionToken* connToken, bool viaLocal)
{
moveNewObjectsLH();
@@ -1317,7 +1342,7 @@ void ManagementAgent::handleMethodReques
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
Manageable::STATUS_PARAMETER_INVALID, viaLocal);
return;
}
@@ -1336,7 +1361,7 @@ void ManagementAgent::handleMethodReques
inArgs = (mid->second).asMap();
}
} catch(exception& e) {
- sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
@@ -1345,7 +1370,7 @@ void ManagementAgent::handleMethodReques
if (iter == managementObjects.end() || iter->second->isDeleted()) {
stringstream estr;
estr << "No object found with ID=" << objId;
- sendExceptionLH(replyTo, cid, estr.str(), 1, viaLocal);
+ sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
return;
}
@@ -1355,7 +1380,7 @@ void ManagementAgent::handleMethodReques
i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
if (i != disallowed.end()) {
- sendExceptionLH(replyTo, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+ sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
@@ -1366,7 +1391,7 @@ void ManagementAgent::handleMethodReques
params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) {
- sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
@@ -1376,7 +1401,7 @@ void ManagementAgent::handleMethodReques
QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
<< ":" << iter->second->getClassName() << " method=" <<
- methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs);
+ methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
try {
sys::Mutex::ScopedUnlock u(userLock);
@@ -1391,18 +1416,18 @@ void ManagementAgent::handleMethodReques
} else
error = callMap["_status_text"].asString();
} catch(exception& e) {
- sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
if (errorCode != 0) {
- sendExceptionLH(replyTo, cid, error, errorCode, viaLocal);
+ sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
return;
}
MapCodec::encode(outMap, content);
- sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
- QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap);
+ sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
}
@@ -1549,7 +1574,7 @@ void ManagementAgent::SchemaClass::appen
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -1558,7 +1583,7 @@ void ManagementAgent::handleSchemaReques
key.decode(inBuffer);
QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
- "), replyTo=" << replyToKey << " seq=" << sequence);
+ "), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
@@ -1574,17 +1599,17 @@ void ManagementAgent::handleSchemaReques
classInfo.appendSchema(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence);
+ sendBufferLH(outBuffer, outLen, rte, rtk);
+ QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
}
else
- sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available");
+ sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
}
else
- sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found");
+ sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
}
else
- sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found");
+ sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
}
void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
@@ -1844,7 +1869,7 @@ void ManagementAgent::handleGetQueryLH(B
}
-void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
{
moveNewObjectsLH();
@@ -1865,17 +1890,17 @@ void ManagementAgent::handleGetQueryLH(c
*/
i = inMap.find("_what");
if (i == inMap.end()) {
- sendExceptionLH(replyTo, cid, "_what element missing in Query");
+ sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
- sendExceptionLH(replyTo, cid, "_what element is not a string");
+ sendExceptionLH(rte, rtk, cid, "_what element is not a string");
return;
}
if (i->second.asString() != "OBJECT") {
- sendExceptionLH(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
return;
}
@@ -1934,8 +1959,8 @@ void ManagementAgent::handleGetQueryLH(c
string content;
ListCodec::encode(list_, content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
} else {
@@ -1987,27 +2012,26 @@ void ManagementAgent::handleGetQueryLH(c
string content;
while (_list.size() > 1) {
ListCodec::encode(_list.front().asList(), content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
_list.pop_front();
- QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length());
+ QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length());
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
return;
}
// Unrecognized query - Send empty message to indicate CommandComplete
string content;
ListCodec::encode(Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo);
- QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo);
+ sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
}
-void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
- const string& cid)
+void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
{
QPID_LOG(debug, "RCVD AgentLocateRequest");
@@ -2025,10 +2049,10 @@ void ManagementAgent::handleLocateReques
string content;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo);
+ sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
clientWasAdded = true;
- QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo);
+ QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
}
@@ -2155,13 +2179,14 @@ bool ManagementAgent::authorizeAgentMess
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- string replyToKey = rt.getRoutingKey();
+ string rte = rt.getExchange();
+ string rtk = rt.getRoutingKey();
string cid;
if (p && p->hasCorrelationId())
cid = p->getCorrelationId();
if (mapMsg) {
- sendExceptionLH(replyToKey, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
Manageable::STATUS_FORBIDDEN, false);
} else {
@@ -2173,7 +2198,7 @@ bool ManagementAgent::authorizeAgentMess
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBufferLH(outBuffer, outLen, rte, rtk);
}
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -2187,12 +2212,14 @@ bool ManagementAgent::authorizeAgentMess
void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
{
- string replyToKey;
+ string rte;
+ string rtk;
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- replyToKey = rt.getRoutingKey();
+ rte = rt.getExchange();
+ rtk = rt.getRoutingKey();
}
else
return;
@@ -2224,11 +2251,11 @@ void ManagementAgent::dispatchAgentComma
}
if (opcode == "_method_request")
- return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher(), viaLocal);
+ return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
else if (opcode == "_query_request")
- return handleGetQueryLH(body, replyToKey, cid, viaLocal);
+ return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
else if (opcode == "_agent_locate_request")
- return handleLocateRequestLH(body, replyToKey, cid);
+ return handleLocateRequestLH(body, rte, rtk, cid);
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
@@ -2241,16 +2268,16 @@ void ManagementAgent::dispatchAgentComma
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
- else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
- else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
+ if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence);
+ else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence);
+ else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence);
+ else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence);
+ else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence);
+ else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence);
+ else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
+ else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
+ else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1066557&r1=1066556&r2=1066557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Feb 2 18:11:07 2011
@@ -354,7 +354,11 @@ private:
void sendBufferLH(framing::Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
- std::string routingKey);
+ const std::string& routingKey);
+ void sendBufferLH(framing::Buffer& buf,
+ uint32_t length,
+ const std::string& exchange,
+ const std::string& routingKey);
void sendBufferLH(const std::string& data,
const std::string& cid,
const qpid::types::Variant::Map& headers,
@@ -362,6 +366,13 @@ private:
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey,
uint64_t ttl_msec = 0);
+ void sendBufferLH(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
void moveNewObjectsLH();
bool moveDeletedObjectsLH();
@@ -386,20 +397,20 @@ private:
void deleteOrphanedAgentsLH();
void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
uint32_t code = 0, const std::string& text = "OK");
- void sendExceptionLH(const std::string& replyToKey, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, const std::string& replyToKey, const std::string& cid, bool viaLocal);
- void handleMethodRequestLH (const std::string& body, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
- void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid);
+ void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+ void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+ void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org