You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/31 23:13:14 UTC
svn commit: r929716 [2/4] - in /qpid/trunk/qpid: cpp/bindings/qmf/tests/
cpp/examples/qmf-agent/ cpp/include/qpid/agent/ cpp/include/qpid/framing/
cpp/include/qpid/management/ cpp/managementgen/ cpp/managementgen/qmfgen/
cpp/managementgen/qmfgen/templa...
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Wed Mar 31 21:13:12 2010
@@ -22,7 +22,7 @@
#include "qpid/management/ManagementObject.h"
#include "qpid/log/Statement.h"
#include "qpid/agent/ManagementAgentImpl.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include <list>
#include <string.h>
#include <stdlib.h>
@@ -41,6 +41,9 @@ using std::ofstream;
using std::ifstream;
using std::string;
using std::endl;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
namespace {
Mutex lock;
@@ -81,7 +84,7 @@ const string ManagementAgentImpl::storeM
ManagementAgentImpl::ManagementAgentImpl() :
interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
notifyable(0), inCallback(false),
- initialized(false), connected(false), lastFailure("never connected"),
+ initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
connThreadBody(*this), connThread(connThreadBody),
@@ -117,6 +120,21 @@ ManagementAgentImpl::~ManagementAgentImp
}
}
+void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ inst = qpid::types::Uuid(true).str();
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
void ManagementAgentImpl::init(const string& brokerHost,
uint16_t brokerPort,
uint16_t intervalSeconds,
@@ -140,7 +158,7 @@ void ManagementAgentImpl::init(const str
void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
uint16_t intervalSeconds,
bool useExternalThread,
- const std::string& _storeFile)
+ const string& _storeFile)
{
interval = intervalSeconds;
extThread = useExternalThread;
@@ -157,13 +175,16 @@ void ManagementAgentImpl::init(const qpi
bootSequence = 1;
storeData(true);
+ if (attrMap.empty())
+ setName("vendor", "product");
+
initialized = true;
}
void ManagementAgentImpl::registerClass(const string& packageName,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -173,49 +194,77 @@ void ManagementAgentImpl::registerClass(
void ManagementAgentImpl::registerEvent(const string& packageName,
const string& eventName,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
+// old-style add object: 64bit id - deprecated
ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
uint64_t persistId)
{
+ std::string key;
+ if (persistId) {
+ key = boost::lexical_cast<std::string>(persistId);
+ }
+ return addObject(object, key, persistId != 0);
+}
+
+
+// new style add object - use this approach!
+ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
+ const std::string& key,
+ bool persistent)
+{
Mutex::ScopedLock lock(addLock);
- uint16_t sequence = persistId ? 0 : bootSequence;
- uint64_t objectNum = persistId ? persistId : nextObjectId++;
- ObjectId objectId(&attachment, 0, sequence, objectNum);
+ uint16_t sequence = persistent ? 0 : bootSequence;
+
+ ObjectId objectId(&attachment, 0, sequence);
+ if (key.empty())
+ objectId.setV2Key(*object); // let object generate the key
+ else
+ objectId.setV2Key(key);
- // TODO: fix object-id handling
object->setObjectId(objectId);
newManagementObjects[objectId] = object;
return objectId;
}
+
void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock(agentLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
stringstream key;
key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
event.getPackageName() << "." << event.getEventName();
- encodeHeader(outBuffer, 'e');
- outBuffer.putShortString(event.getPackageName());
- outBuffer.putShortString(event.getEventName());
- outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(Duration(now())));
- outBuffer.putOctet(sev);
- event.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str());
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
+ string content;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+
+ MapCodec::encode(map_, content);
+ connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -235,8 +284,7 @@ uint32_t ManagementAgentImpl::pollCallba
methodQueue.pop_front();
{
Mutex::ScopedUnlock unlock(agentLock);
- Buffer inBuffer(const_cast<char*>(item->body.c_str()), item->body.size());
- invokeMethodRequest(inBuffer, item->sequence, item->replyTo);
+ invokeMethodRequest(item->body, item->cid, item->replyTo);
delete item;
}
}
@@ -274,20 +322,7 @@ void ManagementAgentImpl::setSignalCallb
void ManagementAgentImpl::startProtocol()
{
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- connected = true;
- encodeHeader(buffer, 'A');
- buffer.putShortString("RemoteAgent [C++]");
- systemId.encode (buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- uint32_t length = buffer.getPosition();
- buffer.reset();
- connThreadBody.sendBuffer(buffer, length, "qpid.management", "broker");
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
+ sendHeartbeat();
}
void ManagementAgentImpl::storeData(bool requested)
@@ -323,76 +358,54 @@ void ManagementAgentImpl::retrieveData()
}
}
-void ManagementAgentImpl::sendCommandComplete(string replyToKey, uint32_t sequence,
- uint32_t code, string text)
+void ManagementAgentImpl::sendHeartbeat()
{
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ static const string addr_exchange("qmf.default.topic");
+ static const string addr_key("agent.ind.heartbeat");
+
+ Variant::Map map;
+ Variant::Map headers;
+ string content;
- encodeHeader(outBuffer, 'z', sequence);
- outBuffer.putLong(code);
- outBuffer.putShortString(text);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyToKey);
- QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
-void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
+void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
+ const string& text, uint32_t code)
{
- Mutex::ScopedLock lock(agentLock);
+ static const string addr_exchange("qmf.default.direct");
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
+ Variant::Map map;
+ Variant::Map headers;
+ Variant::Map values;
+ string content;
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_exception";
+ headers["qmf.agent"] = name_address;
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- storeData();
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
-
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
- // Bind to qpid.management to receive commands
- connThreadBody.bindToBank(assignedBrokerBank, assignedAgentBank);
-
- // Send package indications for all local packages
- for (PackageMap::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ values["error_code"] = code;
+ values["error_text"] = text;
+ map["_values"] = values;
- encodeHeader(outBuffer, 'p');
- encodePackageIndication(outBuffer, pIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey);
- // Send class indications for all local classes
- ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
- outBuffer.reset();
- encodeHeader(outBuffer, 'q');
- encodeClassIndication(outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
- }
- }
+ QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
{
Mutex::ScopedLock lock(agentLock);
string packageName;
@@ -412,12 +425,14 @@ void ManagementAgentImpl::handleSchemaRe
SchemaClass& schema = cIter->second;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ string body;
encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(outBuffer);
+ schema.writeSchemaCall(body);
+ outBuffer.putRawData(body);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "broker");
+ connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
}
@@ -432,124 +447,250 @@ void ManagementAgentImpl::handleConsoleA
QPID_LOG(trace, "RCVD ConsoleAddedInd");
}
-void ManagementAgentImpl::invokeMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo)
{
- string methodName;
- string packageName;
- string className;
- uint8_t hash[16];
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- ObjectId objId(inBuffer);
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
-
- encodeHeader(outBuffer, 'm', sequence);
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT));
+ string methodName;
+ bool failed = false;
+ Variant::Map inMap;
+ Variant::Map outMap;
+ Variant::Map::const_iterator oid, mid;
+ string content;
+
+ MapCodec::decode(body, inMap);
+
+ outMap["_values"] = Variant::Map();
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+ failed = true;
} else {
- if ((iter->second->getPackageName() != packageName) ||
- (iter->second->getClassName() != className)) {
- outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
- outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
- }
- else
- try {
- outBuffer.record();
- iter->second->doMethod(methodName, inBuffer, outBuffer);
- } catch(exception& e) {
- outBuffer.restore();
- outBuffer.putLong(Manageable::STATUS_EXCEPTION);
- outBuffer.putMediumString(e.what());
+ string methodName;
+ ObjectId objId;
+ Variant::Map inArgs;
+ Variant::Map callMap;
+
+ try {
+ // conversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+
+ mid = inMap.find("_arguments");
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+ failed = true;
+ } else {
+ iter->second->doMethod(methodName, inArgs, callMap);
}
+
+ if (callMap["_status_code"].asUint32() == 0) {
+ outMap["_arguments"] = Variant::Map();
+ for (Variant::Map::const_iterator iter = callMap.begin();
+ iter != callMap.end(); iter++)
+ if (iter->first != "_status_code" && iter->first != "_status_text")
+ outMap["_arguments"].asMap()[iter->first] = iter->second;
+ } else {
+ (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"];
+ (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"];
+ failed = true;
+ }
+
+ } catch(types::InvalidConversion& e) {
+ outMap.clear();
+ outMap["_values"] = Variant::Map();
+ (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION;
+ (outMap["_values"].asMap())["_status_text"] = e.what();
+ failed = true;
+ }
+ }
+
+ Variant::Map headers;
+ headers["method"] = "response";
+ headers["qmf.agent"] = name_address;
+ if (failed) {
+ headers["qmf.opcode"] = "_exception";
+ QPID_LOG(trace, "SENT Exception map=" << outMap);
+ } else {
+ headers["qmf.opcode"] = "_method_response";
+ QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
}
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ MapCodec::encode(outMap, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo);
}
-void ManagementAgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo)
{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
moveNewObjectsLH();
- ft.decode(inBuffer);
+ Variant::Map inMap;
+ Variant::Map::const_iterator i;
+ Variant::Map headers;
+
+ MapCodec::decode(body, inMap);
+ QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+ headers["partial"] = Variant();
+
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oidMap;
+ string content;
+
+ /*
+ * Unpack the _what element of the query. Currently we only support OBJECT queries.
+ */
+ i = inMap.find("_what");
+ if (i == inMap.end()) {
+ sendException(replyTo, cid, "_what element missing in Query");
+ return;
+ }
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
+ if (i->second.getType() != qpid::types::VAR_STRING) {
+ sendException(replyTo, cid, "_what element is not a string");
+ return;
+ }
- value = ft.get("_class");
- if (value.get() == 0 || !value->convertsTo<string>()) {
- value = ft.get("_objectid");
- if (value.get() == 0 || !value->convertsTo<string>())
- return;
+ if (i->second.asString() != "OBJECT") {
+ sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ return;
+ }
- ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = managementObjects.find(selector);
+ string className;
+ string packageName;
+
+ /*
+ * Handle the _schema_id element, if supplied.
+ */
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
+
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ className = s_iter->second.asString();
+
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ packageName = s_iter->second.asString();
+ }
+
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ ObjectId objId(i->second.asMap());
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter != managementObjects.end()) {
ManagementObject* object = iter->second;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ objId.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+ headers.erase("partial");
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ return;
+ }
+ } else {
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
+
+ // @todo support multiple object reply per message
+ values.clear();
+ list_.clear();
+ oidMap.clear();
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
- QPID_LOG(trace, "SENT ObjectInd");
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ iter->first.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ }
}
- sendCommandComplete(replyTo, sequence);
- return;
}
- string className(value->get<string>());
+ // end empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+}
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName() == className) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+ static const string addr_exchange("qmf.default.direct");
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
+ Variant::Map map;
+ Variant::Map headers;
+ string content;
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
- QPID_LOG(trace, "SENT ObjectInd");
- }
- }
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
- sendCommandComplete(replyTo, sequence);
+ MapCodec::encode(map, content);
+ connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+
+ {
+ Mutex::ScopedLock lock(agentLock);
+ clientWasAdded = true;
+ }
}
-void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo)
{
if (extThread) {
Mutex::ScopedLock lock(agentLock);
- string body;
- inBuffer.getRawData(body, inBuffer.available());
- methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
+ methodQueue.push_back(new QueuedMethod(cid, replyTo, body));
if (pipeHandle != 0) {
pipeHandle->write("X", 1);
} else if (notifyable != 0) {
@@ -568,7 +709,7 @@ void ManagementAgentImpl::handleMethodRe
inCallback = false;
}
} else {
- invokeMethodRequest(inBuffer, sequence, replyTo);
+ invokeMethodRequest(body, cid, replyTo);
}
QPID_LOG(trace, "RCVD MethodRequest");
@@ -576,28 +717,45 @@ void ManagementAgentImpl::handleMethodRe
void ManagementAgentImpl::received(Message& msg)
{
+ string replyToKey;
+ framing::MessageProperties mp = msg.getMessageProperties();
+ if (mp.hasReplyTo()) {
+ const framing::ReplyTo& rt = mp.getReplyTo();
+ replyToKey = rt.getRoutingKey();
+ }
+
+ if (mp.hasAppId() && mp.getAppId() == "qmf2")
+ {
+ string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
+ string cid = msg.getMessageProperties().getCorrelationId();
+
+ if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey);
+ else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey);
+ else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey);
+ else {
+ QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
+ }
+ return;
+ }
+
+ // old preV2 binary messages
+
+ uint32_t sequence;
string data = msg.getData();
Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
- framing::MessageProperties p = msg.getMessageProperties();
- if (p.hasReplyTo()) {
- const framing::ReplyTo& rt = p.getReplyTo();
- replyToKey = rt.getRoutingKey();
- }
if (checkHeader(inBuffer, &opcode, &sequence))
{
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
+ if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey);
else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
+ else
+ QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
}
}
+
void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet('A');
@@ -607,6 +765,19 @@ void ManagementAgentImpl::encodeHeader(B
buf.putLong (seq);
}
+Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
+ const string& cname,
+ const uint8_t *md5Sum)
+{
+ Variant::Map map_;
+
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_hash"] = types::Uuid(md5Sum);
+ return map_;
+}
+
+
bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
if (buf.getSize() < 8)
@@ -661,7 +832,7 @@ void ManagementAgentImpl::addClassLocal(
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
- qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -701,10 +872,7 @@ void ManagementAgentImpl::encodeClassInd
void ManagementAgentImpl::periodicProcessing()
{
-#define BUFSIZE 65536
Mutex::ScopedLock lock(agentLock);
- char msgChars[BUFSIZE];
- uint32_t contentSize;
list<pair<ObjectId, ManagementObject*> > deleteList;
if (!connected)
@@ -745,42 +913,53 @@ void ManagementAgentImpl::periodicProces
!baseObject->isDeleted()))
continue;
- Buffer msgBuffer(msgChars, BUFSIZE);
+ Variant::List list_;
+
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- }
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
- encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_stats || send_props) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
}
if (object->isDeleted())
deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
-
- if (msgBuffer.available() < (BUFSIZE / 2))
- break;
}
}
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
- msgBuffer.reset();
- stringstream key;
- key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
- baseObject->getPackageName() << "." << baseObject->getClassName();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
+ string content;
+ ListCodec::encode(list_, content);
+ if (content.length()) {
+ Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+ QPID_LOG(trace, "SENT DataIndication");
}
}
@@ -793,18 +972,7 @@ void ManagementAgentImpl::periodicProces
}
deleteList.clear();
-
- {
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
-
- contentSize = BUFSIZE - msgBuffer.available();
- msgBuffer.reset();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
- }
+ sendHeartbeat();
}
void ManagementAgentImpl::ConnectionThread::run()
@@ -831,6 +999,10 @@ void ManagementAgentImpl::ConnectionThre
arg::exclusive=true);
session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
arg::bindingKey=queueName.str());
+ session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+ arg::bindingKey=agent.name_address);
+ session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+ arg::bindingKey="console.#");
subscriptions->subscribe(agent, queueName.str(), dest);
QPID_LOG(info, "Connection established with broker");
@@ -839,6 +1011,7 @@ void ManagementAgentImpl::ConnectionThre
if (shutdown)
return;
operational = true;
+ agent.connected = true;
agent.startProtocol();
try {
Mutex::ScopedUnlock _unlock(connLock);
@@ -892,6 +1065,48 @@ void ManagementAgentImpl::ConnectionThre
const string& exchange,
const string& routingKey)
{
+ Message msg;
+ string data;
+
+ buf.getRawData(data, length);
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map headers,
+ const string& exchange,
+ const string& routingKey,
+ const string& contentType)
+{
+ Message msg;
+ Variant::Map::const_iterator i;
+
+ if (!cid.empty())
+ msg.getMessageProperties().setCorrelationId(cid);
+
+ if (!contentType.empty())
+ msg.getMessageProperties().setContentType(contentType);
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg.getHeaders().setString(i->first, i->second.asString());
+ }
+ msg.getHeaders().setString("app_id", "qmf2");
+
+ msg.setData(data);
+ sendMessage(msg, exchange, routingKey);
+}
+
+
+
+
+
+void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
+ const string& exchange,
+ const string& routingKey)
+{
ConnectionThread::shared_ptr s;
{
Mutex::ScopedLock _lock(connLock);
@@ -900,23 +1115,21 @@ void ManagementAgentImpl::ConnectionThre
s = subscriptions;
}
- Message msg;
- string data;
-
- buf.getRawData(data, length);
msg.getDeliveryProperties().setRoutingKey(routingKey);
msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.setData(data);
+ msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
try {
session.messageTransfer(arg::content=msg, arg::destination=exchange);
} catch(exception& e) {
- QPID_LOG(error, "Exception caught in sendBuffer: " << e.what());
+ QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
// Bounce the connection
if (s)
s->stop();
}
}
+
+
void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
{
stringstream key;
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Wed Mar 31 21:13:12 2010
@@ -51,6 +51,9 @@ class ManagementAgentImpl : public Manag
// Methods from ManagementAgent
//
int getMaxThreads() { return 1; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
void init(const std::string& brokerHost = "localhost",
uint16_t brokerPort = 5672,
uint16_t intervalSeconds = 10,
@@ -75,6 +78,8 @@ class ManagementAgentImpl : public Manag
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
+ ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key,
+ bool persistent);
void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
uint32_t pollCallbacks(uint32_t callLimit = 0);
int getSignalFd();
@@ -120,10 +125,10 @@ class ManagementAgentImpl : public Manag
};
struct QueuedMethod {
- QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
- sequence(_seq), replyTo(_reply), body(_body) {}
+ QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body) :
+ cid(_cid), replyTo(_reply), body(_body) {}
- uint32_t sequence;
+ std::string cid;
std::string replyTo;
std::string body;
};
@@ -140,6 +145,8 @@ class ManagementAgentImpl : public Manag
void received (client::Message& msg);
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
uint16_t interval;
bool extThread;
sys::PipeHandle* pipeHandle;
@@ -155,6 +162,7 @@ class ManagementAgentImpl : public Manag
client::ConnectionSettings connectionSettings;
bool initialized;
bool connected;
+ bool useMapMsg;
std::string lastFailure;
bool clientWasAdded;
@@ -198,6 +206,15 @@ class ManagementAgentImpl : public Manag
uint32_t length,
const std::string& exchange,
const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map headers,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const std::string& contentType="amqp/map");
+ void sendMessage(qpid::client::Message msg,
+ const std::string& exchange,
+ const std::string& routingKey);
void bindToBank(uint32_t brokerBank, uint32_t agentBank);
void close();
bool isSleeping() const;
@@ -237,16 +254,21 @@ class ManagementAgentImpl : public Manag
PackageMap::iterator pIter,
ClassMap::iterator cIter);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
+ const std::string& cname,
+ const uint8_t *md5Sum);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
- uint32_t code = 0, std::string text = std::string("OK"));
- void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+ void sendHeartbeat();
+ void sendException(const std::string& replyToKey, const std::string& cid,
+ const std::string& text, uint32_t code=1);
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
- void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo);
+ void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo);
+
+ void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo);
+ void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
+ void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Mar 31 21:13:12 2010
@@ -93,7 +93,8 @@ Broker::Options::Options(const std::stri
tcpNoDelay(false),
requireEncrypted(false),
maxSessionRate(0),
- asyncQueueEvents(false) // Must be false in a cluster.
+ asyncQueueEvents(false), // Must be false in a cluster.
+ qmf2Support(false)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -114,6 +115,7 @@ Broker::Options::Options(const std::stri
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -138,7 +140,9 @@ const std::string knownHostsNone("none")
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
+ managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+ conf.qmf2Support)
+ : 0),
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
@@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& co
QPID_LOG(info, "Management enabled");
managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
conf.mgmtPubInterval, this, conf.workerThreads + 3);
+ managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Mar 31 21:13:12 2010
@@ -113,6 +113,7 @@ public:
std::string knownHosts;
uint32_t maxSessionRate;
bool asyncQueueEvents;
+ bool qmf2Support;
private:
std::string getHome();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Mar 31 21:13:12 2010
@@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name,
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
- mgmtExchange->set_arguments(args);
+ mgmtExchange->set_arguments(ManagementAgent::toMap(args));
if (!durable) {
if (name.empty()) {
agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID
@@ -336,7 +336,7 @@ void Exchange::Binding::startManagement(
{
management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new _qmf::Binding
- (agent, this, (Manageable*) parent, queueId, key, args);
+ (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
if (!origin.empty())
mgmtBinding->set_origin(origin);
agent->addObject (mgmtBinding, agent->allocateId(this));
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Mar 31 21:13:12 2010
@@ -873,7 +873,7 @@ void Queue::configure(const FieldTable&
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
if (mgmtObject != 0)
- mgmtObject->set_arguments (_settings);
+ mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Mar 31 21:13:12 2010
@@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImp
if (agent != 0)
{
mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
- !acquire, ackExpected, exclusive ,arguments);
+ !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject, agent->allocateId(this));
mgmtObject->set_creditMode("WINDOW");
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Mar 31 21:13:12 2010
@@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
- alternateExchange, durable, false, args,
+ alternateExchange, durable, false, ManagementAgent::toMap(args),
response.second ? "created" : "existing"));
}catch(UnknownExchangeTypeException& /*e*/){
@@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
+ agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
+ queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
}
}else{
throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
@@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::d
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, arguments,
+ name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
queue_created.second ? "created" : "existing"));
}
@@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subs
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
- queueName, destination, exclusive, arguments));
+ queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
}
void
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/System.cpp Wed Mar 31 21:13:12 2010
@@ -22,6 +22,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
#include <iostream>
#include <fstream>
@@ -64,7 +65,7 @@ System::System (string _dataDir, Broker*
}
}
- mgmtObject = new _qmf::System (agent, this, systemId);
+ mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
std::string sysname, nodename, release, version, machine;
qpid::sys::SystemInfo::getSystemId (sysname,
nodename,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org