You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/31 23:13:14 UTC
svn commit: r929716 [3/4] - in /qpid/trunk/qpid: cpp/bindings/qmf/tests/
cpp/examples/qmf-agent/ cpp/include/qpid/agent/ cpp/include/qpid/framing/
cpp/include/qpid/management/ cpp/managementgen/ cpp/managementgen/qmfgen/
cpp/managementgen/qmfgen/templa...
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Mar 31 21:13:12 2010
@@ -29,20 +29,46 @@
#include "qpid/sys/Time.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/List.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include <list>
#include <iostream>
#include <fstream>
#include <sstream>
+#include <typeinfo>
using boost::intrusive_ptr;
using qpid::framing::Uuid;
+using qpid::types::Variant;
+using qpid::amqp_0_10::MapCodec;
+using qpid::amqp_0_10::ListCodec;
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::broker;
using namespace qpid::sys;
+using namespace qpid;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
+// Builds the schema-identifier map that callers in this file store under
+// the "_schema_id" key of their map-encoded messages.
+static Variant::Map mapEncodeSchemaId(const std::string& pname,
+ const std::string& cname,
+ const std::string& type,
+ const uint8_t *md5Sum) // NOTE(review): presumably a 16-byte MD5 digest -- confirm
+{
+ Variant::Map map_;
+ // Keys written: _package_name, _class_name, _type, _hash.
+ map_["_package_name"] = pname;
+ map_["_class_name"] = cname;
+ map_["_type"] = type;
+ map_["_hash"] = qpid::types::Uuid(md5Sum); // the hash is carried as a Uuid value
+ return map_;
+}
+
+
ManagementAgent::RemoteAgent::~RemoteAgent ()
{
QPID_LOG(trace, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
@@ -52,10 +78,11 @@ ManagementAgent::RemoteAgent::~RemoteAge
}
}
-ManagementAgent::ManagementAgent () :
+ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
threadPoolSize(1), interval(10), broker(0), timer(0),
startTime(uint64_t(Duration(now()))),
- suppressed(false)
+ suppressed(false),
+ qmf1Support(qmfV1), qmf2Support(qmfV2)
{
nextObjectId = 1;
brokerBank = 1;
@@ -148,6 +175,27 @@ void ManagementAgent::pluginsInitialized
timer->add(new Periodic(*this, interval));
}
+// Records the vendor/product/instance attributes in attrMap and derives name_address from them.
+void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
+{
+ attrMap["_vendor"] = vendor;
+ attrMap["_product"] = product;
+ string inst;
+ if (instance.empty()) {
+ if (uuid.isNull()) // no instance supplied and no uuid available to fall back on
+ {
+ throw Exception("ManagementAgent::configure() must be called if default name is used.");
+ }
+ inst = uuid.str(); // default instance name is the uuid's string form
+ } else
+ inst = instance;
+
+ name_address = vendor + ":" + product + ":" + inst; // "<vendor>:<product>:<instance>"
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
+}
+
+
void ManagementAgent::writeData ()
{
string filename (dataDir + "/.mbrokerdata");
@@ -194,6 +242,7 @@ void ManagementAgent::registerEvent (con
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
+// Deprecated: V1 objects
ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId)
{
uint16_t sequence;
@@ -207,8 +256,47 @@ ObjectId ManagementAgent::addObject(Mana
objectNum = persistId;
}
- ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
- objId.setV2Key(*object);
+ ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum);
+ objId.setV2Key(*object); // let object generate the v2 key
+
+ object->setObjectId(objId);
+
+ {
+ Mutex::ScopedLock lock (addLock);
+ ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
+ if (destIter != newManagementObjects.end()) {
+ if (destIter->second->isDeleted()) {
+ newDeletedManagementObjects.push_back(destIter->second);
+ newManagementObjects.erase(destIter);
+ } else {
+ QPID_LOG(error, "ObjectId collision in addObject. class=" << object->getClassName() <<
+ " key=" << objId.getV2Key());
+ return objId;
+ }
+ }
+ newManagementObjects[objId] = object;
+ }
+
+ return objId;
+}
+
+
+
+ObjectId ManagementAgent::addObject(ManagementObject* object,
+ const std::string& key,
+ bool persistent)
+{
+ uint16_t sequence;
+
+ sequence = persistent ? 0 : bootSequence;
+
+ ObjectId objId(0 /*flags*/, sequence, brokerBank);
+ if (key.empty()) {
+ objId.setV2Key(*object); // let object generate the key
+ } else {
+ objId.setV2Key(key);
+ }
+
object->setObjectId(objId);
{
@@ -233,21 +321,57 @@ ObjectId ManagementAgent::addObject(Mana
void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
Mutex::ScopedLock lock (userLock);
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
- encodeHeader(outBuffer, 'e');
- outBuffer.putShortString(event.getPackageName());
- outBuffer.putShortString(event.getEventName());
- outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(Duration(now())));
- outBuffer.putOctet(sev);
- event.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBuffer(outBuffer, outLen, mExchange,
- "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ if (qmf1Support) { // Buffer-encoded event, published on mExchange
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ outBuffer.putOctet(sev);
+ std::string sBuf;
+ event.encode(sBuf); // event body is encoded to a string, then copied in raw
+ outBuffer.putRawData(sBuf);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, mExchange,
+ "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
+ QPID_LOG(trace, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+ // Map-encoded event, published on v2Topic. Both forms are sent when both flags are set.
+ if (qmf2Support) {
+ Variant::Map map_;
+ // map_ carries _schema_id, _values, _timestamp and _severity.
+ Variant::Map values;
+ Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ "_event",
+ event.getMd5Sum());
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_severity"] = sev;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+ // sev is a uint8_t: streamed as-is it is written as a raw character, so widen it to print the number.
+ stringstream key;
+ key << "agent.ind.event." << static_cast<unsigned int>(sev) << "." << name_address << "." << event.getEventName();
+
+ string content;
+ MapCodec::encode(map_, content);
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
+ }
+
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
@@ -355,6 +479,59 @@ void ManagementAgent::sendBuffer(Buffer&
} catch(exception&) {}
}
+// String-payload overload: wraps `data` in method/header/content frames and routes it via `exchange`.
+void ManagementAgent::sendBuffer(const std::string& data,
+ const std::string& cid, // correlation id; only set on the message when non-empty
+ const Variant::Map& headers, // copied into the application headers as strings
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey)
+{
+ Variant::Map::const_iterator i;
+
+ if (suppressed) {
+ QPID_LOG(trace, "Suppressed management message to " << routingKey);
+ return;
+ }
+ if (exchange.get() == 0) return; // no exchange to route to: nothing is sent
+
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody(data)));
+
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
+ // Frames are appended in the order method, header, ... content (content goes last, below).
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ // Message properties: content length always, correlation id only when cid is non-empty.
+ MessageProperties* props =
+ msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(data.length());
+ if (!cid.empty()) {
+ props->setCorrelationId(cid);
+ }
+ // Copy each caller-supplied header as a string, then tag the message with app_id "qmf2".
+ for (i = headers.begin(); i != headers.end(); ++i) {
+ msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+ }
+ msg->getOrInsertHeaders().setString("app_id", "qmf2");
+
+ DeliveryProperties* dp =
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ dp->setRoutingKey(routingKey);
+
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ try {
+ exchange->route(deliverable, routingKey, 0);
+ } catch(exception&) {} // exceptions thrown while routing are swallowed here
+}
+
+
void ManagementAgent::moveNewObjectsLH()
{
Mutex::ScopedLock lock (addLock);
@@ -391,12 +568,13 @@ void ManagementAgent::periodicProcessing
{
#define BUFSIZE 65536
#define HEADROOM 4096
- QPID_LOG(trace, "Management agent periodic processing")
- Mutex::ScopedLock lock (userLock);
+ QPID_LOG(trace, "Management agent periodic processing");
+ Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
+ std::string sBuf;
uint64_t uptime = uint64_t(Duration(now())) - startTime;
static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
@@ -439,43 +617,90 @@ void ManagementAgent::periodicProcessing
continue;
Buffer msgBuffer(msgChars, BUFSIZE);
+ Variant::List list_;
+
for (ManagementObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second;
+ bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
- if (object->getConfigChanged() || object->getForcePublish() || object->isDeleted()) {
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+ if (send_props && qmf1Support) {
encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- pcount++;
+ sBuf.clear();
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
}
-
- if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) {
+
+ if (send_stats && qmf1Support) {
encodeHeader(msgBuffer, 'i');
- object->writeStatistics(msgBuffer);
- scount++;
+ sBuf.clear();
+ object->writeStatistics(sBuf);
+ msgBuffer.putRawData(sBuf);
+ }
+
+ if ((send_stats || send_props) && qmf2Support) {
+ Variant::Map map_;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
}
+ if (send_props) pcount++;
+ if (send_stats) scount++;
+
if (object->isDeleted())
deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object));
object->setForcePublish(false);
- if (msgBuffer.available() < HEADROOM)
+ if (qmf1Support && (msgBuffer.available() < HEADROOM))
break;
}
}
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
- msgBuffer.reset();
- stringstream key;
- key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ if (pcount || scount) {
+ if (qmf1Support) {
+ contentSize = BUFSIZE - msgBuffer.available();
+ if (contentSize > 0) {
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ }
+ }
+
+ if (qmf2Support) {
+ string content;
+ ListCodec::encode(list_, content);
+ if (content.length()) {
+ stringstream key;
+ Variant::Map headers;
+ key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName();
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount);
+ }
+ }
}
}
@@ -492,15 +717,49 @@ void ManagementAgent::periodicProcessing
for (ManagementObjectVector::iterator cdIter = deletedManagementObjects.begin();
cdIter != deletedManagementObjects.end(); cdIter++) {
collisionDeletions = true;
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'c');
- (*cdIter)->writeProperties(msgBuffer);
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- stringstream key;
- key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
- sendBuffer (msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ {
+ if (qmf1Support) {
+ Buffer msgBuffer(msgChars, BUFSIZE);
+ encodeHeader(msgBuffer, 'c');
+ sBuf.clear();
+ (*cdIter)->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+ stringstream key;
+ key << "console.obj.1.0." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+ sendBuffer (msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
+
+ if (qmf2Support) {
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map headers;
+
+ map_["_schema_id"] = mapEncodeSchemaId((*cdIter)->getPackageName(),
+ (*cdIter)->getClassName(),
+ "_data",
+ (*cdIter)->getMd5Sum());
+ (*cdIter)->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ stringstream key;
+ key << "agent.ind.data." << (*cdIter)->getPackageName() << "." << (*cdIter)->getClassName();
+
+ string content;
+ ListCodec::encode(list_, content);
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND ContentInd for deleted object to=" << key.str());
+ }
+ }
}
if (!deleteList.empty() || collisionDeletions) {
@@ -508,7 +767,12 @@ void ManagementAgent::periodicProcessing
deleteOrphanedAgentsLH();
}
- {
+ // heartbeat generation
+
+ if (qmf1Support) {
+#define BUFSIZE 65536
+ uint32_t contentSize;
+ char msgChars[BUFSIZE];
Buffer msgBuffer(msgChars, BUFSIZE);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
@@ -519,6 +783,27 @@ void ManagementAgent::periodicProcessing
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
QPID_LOG(trace, "SEND HeartbeatInd to=" << routingKey);
}
+
+ if (qmf2Support) {
+ static const string addr_key("agent.ind.heartbeat");
+
+ Variant::Map map;
+ Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_heartbeat_indication";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+
+ string content;
+ MapCodec::encode(map, content);
+ sendBuffer(content, "", headers, v2Topic, addr_key);
+
+ QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
+ }
QPID_LOG(debug, "periodic update " << debugSnapshot());
}
@@ -531,19 +816,51 @@ void ManagementAgent::deleteObjectNowLH(
if (!object->isDeleted())
return;
+ if (qmf1Support) {
#define DNOW_BUFSIZE 2048
- char msgChars[DNOW_BUFSIZE];
- uint32_t contentSize;
- Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
-
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(msgBuffer);
- contentSize = msgBuffer.getPosition();
- msgBuffer.reset();
- stringstream key;
- key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- sendBuffer(msgBuffer, contentSize, mExchange, key.str());
- QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ char msgChars[DNOW_BUFSIZE];
+ uint32_t contentSize;
+ Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
+ std::string sBuf;
+
+ encodeHeader(msgBuffer, 'c');
+ object->writeProperties(sBuf);
+ msgBuffer.putRawData(sBuf);
+ contentSize = msgBuffer.getPosition();
+ msgBuffer.reset();
+ stringstream key;
+ key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
+ sendBuffer(msgBuffer, contentSize, mExchange, key.str());
+ QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ }
+
+ if (qmf2Support) {
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ "_data",
+ object->getMd5Sum());
+ object->mapEncodeValues(values, true, false);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ stringstream key;
+ key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName();
+
+ Variant::Map headers;
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ string content;
+ ListCodec::encode(list_, content);
+ sendBuffer(content, "", headers, v2Topic, key.str());
+ QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str());
+ }
managementObjects.erase(oid);
}
@@ -566,35 +883,68 @@ void ManagementAgent::sendCommandComplet
bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
- const FieldTable* /*args*/)
+ const FieldTable* /*args*/,
+ const bool topic)
{
Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
- // Parse the routing key. This management broker should act as though it
- // is bound to the exchange to match the following keys:
- //
- // agent.1.0.#
- // broker
- // schema.#
+ if (qmf1Support && topic) {
- if (routingKey == "broker") {
- dispatchAgentCommandLH(msg);
- return false;
- }
+ // qmf1 is bound only to the topic management exchange.
+ // Parse the routing key. This management broker should act as though it
+ // is bound to the exchange to match the following keys:
+ //
+ // agent.1.0.#
+ // broker
+ // schema.#
- else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
- dispatchAgentCommandLH(msg);
- return false;
- }
+ if (routingKey == "broker") {
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ if (routingKey.length() > 6) {
- else if (routingKey.compare(0, 8, "agent.1.") == 0) {
- return authorizeAgentMessageLH(msg);
+ if (routingKey.compare(0, 9, "agent.1.0") == 0) {
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ if (routingKey.compare(0, 8, "agent.1.") == 0) {
+ return authorizeAgentMessageLH(msg);
+ }
+
+ if (routingKey.compare(0, 7, "schema.") == 0) {
+ dispatchAgentCommandLH(msg);
+ return true;
+ }
+ }
}
- else if (routingKey.compare(0, 7, "schema.") == 0) {
- dispatchAgentCommandLH(msg);
- return true;
+ if (qmf2Support) {
+
+ if (topic) {
+
+ // Intercept messages bound to:
+ // "console.ind.locate.#" - process these messages, and also allow them to be forwarded.
+
+ if (routingKey.compare(0, 18, "console.ind.locate") == 0) {
+ dispatchAgentCommandLH(msg);
+ return true;
+ }
+
+ } else { // direct exchange
+
+ // Intercept messages bound to:
+ // "broker" - generic alias for the local broker
+ // "<name_address>" - the broker agent's proper name
+ // and do not forward them further
+ if (routingKey == "broker" || routingKey == name_address) {
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+ }
}
return true;
@@ -610,14 +960,19 @@ void ManagementAgent::handleMethodReques
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
AclModule* acl = broker->getAcl();
+ std::string inArgs;
- ObjectId objId(inBuffer);
+ std::string sBuf;
+ inBuffer.getRawData(sBuf, 16);
+ ObjectId objId;
+ objId.decode(sBuf);
inBuffer.getShortString(packageName);
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
+ inBuffer.getRawData(inArgs, inBuffer.available());
- QPID_LOG(trace, "RECV MethodRequest class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
+ QPID_LOG(trace, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
methodName << " replyTo=" << replyToKey);
encodeHeader(outBuffer, 'm', sequence);
@@ -629,8 +984,8 @@ void ManagementAgent::handleMethodReques
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence)
- return;
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
+ return;
}
if (acl != 0) {
@@ -645,8 +1000,8 @@ void ManagementAgent::handleMethodReques
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- return;
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ return;
}
}
@@ -664,7 +1019,9 @@ void ManagementAgent::handleMethodReques
try {
outBuffer.record();
Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ std::string outBuf;
+ iter->second->doMethod(methodName, inArgs, outBuf);
+ outBuffer.putRawData(outBuf);
} catch(exception& e) {
outBuffer.restore();
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
@@ -675,9 +1032,135 @@ void ManagementAgent::handleMethodReques
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
+// Map-encoded (v2) method request: decode body, validate, authorise, invoke, reply on v2Direct.
+void ManagementAgent::handleMethodRequestLH (const std::string& body, string replyTo,
+ const std::string& cid, const ConnectionToken* connToken)
+{
+ string methodName;
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator oid, mid;
+ string content;
+
+ Variant::Map outMap;
+ Variant::Map headers;
+ // Default reply headers; every failure path below switches qmf.opcode to "_exception".
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.agent"] = name_address;
+ // Both "_object_id" and "_method_name" must be present in the request map.
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end())
+ {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid param) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ ObjectId objId;
+ Variant::Map inArgs;
+
+ try {
+ // conversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+
+ mid = inMap.find("_arguments"); // optional: inArgs stays empty when absent
+ if (mid != inMap.end()) {
+ inArgs = (mid->second).asMap();
+ }
+ } catch(exception& e) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ (outMap["_values"].asMap())["_status_text"] = e.what();
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (invalid format) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ // Unknown ids and objects already marked deleted are reported the same way.
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (unknown object) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ // validate
+ AclModule* acl = broker->getAcl();
+ DisallowedMethods::const_iterator i;
+ // First check the (class name, method name) pairs listed in `disallowed`.
+ i = disallowed.find(std::make_pair(iter->second->getClassName(), methodName));
+ if (i != disallowed.end()) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ (outMap["_values"].asMap())["_status_text"] = i->second;
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << cid);
+ return;
+ }
+ // Then, if an ACL module is loaded, ask it to authorise this user for the method.
+ if (acl != 0) {
+ string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
+ map<acl::Property, string> params;
+ params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
+ params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
+
+ if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << cid);
+ return;
+ }
+ }
+
+ // invoke the method
+
+ QPID_LOG(trace, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
+ << ":" << iter->second->getClassName() << " method=" <<
+ methodName << " replyTo=" << replyTo);
+ // NOTE(review): the Buffer-based handler releases userLock (Mutex::ScopedUnlock) around doMethod; this one does not -- confirm intended.
+ try {
+ iter->second->doMethod(methodName, inArgs, outMap);
+ } catch(exception& e) {
+ outMap.clear(); // discard any partial results before building the error reply
+ headers["qmf.opcode"] = "_exception";
+ (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+ (outMap["_values"].asMap())["_status_text"] = e.what();
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (exception) to=" << replyTo << " seq=" << cid);
+ return;
+ }
+
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid);
+}
+
+
void ManagementAgent::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
{
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
@@ -781,6 +1264,7 @@ void ManagementAgent::handleClassIndLH (
uint32_t outLen;
uint32_t sequence = nextRequestSequence++;
+ // Schema Request
encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
key.encode(outBuffer);
@@ -803,9 +1287,11 @@ void ManagementAgent::SchemaClass::appen
// linked in via plug-in), call the schema handler directly. If the package
// is from a remote management agent, send the stored schema information.
- if (writeSchemaCall != 0)
- writeSchemaCall(buf);
- else
+ if (writeSchemaCall != 0) {
+ std::string schema;
+ writeSchemaCall(schema);
+ buf.putRawData(schema);
+ } else
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
@@ -981,7 +1467,7 @@ void ManagementAgent::handleAttachReques
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_systemId (systemId);
+ agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject, 0);
@@ -1012,7 +1498,7 @@ void ManagementAgent::handleGetQueryLH (
ft.decode(inBuffer);
- QPID_LOG(trace, "RECV GetQuery query=" << ft << " seq=" << sequence);
+ QPID_LOG(trace, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
value = ft.get("_class");
if (value.get() == 0 || !value->convertsTo<string>()) {
@@ -1031,13 +1517,17 @@ void ManagementAgent::handleGetQueryLH (
object->setUpdateTime();
if (!object->isDeleted()) {
+ std::string sBuf;
encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
sendCommandComplete(replyToKey, sequence);
@@ -1058,13 +1548,17 @@ void ManagementAgent::handleGetQueryLH (
object->setUpdateTime();
if (!object->isDeleted()) {
+ std::string sBuf;
encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(outBuffer);
- object->writeStatistics(outBuffer, true);
+ object->writeProperties(sBuf);
+ outBuffer.putRawData(sBuf);
+ sBuf.clear();
+ object->writeStatistics(sBuf, true);
+ outBuffer.putRawData(sBuf);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND GetResponse to=" << replyToKey << " seq=" << sequence);
+ QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
}
@@ -1072,64 +1566,285 @@ void ManagementAgent::handleGetQueryLH (
sendCommandComplete(replyToKey, sequence);
}
+// Map-encoded (v2) query: replies on v2Direct with one message per matching object, then an empty list.
+void ManagementAgent::handleGetQueryLH(const std::string& body, std::string replyTo, const std::string& cid, const std::string& contentType)
+{
+ // Only "_query_v1" content is handled. The query map is matched on "_class" or,
+ // when no usable class name is given, on "_object_id".
+
+ moveNewObjectsLH();
+
+ if (contentType != "_query_v1") {
+ QPID_LOG(warning, "Support for QMF V2 Query format TBD!!!");
+ return;
+ }
+
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator i;
+ Variant::Map headers;
+
+ QPID_LOG(trace, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+ headers["partial"]; // inserts a valueless "partial" header; erased again before the final message
+
+ Variant::List list_;
+ Variant::Map map_;
+ Variant::Map values;
+ string className;
+ string content;
+
+ i = inMap.find("_class");
+ if (i != inMap.end())
+ try {
+ className = i->second.asString();
+ } catch(exception& e) {
+ className.clear();
+ QPID_LOG(trace, "RCVD GetQuery: invalid format - class target ignored.");
+ }
+
+ if (className.empty()) {
+ ObjectId objId;
+ i = inMap.find("_object_id");
+ if (i != inMap.end()) {
+
+ try {
+ objId = ObjectId(i->second.asMap());
+ } catch (exception &e) {
+ objId = ObjectId(); // empty object id - won't find a match (I hope).
+ QPID_LOG(trace, "RCVD GetQuery (invalid Object Id format) to=" << replyTo << " seq=" << cid);
+ }
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ }
+ }
+ }
+ } else {
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName () == className) {
+
+ // @todo: support multiple objects per message reply
+ values.clear();
+ list_.clear();
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ }
+ }
+ }
+ }
+
+ // send empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo);
+ QPID_LOG(trace, "SEND GetResponse (v2) to=" << replyTo << " seq=" << cid);
+}
+// Answers an agent-locate request: replies to `replyTo` on v2Direct with this agent's
+// attrMap plus "timestamp" and "heartbeat_interval". The request body (first argument) is not read.
+void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo,
+ const string& cid)
+{
+ QPID_LOG(trace, "RCVD AgentLocateRequest");
+
+ Variant::Map map;
+ Variant::Map headers;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_agent_locate_response";
+ headers["qmf.agent"] = name_address;
+
+ map["_values"] = attrMap; // start from a copy of the agent attributes
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+
+ string content;
+ MapCodec::encode(map, content);
+ sendBuffer(content, cid, headers, v2Direct, replyTo); // cid is passed through as the correlation id
+
+ QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
+}
+
+
bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
- string replyToKey;
+ uint32_t sequence = 0;
+ bool methodReq = false;
+ bool mapMsg = false;
+ string packageName;
+ string className;
+ string methodName;
+ std::string cid;
if (msg.encodedSize() > MA_BUFFER_SIZE)
return false;
msg.encodeContent(inBuffer);
+ uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- if (!checkHeader(inBuffer, &opcode, &sequence))
- return false;
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+ const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+ if (headers && headers->getAsString("app_id") == "qmf2")
+ {
+ mapMsg = true;
+
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
+
+ if (headers->getAsString("qmf.opcode") == "_method_request")
+ {
+ methodReq = true;
+
+ // extract object id and method name
+
+ std::string body;
+ inBuffer.getRawData(body, bufferLen);
+ Variant::Map inMap;
+ MapCodec::decode(body, inMap);
+ Variant::Map::const_iterator oid, mid;
+
+ ObjectId objId;
+
+ if ((oid = inMap.find("_object_id")) == inMap.end() ||
+ (mid = inMap.find("_method_name")) == inMap.end()) {
+ QPID_LOG(warning,
+ "Missing fields in QMF authorize req received.");
+ return false;
+ }
+
+ try {
+ // conversions will throw if input is invalid.
+ objId = ObjectId(oid->second.asMap());
+ methodName = mid->second.getString();
+ } catch(exception& e) {
+ QPID_LOG(warning,
+ "Badly formatted QMF authorize req received.");
+ return false;
+ }
+
+ // look up schema for object to get package and class name
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
+ objId);
+ return false;
+ }
- if (opcode == 'M') {
+ packageName = iter->second->getPackageName();
+ className = iter->second->getClassName();
+ }
+ } else { // old style binary message format
+
+ uint8_t opcode;
+
+ if (!checkHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ methodReq = true;
+
+ // extract method name & schema package and class name
+
+ uint8_t hash[16];
+ inBuffer.getLongLong(); // skip over object id
+ inBuffer.getLongLong();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ }
+ }
+
+ if (methodReq) {
// TODO: check method call against ACL list.
+ map<acl::Property, string> params;
AclModule* acl = broker->getAcl();
if (acl == 0)
return true;
string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
- string packageName;
- string className;
- uint8_t hash[16];
- string methodName;
-
- map<acl::Property, string> params;
- ObjectId objId(inBuffer);
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
-
params[acl::PROP_SCHEMAPACKAGE] = packageName;
params[acl::PROP_SCHEMACLASS] = className;
if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params))
return true;
+ // authorization failed, send reply if replyTo present
+
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
- replyToKey = rt.getRoutingKey();
+ string replyToKey = rt.getRoutingKey();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ if (mapMsg) {
- encodeHeader(outBuffer, 'm', sequence);
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBuffer(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence)
- }
+ Variant::Map outMap;
+ Variant::Map headers;
+
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_method_response";
+ headers["qmf.agent"] = name_address;
+
+ ((outMap["_error"].asMap())["_values"].asMap())["_status"] = Manageable::STATUS_FORBIDDEN;
+ ((outMap["_error"].asMap())["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_FORBIDDEN);
+
+ string content;
+ MapCodec::encode(outMap, content);
+ sendBuffer(content, cid, headers, v2Direct, replyToKey);
+
+ } else {
+
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ QPID_LOG(trace, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
+ }
return false;
}
@@ -1139,9 +1854,6 @@ bool ManagementAgent::authorizeAgentMess
void ManagementAgent::dispatchAgentCommandLH(Message& msg)
{
- Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
- uint32_t sequence;
string replyToKey;
const framing::MessageProperties* p =
@@ -1153,6 +1865,9 @@ void ManagementAgent::dispatchAgentComma
else
return;
+ Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+
if (msg.encodedSize() > MA_BUFFER_SIZE) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.encodedSize());
@@ -1163,7 +1878,36 @@ void ManagementAgent::dispatchAgentComma
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ const framing::FieldTable *headers = msg.getApplicationHeaders();
+
+ if (headers && headers->getAsString("app_id") == "qmf2")
+ {
+ std::string opcode = headers->getAsString("qmf.opcode");
+ std::string contentType = headers->getAsString("qmf.content");
+ std::string body;
+ std::string cid;
+
+ inBuffer.getRawData(body, bufferLen);
+
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
+
+ if (opcode == "_method_request")
+ return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher());
+ else if (opcode == "_query_request")
+ return handleGetQueryLH(body, replyToKey, cid, contentType);
+ else if (opcode == "_agent_locate_request")
+ return handleLocateRequestLH(body, replyToKey, cid);
+
+ QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
+ return;
+ }
+
+ // old preV2 binary messages
+
while (inBuffer.getPosition() < bufferLen) {
+ uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
@@ -1359,7 +2103,6 @@ ManagementObjectMap::iterator Management
return iter;
}
-
void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
Mutex::ScopedLock lock (userLock);
@@ -1377,42 +2120,64 @@ void ManagementAgent::disallow(const std
disallowed[std::make_pair(className, methodName)] = message;
}
+void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const { // writes name as "_cname" and hash (wrapped in a Uuid) as "_hash"
+ _map["_cname"] = name;
+ _map["_hash"] = qpid::types::Uuid(hash);
+}
+// Loads name and hash from _map ("_cname" / "_hash"); a key that is absent leaves the corresponding member untouched.
+void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) {
+ Variant::Map::const_iterator i;
+ // Class name, if present.
+ if ((i = _map.find("_cname")) != _map.end()) {
+ name = i->second.asString();
+ }
+ // Hash, if present: copy the Uuid's bytes into hash (declared as uint8_t[16] in ManagementAgent.h); assumes uuid.size() <= 16 -- confirm.
+ if ((i = _map.find("_hash")) != _map.end()) {
+ const qpid::types::Uuid& uuid = i->second.asUuid();
+ memcpy(hash, uuid.data(), uuid.size());
+ }
+}
+
void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const {
- buffer.checkAvailable(encodedSize());
+ buffer.checkAvailable(encodedBufSize());
buffer.putShortString(name);
buffer.putBin128(hash);
}
void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) {
- buffer.checkAvailable(encodedSize());
+ buffer.checkAvailable(encodedBufSize());
buffer.getShortString(name);
buffer.getBin128(hash);
}
-uint32_t ManagementAgent::SchemaClassKey::encodedSize() const {
+uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const {
return 1 + name.size() + 16 /* bin128 */;
}
-void ManagementAgent::SchemaClass::encode(qpid::framing::Buffer& outBuf) const {
- outBuf.checkAvailable(encodedSize());
- outBuf.putOctet(kind);
- outBuf.putLong(pendingSequence);
- outBuf.putLongString(data);
-}
-
-void ManagementAgent::SchemaClass::decode(qpid::framing::Buffer& inBuf) {
- inBuf.checkAvailable(encodedSize());
- kind = inBuf.getOctet();
- pendingSequence = inBuf.getLong();
- inBuf.getLongString(data);
+void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const { // writes kind, pendingSequence and data under "_type", "_pending_sequence" and "_data"
+ _map["_type"] = kind;
+ _map["_pending_sequence"] = pendingSequence;
+ _map["_data"] = data;
}
-uint32_t ManagementAgent::SchemaClass::encodedSize() const {
- return sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + data.size();
+void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { // reads "_type", "_pending_sequence" and "_data" back into kind, pendingSequence and data
+ Variant::Map::const_iterator i;
+ // Each member is only overwritten when its key is present in _map.
+ if ((i = _map.find("_type")) != _map.end()) {
+ kind = i->second;
+ }
+ if ((i = _map.find("_pending_sequence")) != _map.end()) {
+ pendingSequence = i->second;
+ }
+ if ((i = _map.find("_data")) != _map.end()) {
+ data = i->second.asString();
+ }
}
void ManagementAgent::exportSchemas(std::string& out) {
- out.clear();
+ Variant::List list_;
+ Variant::Map map_, kmap, cmap;
+
for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
string name = i->first;
const ClassMap& classes = i ->second;
@@ -1421,90 +2186,143 @@ void ManagementAgent::exportSchemas(std:
const SchemaClass& klass = j->second;
if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
// Encode name, schema-key, schema-class
- size_t encodedSize = 1+name.size()+key.encodedSize()+klass.encodedSize();
- size_t end = out.size();
- out.resize(end + encodedSize);
- framing::Buffer outBuf(&out[end], encodedSize);
- outBuf.putShortString(name);
- key.encode(outBuf);
- klass.encode(outBuf);
+
+ map_.clear();
+ kmap.clear();
+ cmap.clear();
+
+ key.mapEncode(kmap);
+ klass.mapEncode(cmap);
+
+ map_["_pname"] = name;
+ map_["_key"] = kmap;
+ map_["_class"] = cmap;
+ list_.push_back(map_);
}
}
}
+
+ ListCodec::encode(list_, out);
}
void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
- while (inBuf.available()) {
+
+ string buf(inBuf.getPointer(), inBuf.available());
+ Variant::List content;
+ ListCodec::decode(buf, content);
+ Variant::List::const_iterator l;
+
+
+ for (l = content.begin(); l != content.end(); l++) {
string package;
SchemaClassKey key;
SchemaClass klass;
- inBuf.getShortString(package);
- key.decode(inBuf);
- klass.decode(inBuf);
- packages[package][key] = klass;
+ Variant::Map map_, kmap, cmap;
+ Variant::Map::const_iterator i;
+
+ map_ = l->asMap();
+
+ if ((i = map_.find("_pname")) != map_.end()) {
+ package = i->second.asString();
+
+ if ((i = map_.find("_key")) != map_.end()) {
+ key.mapDecode(i->second.asMap());
+
+ if ((i = map_.find("_class")) != map_.end()) {
+ klass.mapDecode(i->second.asMap());
+
+ packages[package][key] = klass;
+ }
+ }
+ }
}
}
-void ManagementAgent::RemoteAgent::encode(qpid::framing::Buffer& outBuf) const {
- outBuf.checkAvailable(encodedSize());
- outBuf.putLong(brokerBank);
- outBuf.putLong(agentBank);
- outBuf.putShortString(routingKey);
- // TODO aconway 2010-03-04: we send the v2Key instead of the
- // ObjectId because that has the same meaning on different
- // brokers. ObjectId::encode doesn't currently encode the v2Key,
- // this can be cleaned up when it does.
- outBuf.putMediumString(connectionRef.getV2Key());
- mgmtObject->writeProperties(outBuf);
-}
-
-void ManagementAgent::RemoteAgent::decode(qpid::framing::Buffer& inBuf) {
- brokerBank = inBuf.getLong();
- agentBank = inBuf.getLong();
- inBuf.getShortString(routingKey);
-
- // TODO aconway 2010-03-04: see comment in encode()
- string connectionKey;
- inBuf.getMediumString(connectionKey);
- connectionRef = ObjectId(); // Clear out any existing value.
- connectionRef.setV2Key(connectionKey);
+void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { // serializes this remote agent into map_
+ Variant::Map _objId, _values;
+ // Bank numbers and routing key are stored directly.
+ map_["_brokerBank"] = brokerBank;
+ map_["_agentBank"] = agentBank;
+ map_["_routingKey"] = routingKey;
+ // connectionRef is stored as a nested map under "_object_id".
+ connectionRef.mapEncode(_objId);
+ map_["_object_id"] = _objId;
+ // The management object's values go under "_values"; presumably properties only, since the (true, true) call elsewhere in this file is commented "both stats and properties" -- confirm.
+ mgmtObject->mapEncodeValues(_values, true, false);
+ map_["_values"] = _values;
+}
+
+void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
+ Variant::Map::const_iterator i;
+
+ if ((i = map_.find("_brokerBank")) != map_.end()) {
+ brokerBank = i->second;
+ }
+
+ if ((i = map_.find("_agentBank")) != map_.end()) {
+ agentBank = i->second;
+ }
+
+ if ((i = map_.find("_routingKey")) != map_.end()) {
+ routingKey = i->second.getString();
+ }
+
+ if ((i = map_.find("_object_id")) != map_.end()) {
+ connectionRef.mapDecode(i->second.asMap());
+ }
mgmtObject = new _qmf::Agent(&agent, this);
- mgmtObject->readProperties(inBuf);
+
+ if ((i = map_.find("_values")) != map_.end()) {
+ mgmtObject->mapDecodeValues(i->second.asMap());
+ }
+
// TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
mgmtObject->set_connectionRef(connectionRef);
}
-uint32_t ManagementAgent::RemoteAgent::encodedSize() const {
- // TODO aconway 2010-03-04: see comment in encode()
- return sizeof(uint32_t) + sizeof(uint32_t) // 2 x Long
- + routingKey.size() + sizeof(uint8_t) // ShortString
- + connectionRef.getV2Key().size() + sizeof(uint16_t) // medium string
- + mgmtObject->writePropertiesSize();
-}
-
void ManagementAgent::exportAgents(std::string& out) {
- out.clear();
+ Variant::List list_;
+ Variant::Map map_, omap, amap;
+
for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
i != remoteAgents.end();
++i)
{
// TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
RemoteAgent* agent = i->second;
- size_t encodedSize = agent->encodedSize();
- size_t end = out.size();
- out.resize(end + encodedSize);
- framing::Buffer outBuf(&out[end], encodedSize);
- agent->encode(outBuf);
+
+ map_.clear();
+ amap.clear();
+
+ agent->mapEncode(amap);
+ map_["_remote_agent"] = amap;
+ list_.push_back(map_);
}
+
+ ListCodec::encode(list_, out);
}
void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
- while (inBuf.available()) {
+ string buf(inBuf.getPointer(), inBuf.available());
+ Variant::List content;
+ ListCodec::decode(buf, content);
+ Variant::List::const_iterator l;
+
+ for (l = content.begin(); l != content.end(); l++) {
std::auto_ptr<RemoteAgent> agent(new RemoteAgent(*this));
- agent->decode(inBuf);
- addObject(agent->mgmtObject, 0);
- remoteAgents[agent->connectionRef] = agent.release();
+ Variant::Map map_;
+ Variant::Map::const_iterator i;
+ // List entries without a "_remote_agent" map are skipped; the auto_ptr then destroys the unused RemoteAgent.
+ map_ = l->asMap();
+
+ if ((i = map_.find("_remote_agent")) != map_.end()) {
+
+ agent->mapDecode(i->second.asMap());
+ // NOTE(review): of the addObject overloads declared in this commit's ManagementAgent.h hunk, only (object, const std::string& key, bool persistent) takes three arguments, so the literal 0 below would convert to a null const char* and be used to construct the key string -- confirm a (object, uint64_t, bool) overload exists or pass an explicit key.
+ addObject (agent->mgmtObject, 0, false);
+ remoteAgents[agent->connectionRef] = agent.release();
+ }
}
}
@@ -1519,3 +2337,198 @@ std::string ManagementAgent::debugSnapsh
msg << " new objects: " << newManagementObjects.size();
return msg.str();
}
+// Builds a Variant::Map from a FieldTable: each entry keeps its key and has its value converted with toVariant().
+Variant::Map ManagementAgent::toMap(const FieldTable& from)
+{
+ Variant::Map map;
+
+ for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+ const string& key(iter->first);
+ const FieldTable::ValuePtr& val(iter->second);
+
+ map[key] = toVariant(val);
+ }
+
+ return map;
+}
+// Builds a Variant::List from a framing List, converting every element with toVariant() and preserving order.
+Variant::List ManagementAgent::toList(const List& from)
+{
+ Variant::List _list;
+
+ for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) {
+ const List::ValuePtr& val(*iter);
+
+ _list.push_back(toVariant(val));
+ }
+
+ return _list;
+}
+// Builds a FieldTable from a Variant::Map: each entry keeps its key and has its value converted with toFieldValue().
+qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
+{
+ qpid::framing::FieldTable ft;
+
+ for (Variant::Map::const_iterator iter = from.begin();
+ iter != from.end();
+ iter++) {
+ const string& key(iter->first);
+ const Variant& val(iter->second);
+
+ ft.set(key, toFieldValue(val));
+ }
+
+ return ft;
+}
+
+// Builds a framing List from a Variant::List, converting every element with toFieldValue() and preserving order.
+List ManagementAgent::fromList(const Variant::List& from)
+{
+ List fa;
+
+ for (Variant::List::const_iterator iter = from.begin();
+ iter != from.end();
+ iter++) {
+ const Variant& val(*iter);
+
+ fa.push_back(toFieldValue(val));
+ }
+
+ return fa;
+}
+
+// Converts a Variant into a newly allocated FieldValue of the matching kind; maps and lists are converted recursively via fromMap()/fromList().
+boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
+{
+ // Every string Variant becomes a Str16Value; the Variant's encoding is not consulted in this switch.
+ switch(in.getType()) {
+
+ case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue());
+ case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
+ case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
+ case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
+ case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
+ case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
+ case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
+ case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
+ case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
+ case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
+ case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
+ case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
+ case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
+ case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
+ case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
+ case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList())));
+ }
+ // Reached only when no case above returned: log the type and fall back to a void value.
+ QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
+ return boost::shared_ptr<FieldValue>(new VoidValue());
+}
+// Converts a FieldValue into a Variant, choosing the Variant type (and for some codes an encoding) from the value's type code; unrecognized codes are logged and leave the Variant void.
+// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
+Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
+{
+ const std::string iso885915("iso-8859-15");
+ const std::string utf8("utf8");
+ const std::string utf16("utf16");
+ //const std::string binary("binary");
+ const std::string amqp0_10_binary("amqp0-10:binary");
+ //const std::string amqp0_10_bit("amqp0-10:bit");
+ const std::string amqp0_10_datetime("amqp0-10:datetime");
+ const std::string amqp0_10_struct("amqp0-10:struct");
+ Variant out;
+ // NOTE(review): the cases below that only call setEncoding() have no break and fall through into the next case's assignment; whether that encoding survives the assignment to out depends on Variant::operator= -- confirm.
+ //based on AMQP 0-10 typecode, pick most appropriate variant type
+ switch (in->getType()) {
+ //Fixed Width types:
+ case 0x00: //bin8
+ case 0x01: out.setEncoding(amqp0_10_binary); // int8
+ case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8
+ case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; //
+ // case 0x04: break; //TODO: iso-8859-15 char // char
+ case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8
+ // NOTE(review): in the 0x00-0x08 cases above the trailing type comments do not line up with the value types read (e.g. 0x02 is commented uint8 but read as int8_t) -- verify against the AMQP 0-10 type codes.
+ case 0x10: out.setEncoding(amqp0_10_binary); // bin16
+ case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16
+ case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16
+
+ case 0x20: out.setEncoding(amqp0_10_binary); // bin32
+ case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32
+ case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
+
+ case 0x23: out = in->get<float>(); break; // float(32)
+
+ // case 0x27: break; //TODO: utf-32 char
+
+ case 0x30: out.setEncoding(amqp0_10_binary); // bin64
+ case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
+
+ case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
+ case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64
+ case 0x33: out = in->get<double>(); break; // double
+
+ case 0x48: // uuid
+ {
+ unsigned char data[16];
+ in->getFixedWidthValue<16>(data);
+ out = qpid::types::Uuid(data);
+ } break;
+
+ //TODO: figure out whether and how to map values with codes 0x40-0xd8
+
+ case 0xf0: break;//void, which is the default value for Variant
+ // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
+
+ //Variable Width types:
+ //strings:
+ case 0x80: // str8
+ case 0x90: // str16
+ case 0xa0: // str32
+ out = in->get<std::string>();
+ out.setEncoding(amqp0_10_binary);
+ break;
+
+ case 0x84: // str8
+ case 0x94: // str16
+ out = in->get<std::string>();
+ out.setEncoding(iso885915);
+ break;
+
+ case 0x85: // str8
+ case 0x95: // str16
+ out = in->get<std::string>();
+ out.setEncoding(utf8);
+ break;
+
+ case 0x86: // str8
+ case 0x96: // str16
+ out = in->get<std::string>();
+ out.setEncoding(utf16);
+ break;
+
+ case 0xab: // str32
+ out = in->get<std::string>();
+ out.setEncoding(amqp0_10_struct);
+ break;
+
+ case 0xa8: // map
+ out = ManagementAgent::toMap(in->get<FieldTable>());
+ break;
+
+ case 0xa9: // list of variant types
+ out = ManagementAgent::toList(in->get<List>());
+ break;
+ //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
+ // out = Variant::List();
+ // translate<Array>(in, out.asList(), &toVariant);
+ // break;
+
+ default:
+ //error?
+ QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
+ break;
+ }
+
+ return out;
+}
+
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Mar 31 21:13:12 2010
@@ -32,7 +32,9 @@
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/FieldValue.h>
#include <memory>
#include <string>
#include <map>
@@ -62,7 +64,7 @@ public:
} severity_t;
- ManagementAgent ();
+ ManagementAgent (const bool qmfV1, const bool qmfV2);
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
@@ -74,6 +76,9 @@ public:
/** Called by cluster to suppress management output during update. */
void suppress(bool s) { suppressed = s; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
void setInterval(uint16_t _interval) { interval = _interval; }
void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
@@ -91,6 +96,9 @@ public:
ManagementObject::writeSchemaCall_t schemaCall);
QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
uint64_t persistId = 0);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
+ const std::string& key,
+ bool persistent = true);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
@@ -99,7 +107,8 @@ public:
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
- const framing::FieldTable* args);
+ const framing::FieldTable* args,
+ const bool topic);
const framing::Uuid& getUuid() const { return uuid; }
@@ -128,6 +137,15 @@ public:
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; }
+ // TODO: remove these when Variant API moved into common library.
+ static types::Variant::Map toMap(const framing::FieldTable& from);
+ static framing::FieldTable fromMap(const types::Variant::Map& from);
+ static types::Variant::List toList(const framing::List& from);
+ static framing::List fromList(const types::Variant::List& from);
+ static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
+ static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
+
+
private:
struct Periodic : public qpid::sys::TimerTask
{
@@ -153,9 +171,8 @@ private:
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
- void encode(framing::Buffer& buffer) const;
- void decode(framing::Buffer& buffer);
- uint32_t encodedSize() const;
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
};
// TODO: Eventually replace string with entire reply-to structure. reply-to
@@ -175,9 +192,11 @@ private:
std::string name;
uint8_t hash[16];
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
void encode(framing::Buffer& buffer) const;
void decode(framing::Buffer& buffer);
- uint32_t encodedSize() const;
+ uint32_t encodedBufSize() const;
};
struct SchemaClassKeyComp
@@ -209,9 +228,8 @@ private:
bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
void appendSchema (framing::Buffer& buf);
- void encode(framing::Buffer& buffer) const;
- void decode(framing::Buffer& buffer);
- uint32_t encodedSize() const;
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
};
typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
@@ -264,6 +282,14 @@ private:
typedef std::map<MethodName, std::string> DisallowedMethods;
DisallowedMethods disallowed;
+ // Agent name and address
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
+
+ // supported management protocol
+ bool qmf1Support;
+ bool qmf2Support;
+
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
@@ -279,6 +305,11 @@ private:
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
void moveNewObjectsLH();
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
@@ -311,6 +342,10 @@ private:
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQueryLH (const std::string& body, std::string replyToKey, const std::string& cid, const std::string& contentType);
+ void handleMethodRequestLH (const std::string& body, std::string replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken);
+ void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid);
+
size_t validateSchema(framing::Buffer&, uint8_t kind);
size_t validateTableSchema(framing::Buffer&);
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Wed Mar 31 21:13:12 2010
@@ -29,13 +29,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {}
+ Exchange (_name, _parent, b),
+ DirectExchange(_name, _parent, b),
+ managementAgent(0) {}
ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
bool _durable,
const FieldTable& _args,
Manageable* _parent, Broker* b) :
Exchange (_name, _durable, _args, _parent, b),
- DirectExchange(_name, _durable, _args, _parent, b) {}
+ DirectExchange(_name, _durable, _args, _parent, b),
+ managementAgent(0) {}
void ManagementDirectExchange::route(Deliverable& msg,
const string& routingKey,
@@ -43,7 +46,8 @@ void ManagementDirectExchange::route(Del
{
bool routeIt = true;
- // TODO: Intercept messages directed to the embedded agent and send them to the management agent.
+ if (managementAgent)
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false /*direct*/);
if (routeIt)
DirectExchange::route(msg, routingKey, args);
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Wed Mar 31 21:13:12 2010
@@ -22,7 +22,10 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/lexical_cast.hpp>
#include <stdlib.h>
@@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t
((uint64_t) (bank & 0x0fffffff));
}
-ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object)
- : agent(0)
+// Deprecated
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object)
+ : agent(0), agentEpoch(seq)
{
first =
((uint64_t) (flags & 0x0f)) << 60 |
((uint64_t) (seq & 0x0fff)) << 48 |
- ((uint64_t) (broker & 0x000fffff)) << 28 |
- ((uint64_t) (bank & 0x0fffffff));
+ ((uint64_t) (broker & 0x000fffff)) << 28;
second = object;
}
-ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object)
- : agent(_agent)
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker)
+ : agent(0), second(0), agentEpoch(seq)
{
first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48 |
+ ((uint64_t) (broker & 0x000fffff)) << 28;
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq)
+ : agent(_agent), second(0), agentEpoch(seq)
+{
+
+ first =
((uint64_t) (flags & 0x0f)) << 60 |
((uint64_t) (seq & 0x0fff)) << 48;
- second = object;
}
+
ObjectId::ObjectId(std::istream& in) : agent(0)
{
std::string text;
@@ -75,6 +89,10 @@ void ObjectId::fromString(const std::str
# define atoll(X) _atoi64(X)
#endif
+ // format:
+ // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id>
+ // V2: Not used
+
std::string copy(text.c_str());
char* cText;
char* field[FIELDS];
@@ -99,10 +117,13 @@ void ObjectId::fromString(const std::str
if (idx != FIELDS)
throw Exception("Invalid ObjectId format");
+ agentEpoch = atoll(field[1]);
+
first = (atoll(field[0]) << 60) +
(atoll(field[1]) << 48) +
- (atoll(field[2]) << 28) +
- atoll(field[3]);
+ (atoll(field[2]) << 28);
+
+ agentName = std::string(field[3]);
second = atoll(field[4]);
}
@@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &o
return first == otherFirst && second == other.second;
}
-void ObjectId::encode(framing::Buffer& buffer) const
+// encode as V1-format binary
+void ObjectId::encode(std::string& buffer) const
{
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
if (agent == 0)
- buffer.putLongLong(first);
+ body.putLongLong(first);
else
- buffer.putLongLong(first | agent->first);
- buffer.putLongLong(second);
+ body.putLongLong(first | agent->first);
+ body.putLongLong(second);
+
+ body.reset();
+ body.getRawData(buffer, len);
}
-void ObjectId::decode(framing::Buffer& buffer)
+// decode as V1-format binary
+void ObjectId::decode(const std::string& buffer)
{
- first = buffer.getLongLong();
- second = buffer.getLongLong();
+ const uint32_t len = 16;
+ char _data[len];
+ qpid::framing::Buffer body(_data, len);
+
+ body.checkAvailable(buffer.length());
+ body.putRawData(buffer);
+ body.reset();
+ first = body.getLongLong();
+ second = body.getLongLong();
+ v2Key = boost::lexical_cast<std::string>(second);
}
+// generate the V2 key from the index fields defined
+// in the schema.
void ObjectId::setV2Key(const ManagementObject& object)
{
std::stringstream oname;
@@ -145,6 +185,42 @@ void ObjectId::setV2Key(const Management
v2Key = oname.str();
}
+// encode as V2-format map
+void ObjectId::mapEncode(types::Variant::Map& map) const
+{
+ map["_object_name"] = v2Key;
+ if (!agentName.empty())
+ map["_agent_name"] = agentName;
+ if (agentEpoch)
+ map["_agent_epoch"] = agentEpoch;
+}
+
+// decode as v2-format map
+void ObjectId::mapDecode(const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ if ((i = map.find("_object_name")) != map.end())
+ v2Key = i->second.asString();
+ else
+ throw Exception("Required _object_name field missing.");
+
+ if ((i = map.find("_agent_name")) != map.end())
+ agentName = i->second.asString();
+
+ if ((i = map.find("_agent_epoch")) != map.end())
+ agentEpoch = i->second.asInt64();
+}
+
+
+ObjectId::operator types::Variant::Map() const
+{
+ types::Variant::Map m;
+ mapEncode(m);
+ return m;
+}
+
+
namespace qpid {
namespace management {
@@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& o
out << ((virtFirst & 0xF000000000000000LL) >> 60) <<
"-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) <<
"-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) <<
- "-" << (virtFirst & 0x000000000FFFFFFFLL) <<
+ "-" << i.agentName <<
"-" << i.second;
return out;
}
@@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& o
int ManagementObject::maxThreads = 1;
int ManagementObject::nextThreadIndex = 0;
-void ManagementObject::writeTimestamps (framing::Buffer& buf) const
+void ManagementObject::writeTimestamps (std::string& buf) const
{
- buf.putShortString (getPackageName ());
- buf.putShortString (getClassName ());
- buf.putBin128 (getMd5Sum ());
- buf.putLongLong (updateTime);
- buf.putLongLong (createTime);
- buf.putLongLong (destroyTime);
- objectId.encode(buf);
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
+
+ body.putShortString (getPackageName ());
+ body.putShortString (getClassName ());
+ body.putBin128 (getMd5Sum ());
+ body.putLongLong (updateTime);
+ body.putLongLong (createTime);
+ body.putLongLong (destroyTime);
+
+ uint32_t len = body.getPosition();
+ body.reset();
+ body.getRawData(buf, len);
+
+ std::string oid;
+ objectId.encode(oid);
+ buf += oid;
}
-void ManagementObject::readTimestamps (framing::Buffer& buf)
+void ManagementObject::readTimestamps (const std::string& buf)
{
+ char _data[4000];
+ qpid::framing::Buffer body(_data, 4000);
std::string unused;
uint8_t unusedUuid[16];
- ObjectId unusedObjectId;
- buf.getShortString(unused);
- buf.getShortString(unused);
- buf.getBin128(unusedUuid);
- updateTime = buf.getLongLong();
- createTime = buf.getLongLong();
- destroyTime = buf.getLongLong();
- unusedObjectId.decode(buf);
+ body.checkAvailable(buf.length());
+ body.putRawData(buf);
+ body.reset();
+
+ body.getShortString(unused);
+ body.getShortString(unused);
+ body.getBin128(unusedUuid);
+ updateTime = body.getLongLong();
+ createTime = body.getLongLong();
+ destroyTime = body.getLongLong();
}
uint32_t ManagementObject::writeTimestampsSize() const
{
return 1 + getPackageName().length() + // str8
- 1 + getClassName().length() + // str8
- 16 + // bin128
- 8 + // uint64
- 8 + // uint64
- 8 + // uint64
- objectId.encodedSize(); // objectId
+ 1 + getClassName().length() + // str8
+ 16 + // bin128
+ 8 + // uint64
+ 8 + // uint64
+ 8 + // uint64
+ objectId.encodedSize(); // objectId
}
+
+void ManagementObject::writeTimestamps (types::Variant::Map& map) const
+{
+ types::Variant::Map oid, sid;
+
+ sid["_package_name"] = getPackageName();
+ sid["_class_name"] = getClassName();
+ sid["_hash"] = qpid::types::Uuid(getMd5Sum());
+ map["_schema_id"] = sid;
+
+ objectId.mapEncode(oid);
+ map["_object_id"] = oid;
+
+ map["_update_ts"] = updateTime;
+ map["_create_ts"] = createTime;
+ map["_delete_ts"] = destroyTime;
+}
+
+void ManagementObject::readTimestamps (const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ if ((i = map.find("_update_ts")) != map.end())
+ updateTime = i->second.asUint64();
+ if ((i = map.find("_create_ts")) != map.end())
+ createTime = i->second.asUint64();
+ if ((i = map.find("_delete_ts")) != map.end())
+ destroyTime = i->second.asUint64();
+}
+
+
void ManagementObject::setReference(ObjectId) {}
int ManagementObject::getThreadIndex() {
@@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() {
}
return thisIndex;
}
+
+
+void ManagementObject::mapEncode(types::Variant::Map& map,
+ bool includeProperties,
+ bool includeStatistics)
+{
+ types::Variant::Map values;
+
+ writeTimestamps(map);
+
+ mapEncodeValues(values, includeProperties, includeStatistics);
+ map["_values"] = values;
+}
+
+void ManagementObject::mapDecode(const types::Variant::Map& map)
+{
+ types::Variant::Map::const_iterator i;
+
+ readTimestamps(map);
+
+ if ((i = map.find("_values")) != map.end())
+ mapDecodeValues(i->second.asMap());
+}
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Wed Mar 31 21:13:12 2010
@@ -28,13 +28,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
+ Exchange (_name, _parent, b),
+ TopicExchange(_name, _parent, b),
+ managementAgent(0) {}
ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
bool _durable,
const FieldTable& _args,
Manageable* _parent, Broker* b) :
Exchange (_name, _durable, _args, _parent, b),
- TopicExchange(_name, _durable, _args, _parent, b) {}
+ TopicExchange(_name, _durable, _args, _parent, b),
+ managementAgent(0) {}
void ManagementTopicExchange::route(Deliverable& msg,
const string& routingKey,
@@ -43,12 +46,8 @@ void ManagementTopicExchange::route(Deli
bool routeIt = true;
// Intercept management agent commands
- if (qmfVersion == 1) {
- if ((routingKey.length() > 6 &&
- routingKey.substr(0, 6).compare("agent.") == 0) ||
- (routingKey == "broker"))
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
- }
+ if (managementAgent)
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true /* topic */);
if (routeIt)
TopicExchange::route(msg, routingKey, args);
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Mar 31 21:13:12 2010
@@ -267,15 +267,15 @@ txjob_LDADD=$(lib_client)
check_PROGRAMS+=PollerTest
PollerTest_SOURCES=PollerTest.cpp
-PollerTest_LDADD=$(lib_common) $(SOCKLIBS)
+PollerTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS)
check_PROGRAMS+=DispatcherTest
DispatcherTest_SOURCES=DispatcherTest.cpp
-DispatcherTest_LDADD=$(lib_common) $(SOCKLIBS)
+DispatcherTest_LDADD=$(lib_common) $(lib_client) $(SOCKLIBS)
check_PROGRAMS+=datagen
datagen_SOURCES=datagen.cpp
-datagen_LDADD=$(lib_common)
+datagen_LDADD=$(lib_common) $(lib_client)
check_PROGRAMS+=qrsh_server
qrsh_server_SOURCES=qrsh_server.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ManagementTest.cpp Wed Mar 31 21:13:12 2010
@@ -56,32 +56,34 @@ QPID_AUTO_TEST_CASE(testObjectIdSerializ
}
QPID_AUTO_TEST_CASE(testObjectIdEncode) {
- char buffer[100];
- Buffer msgBuf(buffer, 100);
- msgBuf.putLongLong(0x1002000030000004LL);
- msgBuf.putLongLong(0x0000000000000005LL);
- msgBuf.reset();
+ qpid::types::Variant::Map oidMap;
- ObjectId oid(msgBuf);
+ ObjectId oid(1, 2, 3, 9999);
+ oid.setV2Key("testkey");
+ oid.setAgentName("myAgent");
std::stringstream out1;
out1 << oid;
- BOOST_CHECK_EQUAL(out1.str(), "1-2-3-4-5");
+ BOOST_CHECK_EQUAL(out1.str(), "1-2-3-myAgent-9999");
}
QPID_AUTO_TEST_CASE(testObjectIdAttach) {
AgentAttachment agent;
- ObjectId oid(&agent, 10, 20, 50);
+ ObjectId oid(&agent, 10, 20);
+ oid.setV2Key("GabbaGabbaHey");
+ oid.setAgentName("MrSmith");
std::stringstream out1;
out1 << oid;
- BOOST_CHECK_EQUAL(out1.str(), "10-20-0-0-50");
+
+ BOOST_CHECK_EQUAL(out1.str(), "10-20-0-MrSmith-0");
agent.setBanks(30, 40);
std::stringstream out2;
out2 << oid;
- BOOST_CHECK_EQUAL(out2.str(), "10-20-30-40-50");
+
+ BOOST_CHECK_EQUAL(out2.str(), "10-20-30-MrSmith-0");
}
QPID_AUTO_TEST_CASE(testConsoleObjectId) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org