You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/09/03 20:01:45 UTC
svn commit: r691700 [2/2] - in /incubator/qpid/trunk/qpid:
cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/templates/
cpp/src/qpid/agent/ cpp/src/qpid/broker/ cpp/src/qpid/framing/
cpp/src/qpid/management/ python/qpid/ specs/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Wed Sep 3 11:01:44 2008
@@ -27,6 +27,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/Time.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/AclModule.h"
#include <list>
#include <iostream>
#include <fstream>
@@ -80,8 +81,8 @@
ManagementBroker::ManagementBroker () :
threadPoolSize(1), interval(10), broker(0)
{
- localBank = 5;
nextObjectId = 1;
+ brokerBank = 1;
bootSequence = 1;
nextRemoteBank = 10;
nextRequestSequence = 1;
@@ -112,7 +113,7 @@
}
}
-void ManagementBroker::configure(string _dataDir, uint16_t _interval, Manageable* _broker, int _threads)
+void ManagementBroker::configure(string _dataDir, uint16_t _interval, broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
interval = _interval;
@@ -140,7 +141,10 @@
inFile.close();
QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
+ // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
bootSequence++;
+ if (bootSequence & 0xF000)
+ bootSequence = 1;
writeData();
}
else
@@ -183,29 +187,26 @@
AddClass(pIter, className, md5Sum, schemaCall);
}
-uint64_t ManagementBroker::addObject (ManagementObject* object,
- uint32_t persistId,
- uint32_t persistBank)
+ObjectId ManagementBroker::addObject (ManagementObject* object,
+ uint64_t persistId)
{
Mutex::ScopedLock lock (addLock);
- uint64_t objectId;
+ uint16_t sequence;
+ uint64_t objectNum;
- if (persistId == 0)
- {
- objectId = ((uint64_t) bootSequence) << 48 |
- ((uint64_t) localBank) << 24 | nextObjectId++;
- if ((nextObjectId & 0xFF000000) != 0)
- {
- nextObjectId = 1;
- localBank++;
- }
+ if (persistId == 0) {
+ sequence = bootSequence;
+ objectNum = nextObjectId++;
+ } else {
+ sequence = 0;
+ objectNum = persistId;
}
- else
- objectId = ((uint64_t) persistBank) << 24 | persistId;
- object->setObjectId (objectId);
- newManagementObjects[objectId] = object;
- return objectId;
+ ObjectId objId(0 /*flags*/ , sequence, brokerBank, 0, objectNum);
+
+ object->setObjectId(objId);
+ newManagementObjects[objId] = object;
+ return objId;
}
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
@@ -308,7 +309,7 @@
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
- std::list<uint64_t> deleteList;
+ std::list<ObjectId> deleteList;
{
Buffer msgBuffer(msgChars, BUFSIZE);
@@ -373,7 +374,7 @@
}
// Delete flagged objects
- for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ for (std::list<ObjectId>::reverse_iterator iter = deleteList.rbegin ();
iter != deleteList.rend ();
iter++)
managementObjects.erase (*iter);
@@ -408,48 +409,72 @@
// Parse the routing key. This management broker should act as though it
// is bound to the exchange to match the following keys:
//
- // agent.<X>.#
- // broker.#
- //
- // where <X> is any non-negative decimal integer less than the lowest remote
- // object-id bank.
+ // agent.0.#
+ // broker
if (routingKey == "broker") {
- dispatchAgentCommandLH (msg);
+ dispatchAgentCommandLH(msg);
+ return false;
+ }
+
+ else if (routingKey.compare(0, 7, "agent.0") == 0) {
+ dispatchAgentCommandLH(msg);
return false;
}
else if (routingKey.compare(0, 6, "agent.") == 0) {
- std::string::size_type delim = routingKey.find('.', 6);
- if (delim == string::npos)
- delim = routingKey.length();
- string bank = routingKey.substr(6, delim - 6);
- if ((uint32_t) atoi(bank.c_str()) <= localBank) {
- dispatchAgentCommandLH (msg);
- return false;
- }
+ return authorizeAgentMessageLH(msg);
}
return true;
}
-void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey,
+ uint32_t sequence, const ConnectionToken* connToken)
{
string methodName;
+ string packageName;
+ string className;
+ uint8_t hash[16];
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ AclModule* acl = broker->getAcl();
- uint64_t objId = inBuffer.getLongLong();
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
-
EncodeHeader(outBuffer, 'm', sequence);
+ if (acl != 0) {
+ string userId = ((const broker::ConnectionState*) connToken)->getUserId();
+ std::map<acl::Property, string> params;
+ params[acl::SCHEMAPACKAGE] = packageName;
+ params[acl::SCHEMACLASS] = className;
+
+ if (!acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params)) {
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ return;
+ }
+ }
+
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ if ((iter->second->getPackageName() != packageName) ||
+ (iter->second->getClassName() != className)) {
+ outBuffer.putLong (Manageable::STATUS_INVALID_PARAMETER);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
+ }
+ else
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -497,34 +522,33 @@
FindOrAddPackageLH(packageName);
}
-void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
std::string packageName;
- inBuffer.getShortString (packageName);
- PackageMap::iterator pIter = packages.find (packageName);
- if (pIter != packages.end ())
+ inBuffer.getShortString(packageName);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end())
{
ClassMap cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin ();
- cIter != cMap.end ();
+ for (ClassMap::iterator cIter = cMap.begin();
+ cIter != cMap.end();
cIter++)
{
- if (cIter->second->hasSchema ())
+ if (cIter->second.hasSchema())
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q', sequence);
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ EncodeHeader(outBuffer, 'q', sequence);
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
}
}
-
- sendCommandComplete (replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
@@ -551,9 +575,7 @@
outBuffer.reset ();
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
- SchemaClass* newSchema = new SchemaClass;
- newSchema->pendingSequence = sequence;
- pIter->second[key] = newSchema;
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
}
}
@@ -569,7 +591,7 @@
buf.putRawData(buffer, bufferLen);
}
-void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+void ManagementBroker::handleSchemaRequestLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -578,33 +600,33 @@
inBuffer.getShortString (key.name);
inBuffer.getBin128 (key.hash);
- PackageMap::iterator pIter = packages.find (packageName);
+ PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find (key);
+ ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- SchemaClass* classInfo = cIter->second;
+ SchemaClass& classInfo = cIter->second;
- if (classInfo->hasSchema()) {
+ if (classInfo.hasSchema()) {
EncodeHeader(outBuffer, 's', sequence);
- classInfo->appendSchema (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
+ classInfo.appendSchema(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Schema not available");
+ sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Class key not found");
+ sendCommandComplete(replyToKey, sequence, 1, "Class key not found");
}
else
- sendCommandComplete (replyToKey, sequence, 1, "Package not found");
+ sendCommandComplete(replyToKey, sequence, 1, "Package not found");
}
-void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+void ManagementBroker::handleSchemaResponseLH(Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -619,24 +641,26 @@
if (pIter != packages.end()) {
ClassMap cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end() && cIter->second->pendingSequence == sequence) {
+ if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
size_t length = ValidateSchema(inBuffer);
- if (length == 0)
+ if (length == 0) {
+ QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
+ }
else {
- cIter->second->buffer = (uint8_t*) malloc(length);
- cIter->second->bufferLen = length;
- inBuffer.getRawData(cIter->second->buffer, cIter->second->bufferLen);
+ cIter->second.buffer = (uint8_t*) malloc(length);
+ cIter->second.bufferLen = length;
+ inBuffer.getRawData(cIter->second.buffer, cIter->second.bufferLen);
// Publish a class-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'q');
- EncodeClassIndication (outBuffer, pIter, cIter);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ EncodeHeader(outBuffer, 'q');
+ EncodeClassIndication(outBuffer, pIter, cIter);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
}
}
}
@@ -671,14 +695,14 @@
void ManagementBroker::deleteOrphanedAgentsLH()
{
- vector<uint64_t> deleteList;
+ vector<ObjectId> deleteList;
for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
- uint64_t connectionRef = aIter->first;
+ ObjectId connectionRef = aIter->first;
bool found = false;
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
iter++) {
if (iter->first == connectionRef && !iter->second->isDeleted()) {
found = true;
@@ -692,10 +716,8 @@
}
}
- for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) {
-
+ for (vector<ObjectId>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++)
remoteAgents.erase(*dIter);
- }
deleteList.clear();
}
@@ -705,7 +727,7 @@
string label;
uint32_t requestedBank;
uint32_t assignedBank;
- uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+ ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
moveNewObjectsLH();
@@ -741,6 +763,7 @@
uint32_t outLen;
EncodeHeader (outBuffer, 'a', sequence);
+ outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
@@ -786,13 +809,77 @@
sendCommandComplete (replyToKey, sequence);
}
-void ManagementBroker::dispatchAgentCommandLH (Message& msg)
+bool ManagementBroker::authorizeAgentMessageLH(Message& msg)
{
Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
uint32_t sequence;
string replyToKey;
+ if (msg.encodedSize() > MA_BUFFER_SIZE)
+ return false;
+
+ msg.encodeContent(inBuffer);
+ inBuffer.reset();
+
+ if (!CheckHeader(inBuffer, &opcode, &sequence))
+ return false;
+
+ if (opcode == 'M') {
+ // TODO: check method call against ACL list.
+ AclModule* acl = broker->getAcl();
+ if (acl == 0)
+ return true;
+
+ string userId = ((const broker::ConnectionState*) msg.getPublisher())->getUserId();
+ string packageName;
+ string className;
+ uint8_t hash[16];
+ string methodName;
+
+ std::map<acl::Property, string> params;
+ ObjectId objId(inBuffer);
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(className);
+ inBuffer.getBin128(hash);
+ inBuffer.getShortString(methodName);
+
+ params[acl::SCHEMAPACKAGE] = packageName;
+ params[acl::SCHEMACLASS] = className;
+
+ if (acl->authorise(userId, acl::ACCESS, acl::METHOD, methodName, &params))
+ return true;
+
+ const framing::MessageProperties* p =
+ msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ replyToKey = rt.getRoutingKey();
+
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ EncodeHeader(outBuffer, 'm', sequence);
+ outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
+ outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+void ManagementBroker::dispatchAgentCommandLH(Message& msg)
+{
+ Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ uint8_t opcode;
+ uint32_t sequence;
+ string replyToKey;
+
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
@@ -823,7 +910,7 @@
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
}
ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
@@ -834,7 +921,7 @@
// No such package found, create a new map entry.
pair<PackageMap::iterator, bool> result =
- packages.insert (pair<string, ClassMap> (name, ClassMap ()));
+ packages.insert(pair<string, ClassMap>(name, ClassMap()));
QPID_LOG (debug, "ManagementBroker added package " << name);
// Publish a package-indication message
@@ -859,20 +946,18 @@
ClassMap& cMap = pIter->second;
key.name = className;
- memcpy (&key.hash, md5Sum, 16);
+ memcpy(&key.hash, md5Sum, 16);
- ClassMap::iterator cIter = cMap.find (key);
- if (cIter != cMap.end ())
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end())
return;
// No such class found, create a new class with local information.
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- SchemaClass* classInfo = new SchemaClass;
- classInfo->writeSchemaCall = schemaCall;
- cMap[key] = classInfo;
- cIter = cMap.find (key);
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+ cIter = cMap.find(key);
}
void ManagementBroker::EncodePackageIndication (Buffer& buf,
@@ -917,6 +1002,8 @@
for (uint16_t idx = 0; idx < methCount; idx++) {
FieldTable ft;
ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
int argCount = ft.getInt("argCount");
for (int mIdx = 0; mIdx < argCount; mIdx++) {
FieldTable aft;
@@ -924,10 +1011,41 @@
}
}
- if (evntCount != 0)
- return 0;
+ for (uint16_t idx = 0; idx < evntCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
+ }
end = inBuffer.getPosition();
inBuffer.restore(); // restore original position
return end - start;
}
+
+Mutex& ManagementBroker::getMutex()
+{
+ return userLock;
+}
+
+Buffer* ManagementBroker::startEventLH()
+{
+ Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
+ EncodeHeader(*outBuffer, 'e');
+ outBuffer->putLongLong(uint64_t(Duration(now())));
+ return outBuffer;
+}
+
+void ManagementBroker::finishEventLH(Buffer* outBuffer)
+{
+ uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
+ outBuffer->reset();
+ SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
+ delete outBuffer;
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Wed Sep 3 11:01:44 2008
@@ -47,7 +47,7 @@
ManagementBroker ();
virtual ~ManagementBroker ();
- void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
+ void configure (std::string dataDir, uint16_t interval, broker::Broker* broker, int threadPoolSize);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (broker::Exchange::shared_ptr mgmtExchange,
broker::Exchange::shared_ptr directExchange);
@@ -56,16 +56,15 @@
std::string className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
- uint64_t addObject (ManagementObject* object,
- uint32_t persistId = 0,
- uint32_t persistBank = 4);
+ ObjectId addObject (ManagementObject* object,
+ uint64_t persistId = 0);
void clientAdded (void);
bool dispatchCommand (broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
// Stubs for remote management agent calls
- void init (std::string, uint16_t, uint16_t, bool) { assert(0); }
+ void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); }
uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
int getSignalFd () { assert(0); return -1; }
@@ -88,7 +87,7 @@
{
uint32_t objIdBank;
std::string routingKey;
- uint64_t connectionRef;
+ ObjectId connectionRef;
Agent* mgmtObject;
ManagementObject* GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
@@ -97,7 +96,7 @@
// TODO: Eventually replace string with entire reply-to structure. reply-to
// currently assumes that the exchange is "amq.direct" even though it could
// in theory be specified differently.
- typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+ typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap;
typedef std::vector<std::string> ReplyToVector;
// Storage for known schema classes:
@@ -133,12 +132,15 @@
size_t bufferLen;
uint8_t* buffer;
- SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
+ SchemaClass(uint32_t seq) :
+ writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+ SchemaClass(ManagementObject::writeSchemaCall_t call) :
+ writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
- typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
typedef std::map<std::string, ClassMap> PackageMap;
RemoteAgentMap remoteAgents;
@@ -157,10 +159,10 @@
broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
- Manageable* broker;
+ broker::Broker* broker;
uint16_t bootSequence;
- uint32_t localBank;
uint32_t nextObjectId;
+ uint32_t brokerBank;
uint32_t nextRemoteBank;
uint32_t nextRequestSequence;
bool clientWasAdded;
@@ -168,6 +170,7 @@
# define MA_BUFFER_SIZE 65536
char inputBuffer[MA_BUFFER_SIZE];
char outputBuffer[MA_BUFFER_SIZE];
+ char eventBuffer[MA_BUFFER_SIZE];
void writeData ();
void PeriodicProcessing (void);
@@ -179,7 +182,8 @@
std::string routingKey);
void moveNewObjectsLH();
- void dispatchAgentCommandLH (broker::Message& msg);
+ bool authorizeAgentMessageLH(broker::Message& msg);
+ void dispatchAgentCommandLH(broker::Message& msg);
PackageMap::iterator FindOrAddPackageLH(std::string name);
void AddClass(PackageMap::iterator pIter,
@@ -206,9 +210,12 @@
void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
- void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
size_t ValidateSchema(framing::Buffer&);
+ sys::Mutex& getMutex();
+ framing::Buffer* startEventLH();
+ void finishEventLH(framing::Buffer* outBuffer);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Wed Sep 3 11:01:44 2008
@@ -28,6 +28,62 @@
using namespace qpid::management;
using namespace qpid::sys;
+void AgentAttachment::setBanks(uint32_t broker, uint32_t bank)
+{
+ first =
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (bank & 0x0fffffff));
+}
+
+ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object)
+ : agent(0)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48 |
+ ((uint64_t) (broker & 0x000fffff)) << 28 |
+ ((uint64_t) (bank & 0x0fffffff));
+ second = object;
+}
+
+ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object)
+ : agent(_agent)
+{
+ first =
+ ((uint64_t) (flags & 0x0f)) << 60 |
+ ((uint64_t) (seq & 0x0fff)) << 48;
+ second = object;
+}
+
+bool ObjectId::operator==(const ObjectId &other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return first == otherFirst && second == other.second;
+}
+
+bool ObjectId::operator<(const ObjectId &other) const
+{
+ uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
+
+ return (first < otherFirst) || ((first == otherFirst) && (second < other.second));
+}
+
+void ObjectId::encode(framing::Buffer& buffer)
+{
+ if (agent == 0)
+ buffer.putLongLong(first);
+ else
+ buffer.putLongLong(first | agent->first);
+ buffer.putLongLong(second);
+}
+
+void ObjectId::decode(framing::Buffer& buffer)
+{
+ first = buffer.getLongLong();
+ second = buffer.getLongLong();
+}
+
int ManagementObject::nextThreadIndex = 0;
void ManagementObject::writeTimestamps (Buffer& buf)
@@ -38,10 +94,10 @@
buf.putLongLong (uint64_t (Duration (now ())));
buf.putLongLong (createTime);
buf.putLongLong (destroyTime);
- buf.putLongLong (objectId);
+ objectId.encode(buf);
}
-void ManagementObject::setReference(uint64_t) {}
+void ManagementObject::setReference(ObjectId) {}
int ManagementObject::getThreadIndex() {
static __thread int thisIndex = -1;
@@ -54,3 +110,17 @@
return thisIndex;
}
+Mutex& ManagementObject::getMutex()
+{
+ return agent->getMutex();
+}
+
+Buffer* ManagementObject::startEventLH()
+{
+ return agent->startEventLH();
+}
+
+void ManagementObject::finishEventLH(Buffer* buf)
+{
+ agent->finishEventLH(buf);
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Wed Sep 3 11:01:44 2008
@@ -32,6 +32,34 @@
class Manageable;
class ManagementAgent;
+class ObjectId;
+
+
+class AgentAttachment {
+ friend class ObjectId;
+private:
+ uint64_t first;
+public:
+ AgentAttachment() : first(0) {}
+ void setBanks(uint32_t broker, uint32_t bank);
+};
+
+
+class ObjectId {
+private:
+ const AgentAttachment* agent;
+ uint64_t first;
+ uint64_t second;
+public:
+ ObjectId() : agent(0), first(0), second(0) {}
+ ObjectId(framing::Buffer& buf) : agent(0) { decode(buf); }
+ ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object);
+ ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object);
+ bool operator==(const ObjectId &other) const;
+ bool operator<(const ObjectId &other) const;
+ void encode(framing::Buffer& buffer);
+ void decode(framing::Buffer& buffer);
+};
class ManagementObject
{
@@ -39,7 +67,7 @@
uint64_t createTime;
uint64_t destroyTime;
- uint64_t objectId;
+ ObjectId objectId;
bool configChanged;
bool instChanged;
bool deleted;
@@ -84,11 +112,15 @@
int getThreadIndex();
void writeTimestamps (qpid::framing::Buffer& buf);
+ sys::Mutex& getMutex();
+ framing::Buffer* startEventLH();
+ void finishEventLH(framing::Buffer* buf);
+
public:
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
ManagementObject (ManagementAgent* _agent, Manageable* _core) :
- destroyTime(0), objectId (0), configChanged(true),
+ destroyTime(0), configChanged(true),
instChanged(true), deleted(false), coreObject(_core), agent(_agent)
{ createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
virtual ~ManagementObject () {}
@@ -100,14 +132,14 @@
virtual void doMethod (std::string methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
- virtual void setReference (uint64_t objectId);
+ virtual void setReference (ObjectId objectId);
virtual std::string& getClassName (void) = 0;
virtual std::string& getPackageName (void) = 0;
virtual uint8_t* getMd5Sum (void) = 0;
- void setObjectId (uint64_t oid) { objectId = oid; }
- uint64_t getObjectId (void) { return objectId; }
+ void setObjectId (ObjectId oid) { objectId = oid; }
+ ObjectId getObjectId (void) { return objectId; }
inline bool getConfigChanged (void) { return configChanged; }
virtual bool getInstChanged (void) { return instChanged; }
inline void setAllChanged (void) {
@@ -120,10 +152,9 @@
deleted = true;
}
inline bool isDeleted (void) { return deleted; }
- inline sys::Mutex& getLock() { return accessLock; }
};
-typedef std::map<uint64_t,ManagementObject*> ManagementObjectMap;
+typedef std::map<ObjectId, ManagementObject*> ManagementObjectMap;
}}
Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Wed Sep 3 11:01:44 2008
@@ -69,6 +69,53 @@
for cell in row:
setattr (self, cell[0], cell[1])
+class objectId(object):
+ """ Object that represents QMF object identifiers """
+
+ def __init__(self, codec):
+ self.first = codec.read_uint64()
+ self.second = codec.read_uint64()
+
+ def __cmp__(self, other):
+ if other == None:
+ return 1
+ if self.first < other.first:
+ return -1
+ if self.first > other.first:
+ return 1
+ if self.second < other.second:
+ return -1
+ if self.second > other.second:
+ return 1
+ return 0
+
+
+ def index(self):
+ return (self.first, self.second)
+
+ def getFlags(self):
+ return (self.first & 0xF000000000000000) >> 60
+
+ def getSequence(self):
+ return (self.first & 0x0FFF000000000000) >> 48
+
+ def getBroker(self):
+ return (self.first & 0x0000FFFFF0000000) >> 28
+
+ def getBank(self):
+ return self.first & 0x000000000FFFFFFF
+
+ def getObject(self):
+ return self.second
+
+ def isDurable(self):
+ return self.getSequence() == 0
+
+ def encode(self, codec):
+ codec.write_uint64(self.first)
+ codec.write_uint64(self.second)
+
+
class methodResult:
""" Object that contains the result of a method call """
@@ -308,6 +355,8 @@
self.handleClassInd (ch, codec)
elif hdr[0] == 'h':
self.handleHeartbeat (ch, codec)
+ elif hdr[0] == 'e':
+ self.handleEvent (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
ch.accept(msg)
@@ -386,7 +435,7 @@
elif typecode == 9: # DELTATIME
codec.write_uint64 (long (value))
elif typecode == 10: # REF
- codec.write_uint64 (long (value))
+ value.encode(codec)
elif typecode == 11: # BOOL
codec.write_uint8 (int (value))
elif typecode == 12: # FLOAT
@@ -429,7 +478,7 @@
elif typecode == 9: # DELTATIME
data = codec.read_uint64 ()
elif typecode == 10: # REF
- data = codec.read_uint64 ()
+ data = objectId(codec)
elif typecode == 11: # BOOL
data = codec.read_uint8 ()
elif typecode == 12: # FLOAT
@@ -551,9 +600,9 @@
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
- pname = str (codec.read_str8 ())
- cname = str (codec.read_str8 ())
- hash = codec.read_bin128 ()
+ pname = str (codec.read_str8())
+ cname = str (codec.read_str8())
+ hash = codec.read_bin128()
if pname not in self.packages:
return
@@ -574,6 +623,32 @@
if self.ctrlCb != None:
self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp)
+ def handleEvent (self, ch, codec):
+ if self.eventCb == None:
+ return
+ timestamp = codec.read_uint64()
+ objId = objectId(codec)
+ packageName = str(codec.read_str8())
+ className = str(codec.read_str8())
+ hash = codec.read_bin128()
+ name = str(codec.read_str8())
+ classKey = (packageName, className, hash)
+ if classKey not in self.schema:
+ return;
+ schemaClass = self.schema[classKey]
+ row = []
+ es = schemaClass['E']
+ arglist = None
+ for ename in es:
+ (edesc, eargs) = es[ename]
+ if ename == name:
+ arglist = eargs
+ if arglist == None:
+ return
+ for arg in arglist:
+ row.append((arg[0], self.decodeValue(codec, arg[1])))
+ self.eventCb(ch.context, classKey, objId, name, row)
+
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
@@ -597,22 +672,23 @@
configs = []
insts = []
methods = {}
- events = []
+ events = {}
configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
insts.append (("id", 4, None, None))
for idx in range (configCount):
ft = codec.read_map ()
- name = str (ft["name"])
- type = ft["type"]
- access = ft["access"]
- index = ft["index"]
- unit = None
- min = None
- max = None
- maxlen = None
- desc = None
+ name = str (ft["name"])
+ type = ft["type"]
+ access = ft["access"]
+ index = ft["index"]
+ optional = ft["optional"]
+ unit = None
+ min = None
+ max = None
+ maxlen = None
+ desc = None
for key, value in ft.items ():
if key == "unit":
@@ -626,7 +702,7 @@
elif key == "desc":
desc = str (value)
- config = (name, type, unit, desc, access, index, min, max, maxlen)
+ config = (name, type, unit, desc, access, index, min, max, maxlen, optional)
configs.append (config)
for idx in range (instCount):
@@ -685,6 +761,33 @@
args.append (arg)
methods[mname] = (mdesc, args)
+ for idx in range (eventCount):
+ ft = codec.read_map ()
+ ename = str (ft["name"])
+ argCount = ft["argCount"]
+ if "desc" in ft:
+ edesc = str (ft["desc"])
+ else:
+ edesc = None
+
+ args = []
+ for aidx in range (argCount):
+ ft = codec.read_map ()
+ name = str (ft["name"])
+ type = ft["type"]
+ unit = None
+ desc = None
+
+ for key, value in ft.items ():
+ if key == "unit":
+ unit = str (value)
+ elif key == "desc":
+ desc = str (value)
+
+ arg = (name, type, unit, desc)
+ args.append (arg)
+ events[ename] = (edesc, args)
+
schemaClass = {}
schemaClass['C'] = configs
schemaClass['I'] = insts
@@ -695,6 +798,22 @@
if self.schemaCb != None:
self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+ def parsePresenceMasks(self, codec, schemaClass):
+ """ Generate a list of not-present properties """
+ excludeList = []
+ bit = 0
+ for element in schemaClass['C'][1:]:
+ if element[9] == 1:
+ if bit == 0:
+ mask = codec.read_uint8()
+ bit = 1
+ if (mask & bit) == 0:
+ excludeList.append(element[0])
+ bit = bit * 2
+ if bit == 256:
+ bit = 0
+ return excludeList
+
def parseContent (self, ch, cls, codec, seq=0):
""" Parse a received content message. """
if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None:
@@ -716,21 +835,26 @@
timestamps.append (codec.read_uint64 ()) # Current Time
timestamps.append (codec.read_uint64 ()) # Create Time
timestamps.append (codec.read_uint64 ()) # Delete Time
-
+ objId = objectId(codec)
schemaClass = self.schema[classKey]
if cls == 'C' or cls == 'B':
- for element in schemaClass['C'][:]:
+ notPresent = self.parsePresenceMasks(codec, schemaClass)
+
+ if cls == 'C' or cls == 'B':
+ row.append(("id", objId))
+ for element in schemaClass['C'][1:]:
tc = element[1]
name = element[0]
- data = self.decodeValue (codec, tc)
- row.append ((name, data))
+ if name in notPresent:
+ row.append((name, None))
+ else:
+ data = self.decodeValue(codec, tc)
+ row.append((name, data))
if cls == 'I' or cls == 'B':
- if cls == 'B':
- start = 1
- else:
- start = 0
- for element in schemaClass['I'][start:]:
+ if cls == 'I':
+ row.append(("id", objId))
+ for element in schemaClass['I'][1:]:
tc = element[1]
name = element[0]
data = self.decodeValue (codec, tc)
@@ -763,9 +887,12 @@
codec = Codec (self.spec)
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'), sequence)
- codec.write_uint64 (objId) # ID of object
+ objId.encode(codec)
+ codec.write_str8 (classId[0])
+ codec.write_str8 (classId[1])
+ codec.write_bin128 (classId[2])
codec.write_str8 (methodName)
- bank = (objId & 0x0000FFFFFF000000) >> 24
+ bank = objId.getBank()
# Encode args according to schema
if classId not in self.schema:
Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Wed Sep 3 11:01:44 2008
@@ -71,14 +71,14 @@
#
def registerObjId (self, objId):
- if not objId in self.idBackMap:
- self.idBackMap[objId] = self.nextId
+ if not objId.index() in self.idBackMap:
+ self.idBackMap[objId.index()] = self.nextId
self.idMap[self.nextId] = objId
self.nextId += 1
- def displayObjId (self, objId):
- if objId in self.idBackMap:
- return self.idBackMap[objId]
+ def displayObjId (self, objIdIndex):
+ if objIdIndex in self.idBackMap:
+ return self.idBackMap[objIdIndex]
else:
return 0
@@ -86,7 +86,7 @@
if displayId in self.idMap:
return self.idMap[displayId]
else:
- return 0
+ return None
def displayClassName (self, cls):
(packageName, className, hash) = cls
@@ -102,19 +102,20 @@
self.tables[className] = {}
# Register the ID so a more friendly presentation can be displayed
- id = long (list[0][1])
- self.registerObjId (id)
+ objId = list[0][1]
+ oidx = objId.index()
+ self.registerObjId (objId)
# If this object hasn't been seen before, create a new object record with
# the timestamps and empty lists for configuration and instrumentation data.
- if id not in self.tables[className]:
- self.tables[className][id] = (timestamps, [], [])
+ if oidx not in self.tables[className]:
+ self.tables[className][oidx] = (timestamps, [], [])
- (unused, oldConf, oldInst) = self.tables[className][id]
+ (unused, oldConf, oldInst) = self.tables[className][oidx]
# For config updates, simply replace old config list with the new one.
if context == 0: #config
- self.tables[className][id] = (timestamps, list, oldInst)
+ self.tables[className][oidx] = (timestamps, list, oldInst)
# For instrumentation updates, carry the minimum and maximum values for
# "hi-lo" stats forward.
@@ -132,7 +133,7 @@
if oldInst[idx][1] < value:
value = oldInst[idx][1]
newInst.append ((key, value))
- self.tables[className][id] = (timestamps, oldConf, newInst)
+ self.tables[className][oidx] = (timestamps, oldConf, newInst)
finally:
self.lock.release ()
@@ -211,11 +212,13 @@
pass
def refName (self, oid):
- if oid == 0:
+ if oid == None:
return "NULL"
- return str (self.displayObjId (oid))
+ return str (self.displayObjId (oid.index()))
def valueDisplay (self, classKey, key, value):
+ if value == None:
+ return "<NULL>"
for kind in range (2):
schema = self.schema[classKey][kind]
for item in schema:
@@ -437,7 +440,7 @@
if classKey in self.tables:
ids = self.listOfIds(classKey, tokens[1:])
for objId in ids:
- (ts, config, inst) = self.tables[classKey][self.rawObjId(objId)]
+ (ts, config, inst) = self.tables[classKey][self.rawObjId(objId).index()]
createTime = self.disp.timestamp (ts[1])
destroyTime = "-"
if ts[2] > 0:
@@ -486,32 +489,32 @@
rows = []
timestamp = None
- config = self.tables[classKey][ids[0]][1]
+ config = self.tables[classKey][ids[0].index()][1]
for eIdx in range (len (config)):
key = config[eIdx][0]
if key != "id":
row = ("property", key)
for id in ids:
if timestamp == None or \
- timestamp < self.tables[classKey][id][0][0]:
- timestamp = self.tables[classKey][id][0][0]
- (key, value) = self.tables[classKey][id][1][eIdx]
+ timestamp < self.tables[classKey][id.index()][0][0]:
+ timestamp = self.tables[classKey][id.index()][0][0]
+ (key, value) = self.tables[classKey][id.index()][1][eIdx]
row = row + (self.valueDisplay (classKey, key, value),)
rows.append (row)
- inst = self.tables[classKey][ids[0]][2]
+ inst = self.tables[classKey][ids[0].index()][2]
for eIdx in range (len (inst)):
key = inst[eIdx][0]
if key != "id":
row = ("statistic", key)
for id in ids:
- (key, value) = self.tables[classKey][id][2][eIdx]
+ (key, value) = self.tables[classKey][id.index()][2][eIdx]
row = row + (self.valueDisplay (classKey, key, value),)
rows.append (row)
titleRow = ("Type", "Element")
for id in ids:
- titleRow = titleRow + (self.refName (id),)
+ titleRow = titleRow + (self.refName(id),)
caption = "Object of type %s.%s:" % (classKey[0], classKey[1])
if timestamp != None:
caption = caption + " (last sample time: " + self.disp.timestamp (timestamp) + ")"
@@ -563,13 +566,15 @@
access = self.accessName (config[4])
extra = ""
if config[5] == 1:
- extra = extra + "index "
+ extra += "index "
if config[6] != None:
- extra = extra + "Min: " + str (config[6])
+ extra += "Min: " + str(config[6]) + " "
if config[7] != None:
- extra = extra + "Max: " + str (config[7])
+ extra += "Max: " + str(config[7]) + " "
if config[8] != None:
- extra = extra + "MaxLen: " + str (config[8])
+ extra += "MaxLen: " + str(config[8]) + " "
+ if config[9] == 1:
+ extra += "optional "
rows.append ((name, typename, unit, access, extra, desc))
for config in self.schema[classKey][1]:
@@ -613,7 +618,7 @@
def getClassForId (self, objId):
""" Given an object ID, return the class key for the referenced object """
for classKey in self.tables:
- if objId in self.tables[classKey]:
+ if objId.index() in self.tables[classKey]:
return classKey
return None
@@ -659,14 +664,19 @@
def makeIdRow (self, displayId):
if displayId in self.idMap:
- rawId = self.idMap[displayId]
+ objId = self.idMap[displayId]
else:
return None
- return (displayId,
- rawId,
- (rawId & 0x7FFF000000000000) >> 48,
- (rawId & 0x0000FFFFFF000000) >> 24,
- (rawId & 0x0000000000FFFFFF))
+ if objId.getFlags() == 0:
+ flags = ""
+ else:
+ flags = str(objId.getFlags())
+ seq = objId.getSequence()
+ if seq == 0:
+ seqText = "<durable>"
+ else:
+ seqText = str(seq)
+ return (displayId, flags, seqText, objId.getBroker(), objId.getBank(), hex(objId.getObject()))
def listIds (self, select):
rows = []
@@ -683,7 +693,7 @@
return
rows.append(row)
self.disp.table("Translation of Display IDs:",
- ("DisplayID", "RawID", "BootSequence", "Bank", "Object"),
+ ("DisplayID", "Flags", "BootSequence", "Broker", "Bank", "Object"),
rows)
def do_list (self, data):
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Wed Sep 3 11:01:44 2008
@@ -61,18 +61,16 @@
===============================================================
-->
<class name="Broker">
- <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/>
- <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
- <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/>
- <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/>
- <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
- <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
- <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
- <property name="clusterName" type="sstr" access="RO"
- desc="Name of cluster this server is a member of"/>
- <property name="version" type="sstr" access="RO" desc="Running software version"/>
- <property name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/>
- <property name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/>
+ <property name="systemRef" type="objId" references="System" access="RC" index="y" desc="System ID" parentRef="y"/>
+ <property name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
+ <property name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/>
+ <property name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/>
+ <property name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
+ <property name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
+ <property name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
+ <property name="clusterName" type="sstr" access="RO" desc="Name of cluster this server is a member of"/>
+ <property name="version" type="sstr" access="RO" desc="Running software version"/>
+ <property name="dataDir" type="sstr" access="RO" optional="y" desc="Persistent configuration storage location"/>
<method name="joinCluster">
<arg name="clusterName" dir="I" type="sstr"/>
@@ -94,6 +92,17 @@
<arg name="username" dir="I" type="sstr"/>
<arg name="password" dir="I" type="sstr"/>
</method>
+
+ <event name="agentConnect" desc="QMF Management Agent has connected to the broker">
+ <arg name="remoteAddress" type="sstr"/>
+ <arg name="label" type="sstr"/>
+ <arg name="brokerBank" type="uint32"/>
+ <arg name="agentBank" type="uint32"/>
+ </event>
+
+ <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
+ <arg name="remoteAddress" type="sstr"/>
+ </event>
</class>
<!--
Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=691700&r1=691699&r2=691700&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Wed Sep 3 11:01:44 2008
@@ -19,7 +19,7 @@
under the License.
-->
-<type name="objId" base="REF" cpp="uint64_t" encode="@.putLongLong(#)" decode="# = @.getLongLong()" accessor="direct" init="0"/>
+<type name="objId" base="REF" cpp="ObjectId" encode="#.encode(@)" decode="#.decode(@)" accessor="direct" init="0"/>
<type name="uint8" base="U8" cpp="uint8_t" encode="@.putOctet(#)" decode="# = @.getOctet()" accessor="direct" init="0"/>
<type name="uint16" base="U16" cpp="uint16_t" encode="@.putShort(#)" decode="# = @.getShort()" accessor="direct" init="0"/>
<type name="uint32" base="U32" cpp="uint32_t" encode="@.putLong(#)" decode="# = @.getLong()" accessor="direct" init="0"/>