You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/10/07 23:47:36 UTC
svn commit: r702651 [2/2] - in /incubator/qpid/trunk/qpid:
cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/qmf/
cpp/managementgen/qmf/templates/ cpp/src/qpid/acl/ cpp/src/qpid/agent/
cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/...
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue Oct 7 14:47:35 2008
@@ -179,14 +179,24 @@
dExchange = _dexchange;
}
-void ManagementBroker::RegisterClass (string packageName,
- string className,
+void ManagementBroker::registerClass (string& packageName,
+ string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(userLock);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
- AddClass(pIter, className, md5Sum, schemaCall);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
+}
+
+void ManagementBroker::registerEvent (string& packageName,
+ string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
+{
+ Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
+ addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
ObjectId ManagementBroker::addObject (ManagementObject* object,
@@ -211,6 +221,23 @@
return objId;
}
+void ManagementBroker::raiseEvent(const ManagementEvent& event)
+{
+ Mutex::ScopedLock lock (userLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'e');
+ outBuffer.putShortString(event.getPackageName());
+ outBuffer.putShortString(event.getEventName());
+ outBuffer.putBin128(event.getMd5Sum());
+ outBuffer.putLongLong(uint64_t(Duration(now())));
+ event.encode(outBuffer);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ sendBuffer(outBuffer, outLen, mExchange, "mgmt.event");
+}
+
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
: TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), broker(_broker) {}
@@ -219,7 +246,7 @@
void ManagementBroker::Periodic::fire ()
{
broker.timer.add (intrusive_ptr<TimerTask> (new Periodic (broker, broker.interval)));
- broker.PeriodicProcessing ();
+ broker.periodicProcessing ();
}
void ManagementBroker::clientAdded (void)
@@ -233,35 +260,35 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'x');
+ encodeHeader (outBuffer, 'x');
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+ sendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
}
}
-void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+void ManagementBroker::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
{
buf.putOctet ('A');
buf.putOctet ('M');
- buf.putOctet ('1');
+ buf.putOctet ('2');
buf.putOctet (opcode);
buf.putLong (seq);
}
-bool ManagementBroker::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+bool ManagementBroker::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
{
- uint8_t h1 = buf.getOctet ();
- uint8_t h2 = buf.getOctet ();
- uint8_t h3 = buf.getOctet ();
+ uint8_t h1 = buf.getOctet();
+ uint8_t h2 = buf.getOctet();
+ uint8_t h3 = buf.getOctet();
- *opcode = buf.getOctet ();
- *seq = buf.getLong ();
+ *opcode = buf.getOctet();
+ *seq = buf.getLong();
- return h1 == 'A' && h2 == 'M' && h3 == '1';
+ return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-void ManagementBroker::SendBuffer (Buffer& buf,
+void ManagementBroker::sendBuffer (Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
string routingKey)
@@ -304,7 +331,7 @@
newManagementObjects.clear();
}
-void ManagementBroker::PeriodicProcessing (void)
+void ManagementBroker::periodicProcessing (void)
{
#define BUFSIZE 65536
Mutex::ScopedLock lock (userLock);
@@ -315,13 +342,13 @@
{
Buffer msgBuffer(msgChars, BUFSIZE);
- EncodeHeader(msgBuffer, 'h');
+ encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(Duration(now())));
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str() + ".heartbeat";
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
moveNewObjectsLH();
@@ -350,25 +377,25 @@
if (object->getConfigChanged () || object->isDeleted ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'c');
+ encodeHeader (msgBuffer, 'c');
object->writeProperties(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str() + ".prop." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->getInstChanged ())
{
Buffer msgBuffer (msgChars, BUFSIZE);
- EncodeHeader (msgBuffer, 'i');
+ encodeHeader (msgBuffer, 'i');
object->writeStatistics(msgBuffer);
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
routingKey = "mgmt." + uuid.str () + ".stat." + object->getClassName ();
- SendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
if (object->isDeleted ())
@@ -393,12 +420,12 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'z', sequence);
+ encodeHeader (outBuffer, 'z', sequence);
outBuffer.putLong (code);
outBuffer.putShortString (text);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
@@ -411,7 +438,7 @@
// Parse the routing key. This management broker should act as though it
// is bound to the exchange to match the following keys:
//
- // agent.0.#
+ // agent.1.0.#
// broker
if (routingKey == "broker") {
@@ -419,12 +446,12 @@
return false;
}
- else if (routingKey.compare(0, 7, "agent.0") == 0) {
+ else if (routingKey.compare(0, 9, "agent.1.0") == 0) {
dispatchAgentCommandLH(msg);
return false;
}
- else if (routingKey.compare(0, 6, "agent.") == 0) {
+ else if (routingKey.compare(0, 8, "agent.1.") == 0) {
return authorizeAgentMessageLH(msg);
}
@@ -447,7 +474,7 @@
inBuffer.getShortString(className);
inBuffer.getBin128(hash);
inBuffer.getShortString(methodName);
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
if (acl != 0) {
string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
@@ -460,7 +487,7 @@
outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
return;
}
}
@@ -476,12 +503,19 @@
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_INVALID_PARAMETER));
}
else
- iter->second->doMethod(methodName, inBuffer, outBuffer);
+ try {
+ outBuffer.record();
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ } catch(std::exception& e) {
+ outBuffer.restore();
+ outBuffer.putLong(Manageable::STATUS_EXCEPTION);
+ outBuffer.putShortString(e.what());
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -489,12 +523,12 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'b', sequence);
+ encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handlePackageQueryLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -506,11 +540,11 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p', sequence);
- EncodePackageIndication (outBuffer, pIter);
+ encodeHeader (outBuffer, 'p', sequence);
+ encodePackageIndication (outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
sendCommandComplete (replyToKey, sequence);
@@ -521,7 +555,7 @@
std::string packageName;
inBuffer.getShortString(packageName);
- FindOrAddPackageLH(packageName);
+ findOrAddPackageLH(packageName);
}
void ManagementBroker::handleClassQueryLH(Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -542,11 +576,11 @@
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'q', sequence);
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q', sequence);
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
}
}
@@ -558,26 +592,27 @@
std::string packageName;
SchemaClassKey key;
+ uint8_t kind = inBuffer.getOctet();
inBuffer.getShortString(packageName);
inBuffer.getShortString(key.name);
inBuffer.getBin128(key.hash);
- PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+ PackageMap::iterator pIter = findOrAddPackageLH(packageName);
ClassMap::iterator cIter = pIter->second.find(key);
if (cIter == pIter->second.end()) {
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
uint32_t sequence = nextRequestSequence++;
- EncodeHeader (outBuffer, 'S', sequence);
+ encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
outBuffer.putShortString(key.name);
outBuffer.putBin128(key.hash);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
- pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(sequence)));
+ pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence)));
}
}
@@ -612,11 +647,11 @@
SchemaClass& classInfo = cIter->second;
if (classInfo.hasSchema()) {
- EncodeHeader(outBuffer, 's', sequence);
+ encodeHeader(outBuffer, 's', sequence);
classInfo.appendSchema(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
else
sendCommandComplete(replyToKey, sequence, 1, "Schema not available");
@@ -634,9 +669,10 @@
SchemaClassKey key;
inBuffer.record();
- inBuffer.getShortString (packageName);
- inBuffer.getShortString (key.name);
- inBuffer.getBin128 (key.hash);
+ inBuffer.getOctet();
+ inBuffer.getShortString(packageName);
+ inBuffer.getShortString(key.name);
+ inBuffer.getBin128(key.hash);
inBuffer.restore();
PackageMap::iterator pIter = packages.find(packageName);
@@ -644,7 +680,7 @@
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
- size_t length = ValidateSchema(inBuffer);
+ size_t length = validateSchema(inBuffer, cIter->second.kind);
if (length == 0) {
QPID_LOG(warning, "Management Broker received invalid schema response: " << packageName << "." << key.name);
cMap.erase(key);
@@ -658,11 +694,11 @@
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'q');
- EncodeClassIndication(outBuffer, pIter, cIter);
+ encodeHeader(outBuffer, 'q');
+ encodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+ sendBuffer(outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
}
}
}
@@ -727,7 +763,7 @@
void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
- uint32_t requestedBank;
+ uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
@@ -737,14 +773,15 @@
RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent");
+ sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
- inBuffer.getShortString (label);
- systemId.decode (inBuffer);
- requestedBank = inBuffer.getLong ();
- assignedBank = assignBankLH (requestedBank);
+ inBuffer.getShortString(label);
+ systemId.decode(inBuffer);
+ requestedBrokerBank = inBuffer.getLong();
+ requestedAgentBank = inBuffer.getLong();
+ assignedBank = assignBankLH(requestedAgentBank);
RemoteAgent* agent = new RemoteAgent;
agent->objIdBank = assignedBank;
@@ -755,7 +792,8 @@
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
- agent->mgmtObject->set_objectIdBank (assignedBank);
+ agent->mgmtObject->set_brokerBank (brokerBank);
+ agent->mgmtObject->set_agentBank (assignedBank);
addObject (agent->mgmtObject);
remoteAgents[connectionRef] = agent;
@@ -764,12 +802,12 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'a', sequence);
+ encodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleGetQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -799,12 +837,12 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'g', sequence);
+ encodeHeader (outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
}
@@ -824,7 +862,7 @@
msg.encodeContent(inBuffer);
inBuffer.reset();
- if (!CheckHeader(inBuffer, &opcode, &sequence))
+ if (!checkHeader(inBuffer, &opcode, &sequence))
return false;
if (opcode == 'M') {
@@ -861,12 +899,12 @@
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader(outBuffer, 'm', sequence);
+ encodeHeader(outBuffer, 'm', sequence);
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putShortString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
return false;
@@ -900,7 +938,7 @@
msg.encodeContent(inBuffer);
inBuffer.reset();
- if (!CheckHeader(inBuffer, &opcode, &sequence))
+ if (!checkHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
@@ -915,7 +953,7 @@
else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher());
}
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(std::string name)
{
PackageMap::iterator pIter = packages.find (name);
if (pIter != packages.end ())
@@ -930,19 +968,20 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
- EncodeHeader (outBuffer, 'p');
- EncodePackageIndication (outBuffer, result.first);
+ encodeHeader (outBuffer, 'p');
+ encodePackageIndication (outBuffer, result.first);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset ();
- SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
+ sendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema.package");
return result.first;
}
-void ManagementBroker::AddClass(PackageMap::iterator pIter,
- string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -958,71 +997,76 @@
QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
key.name);
- cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall)));
+ cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
cIter = cMap.find(key);
}
-void ManagementBroker::EncodePackageIndication (Buffer& buf,
- PackageMap::iterator pIter)
+void ManagementBroker::encodePackageIndication(Buffer& buf,
+ PackageMap::iterator pIter)
{
- buf.putShortString ((*pIter).first);
+ buf.putShortString((*pIter).first);
}
-void ManagementBroker::EncodeClassIndication (Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
+void ManagementBroker::encodeClassIndication(Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter)
{
SchemaClassKey key = (*cIter).first;
- buf.putShortString ((*pIter).first);
- buf.putShortString (key.name);
- buf.putBin128 (key.hash);
+ buf.putOctet((*cIter).second.kind);
+ buf.putShortString((*pIter).first);
+ buf.putShortString(key.name);
+ buf.putBin128(key.hash);
}
-size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+size_t ManagementBroker::validateSchema(Buffer& inBuffer, uint8_t kind)
+{
+ if (kind == ManagementItem::CLASS_KIND_TABLE)
+ return validateTableSchema(inBuffer);
+ else if (kind == ManagementItem::CLASS_KIND_EVENT)
+ return validateEventSchema(inBuffer);
+ return 0;
+}
+
+size_t ManagementBroker::validateTableSchema(Buffer& inBuffer)
{
uint32_t start = inBuffer.getPosition();
uint32_t end;
string text;
uint8_t hash[16];
- inBuffer.record();
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
-
- uint16_t propCount = inBuffer.getShort();
- uint16_t statCount = inBuffer.getShort();
- uint16_t methCount = inBuffer.getShort();
- uint16_t evntCount = inBuffer.getShort();
-
- for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- }
-
- for (uint16_t idx = 0; idx < methCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
+ try {
+ inBuffer.record();
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_TABLE)
return 0;
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
- }
- }
- for (uint16_t idx = 0; idx < evntCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
- return 0;
- int argCount = ft.getInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t propCount = inBuffer.getShort();
+ uint16_t statCount = inBuffer.getShort();
+ uint16_t methCount = inBuffer.getShort();
+
+ for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+
+ for (uint16_t idx = 0; idx < methCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ if (!ft.isSet("argCount"))
+ return 0;
+ int argCount = ft.getInt("argCount");
+ for (int mIdx = 0; mIdx < argCount; mIdx++) {
+ FieldTable aft;
+ aft.decode(inBuffer);
+ }
}
+ } catch (std::exception& e) {
+ return 0;
}
end = inBuffer.getPosition();
@@ -1030,24 +1074,34 @@
return end - start;
}
-Mutex& ManagementBroker::getMutex()
+size_t ManagementBroker::validateEventSchema(Buffer& inBuffer)
{
- return userLock;
-}
+ uint32_t start = inBuffer.getPosition();
+ uint32_t end;
+ string text;
+ uint8_t hash[16];
-Buffer* ManagementBroker::startEventLH()
-{
- Buffer* outBuffer(new Buffer(eventBuffer, MA_BUFFER_SIZE));
- EncodeHeader(*outBuffer, 'e');
- outBuffer->putLongLong(uint64_t(Duration(now())));
- return outBuffer;
-}
+ try {
+ inBuffer.record();
+ uint8_t kind = inBuffer.getOctet();
+ if (kind != ManagementItem::CLASS_KIND_EVENT)
+ return 0;
-void ManagementBroker::finishEventLH(Buffer* outBuffer)
-{
- uint32_t outLen = MA_BUFFER_SIZE - outBuffer->available();
- outBuffer->reset();
- SendBuffer(*outBuffer, outLen, mExchange, "mgmt.event");
- delete outBuffer;
-}
+ inBuffer.getShortString(text);
+ inBuffer.getShortString(text);
+ inBuffer.getBin128(hash);
+
+ uint16_t argCount = inBuffer.getShort();
+ for (uint16_t idx = 0; idx < argCount; idx++) {
+ FieldTable ft;
+ ft.decode(inBuffer);
+ }
+ } catch (std::exception& e) {
+ return 0;
+ }
+
+ end = inBuffer.getPosition();
+ inBuffer.restore(); // restore original position
+ return end - start;
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Tue Oct 7 14:47:35 2008
@@ -38,11 +38,11 @@
class ManagementBroker : public ManagementAgent
{
- private:
+private:
int threadPoolSize;
- public:
+public:
ManagementBroker ();
virtual ~ManagementBroker ();
@@ -52,13 +52,18 @@
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
- void RegisterClass (std::string packageName,
- std::string className,
+ void registerClass (std::string& packageName,
+ std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void registerEvent (std::string& packageName,
+ std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject (ManagementObject* object,
uint64_t persistId = 0);
- void clientAdded (void);
+ void raiseEvent(const ManagementEvent& event);
+ void clientAdded ();
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
@@ -68,7 +73,7 @@
uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
int getSignalFd () { assert(0); return -1; }
- private:
+private:
friend class ManagementAgent;
struct Periodic : public qpid::broker::TimerTask
@@ -127,15 +132,16 @@
struct SchemaClass
{
+ uint8_t kind;
ManagementObject::writeSchemaCall_t writeSchemaCall;
uint32_t pendingSequence;
size_t bufferLen;
uint8_t* buffer;
- SchemaClass(uint32_t seq) :
- writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
- SchemaClass(ManagementObject::writeSchemaCall_t call) :
- writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
+ SchemaClass(uint8_t _kind, uint32_t seq) :
+ kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {}
+ SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
+ kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {}
bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
void appendSchema (framing::Buffer& buf);
};
@@ -154,12 +160,12 @@
framing::Uuid uuid;
sys::Mutex addLock;
sys::Mutex userLock;
- qpid::broker::Timer timer;
+ qpid::broker::Timer timer;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
- qpid::broker::Broker* broker;
+ qpid::broker::Broker* broker;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
@@ -173,10 +179,10 @@
char eventBuffer[MA_BUFFER_SIZE];
void writeData ();
- void PeriodicProcessing (void);
- void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void SendBuffer (framing::Buffer& buf,
+ void periodicProcessing (void);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void sendBuffer (framing::Buffer& buf,
uint32_t length,
qpid::broker::Exchange::shared_ptr exchange,
std::string routingKey);
@@ -185,14 +191,15 @@
bool authorizeAgentMessageLH(qpid::broker::Message& msg);
void dispatchAgentCommandLH(qpid::broker::Message& msg);
- PackageMap::iterator FindOrAddPackageLH(std::string name);
- void AddClass(PackageMap::iterator pIter,
- std::string className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void EncodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator findOrAddPackageLH(std::string name);
+ void addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void encodePackageIndication (framing::Buffer& buf,
PackageMap::iterator pIter);
- void EncodeClassIndication (framing::Buffer& buf,
+ void encodeClassIndication (framing::Buffer& buf,
PackageMap::iterator pIter,
ClassMap::iterator cIter);
bool bankInUse (uint32_t bank);
@@ -212,10 +219,9 @@
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- size_t ValidateSchema(framing::Buffer&);
- sys::Mutex& getMutex();
- framing::Buffer* startEventLH();
- void finishEventLH(framing::Buffer* outBuffer);
+ size_t validateSchema(framing::Buffer&, uint8_t kind);
+ size_t validateTableSchema(framing::Buffer&);
+ size_t validateEventSchema(framing::Buffer&);
};
}}
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h?rev=702651&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementEvent.h Tue Oct 7 14:47:35 2008
@@ -0,0 +1,48 @@
+#ifndef _ManagementEvent_
+#define _ManagementEvent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include <qpid/framing/Buffer.h>
+#include <string>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgent;
+
+class ManagementEvent : public ManagementItem {
+public:
+ typedef void (*writeSchemaCall_t)(qpid::framing::Buffer&);
+ virtual ~ManagementEvent() {}
+
+ virtual writeSchemaCall_t getWriteSchemaCall(void) = 0;
+ virtual std::string& getEventName() const = 0;
+ virtual std::string& getPackageName() const = 0;
+ virtual uint8_t* getMd5Sum() const = 0;
+ virtual void encode(qpid::framing::Buffer&) const = 0;
+};
+
+}}
+
+#endif /*!_ManagementEvent_*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.cpp Tue Oct 7 14:47:35 2008
@@ -84,6 +84,21 @@
second = buffer.getLongLong();
}
+namespace qpid {
+namespace management {
+
+std::ostream& operator<<(std::ostream& out, const ObjectId& i)
+{
+ out << "[" << ((i.first & 0xF000000000000000LL) >> 60) <<
+ "-" << ((i.first & 0x0FFF000000000000LL) >> 48) <<
+ "-" << ((i.first & 0x0000FFFFF0000000LL) >> 32) <<
+ "-" << (i.first & 0x000000000FFFFFFFLL) <<
+ "-" << i.second << "]";
+ return out;
+}
+
+}}
+
int ManagementObject::nextThreadIndex = 0;
void ManagementObject::writeTimestamps (Buffer& buf)
@@ -109,18 +124,3 @@
}
return thisIndex;
}
-
-Mutex& ManagementObject::getMutex()
-{
- return agent->getMutex();
-}
-
-Buffer* ManagementObject::startEventLH()
-{
- return agent->startEventLH();
-}
-
-void ManagementObject::finishEventLH(Buffer* buf)
-{
- agent->finishEventLH(buf);
-}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Tue Oct 7 14:47:35 2008
@@ -46,7 +46,7 @@
class ObjectId {
-private:
+protected:
const AgentAttachment* agent;
uint64_t first;
uint64_t second;
@@ -59,23 +59,11 @@
bool operator<(const ObjectId &other) const;
void encode(framing::Buffer& buffer);
void decode(framing::Buffer& buffer);
+ friend std::ostream& operator<<(std::ostream&, const ObjectId&);
};
-class ManagementObject
-{
- protected:
-
- uint64_t createTime;
- uint64_t destroyTime;
- ObjectId objectId;
- bool configChanged;
- bool instChanged;
- bool deleted;
- Manageable* coreObject;
- sys::Mutex accessLock;
- ManagementAgent* agent;
- int maxThreads;
-
+class ManagementItem {
+public:
static const uint8_t TYPE_U8 = 1;
static const uint8_t TYPE_U16 = 2;
static const uint8_t TYPE_U32 = 3;
@@ -107,15 +95,35 @@
static const uint8_t FLAG_INDEX = 0x02;
static const uint8_t FLAG_END = 0x80;
- static int nextThreadIndex;
+ const static uint8_t CLASS_KIND_TABLE = 1;
+ const static uint8_t CLASS_KIND_EVENT = 2;
+
+
+
+public:
+ virtual ~ManagementItem() {}
+};
+
+class ManagementObject : public ManagementItem
+{
+ protected:
+
+ uint64_t createTime;
+ uint64_t destroyTime;
+ ObjectId objectId;
+ bool configChanged;
+ bool instChanged;
+ bool deleted;
+ Manageable* coreObject;
+ sys::Mutex accessLock;
+ ManagementAgent* agent;
+ int maxThreads;
+
+ static int nextThreadIndex;
int getThreadIndex();
void writeTimestamps (qpid::framing::Buffer& buf);
- sys::Mutex& getMutex();
- framing::Buffer* startEventLH();
- void finishEventLH(framing::Buffer* buf);
-
public:
typedef void (*writeSchemaCall_t) (qpid::framing::Buffer&);
@@ -129,14 +137,14 @@
virtual void writeProperties(qpid::framing::Buffer& buf) = 0;
virtual void writeStatistics(qpid::framing::Buffer& buf,
bool skipHeaders = false) = 0;
- virtual void doMethod (std::string methodName,
+ virtual void doMethod (std::string& methodName,
qpid::framing::Buffer& inBuf,
qpid::framing::Buffer& outBuf) = 0;
virtual void setReference (ObjectId objectId);
- virtual std::string& getClassName (void) = 0;
- virtual std::string& getPackageName (void) = 0;
- virtual uint8_t* getMd5Sum (void) = 0;
+ virtual std::string& getClassName (void) const = 0;
+ virtual std::string& getPackageName (void) const = 0;
+ virtual uint8_t* getMd5Sum (void) const = 0;
void setObjectId (ObjectId oid) { objectId = oid; }
ObjectId getObjectId (void) { return objectId; }
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Names.java Tue Oct 7 14:47:35 2008
@@ -35,7 +35,7 @@
String METHOD_REPLY_QUEUE_PREFIX = "reply.";
String AMQ_DIRECT_QUEUE = "amq.direct";
- String AGENT_ROUTING_KEY = "agent.0";
+ String AGENT_ROUTING_KEY = "agent.1.0";
String BROKER_ROUTING_KEY = "broker";
@@ -49,4 +49,4 @@
String CONFIGURATION_FILE_NAME = "/org/apache/qpid/management/config.xml";
String ARG_COUNT_PARAM_NAME = "argCount";
-}
\ No newline at end of file
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/Protocol.java Tue Oct 7 14:47:35 2008
@@ -27,8 +27,8 @@
*/
public interface Protocol
{
- String MAGIC_NUMBER = "AM1";
+ String MAGIC_NUMBER = "AM2";
byte [] METHOD_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"M").getBytes();
byte [] SCHEMA_REQUEST_FIRST_FOUR_BYTES = (MAGIC_NUMBER+"S").getBytes();
-}
\ No newline at end of file
+}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/management/domain/handler/impl/SchemaResponseMessageHandler.java Tue Oct 7 14:47:35 2008
@@ -49,6 +49,10 @@
{
try
{
+ int classKind = decoder.readUint8();
+ if (classKind != 1) {
+ return;
+ }
String packageName = decoder.readStr8();
String className = decoder.readStr8();
@@ -57,7 +61,7 @@
int howManyProperties = decoder.readUint16();
int howManyStatistics = decoder.readUint16();
int howManyMethods = decoder.readUint16();
- int howManyEvents = decoder.readUint16();
+ int howManyEvents = 0;
// FIXME : Divide between schema error and raw data conversion error!!!!
_domainModel.addSchema(
@@ -155,4 +159,4 @@
}
return result;
}
- }
\ No newline at end of file
+ }
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Tue Oct 7 14:47:35 2008
@@ -84,8 +84,8 @@
self.qmf.delBroker(self.broker)
def Overview (self):
- exchanges = self.qmf.getObjects(cls="exchange")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ queues = self.qmf.getObjects(_class="queue")
print "Total Exchanges: %d" % len (exchanges)
etype = {}
for ex in exchanges:
@@ -106,7 +106,7 @@
print " non-durable: %d" % (len (queues) - _durable)
def ExchangeList (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
+ exchanges = self.qmf.getObjects(_class="exchange")
print "Durable Type Bindings Exchange Name"
print "======================================================="
for ex in exchanges:
@@ -114,9 +114,9 @@
print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
def ExchangeListRecurse (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
- bindings = self.qmf.getObjects(cls="binding")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ bindings = self.qmf.getObjects(_class="binding")
+ queues = self.qmf.getObjects(_class="queue")
for ex in exchanges:
if self.match (ex.name, filter):
print "Exchange '%s' (%s)" % (ex.name, ex.type)
@@ -130,8 +130,8 @@
def QueueList (self, filter):
- queues = self.qmf.getObjects(cls="queue")
- journals = self.qmf.getObjects(cls="journal")
+ queues = self.qmf.getObjects(_class="queue")
+ journals = self.qmf.getObjects(_class="journal")
print " Store Size"
print "Durable AutoDel Excl Bindings (files x file pages) Queue Name"
print "==========================================================================================="
@@ -151,9 +151,9 @@
YN (q.exclusive), q.bindingCount, q.name)
def QueueListRecurse (self, filter):
- exchanges = self.qmf.getObjects(cls="exchange")
- bindings = self.qmf.getObjects(cls="binding")
- queues = self.qmf.getObjects(cls="queue")
+ exchanges = self.qmf.getObjects(_class="exchange")
+ bindings = self.qmf.getObjects(_class="binding")
+ queues = self.qmf.getObjects(_class="queue")
for queue in queues:
if self.match (queue.name, filter):
print "Queue '%s'" % queue.name
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-printevents
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-printevents?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-printevents (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-printevents Tue Oct 7 14:47:35 2008
@@ -30,16 +30,13 @@
def event(self, broker, event):
print event
- def heartbeat(self, agent, timestamp):
- print "Heartbeat"
-
##
## Main Program
##
def main():
_usage = "%prog [options] [broker-addr]..."
_description = \
-"""Collect and print events from one of more Qpid message brokers. If no broker-addr is
+"""Collect and print events from one or more Qpid message brokers. If no broker-addr is
supplied, %prog will connect to 'localhost:5672'.
broker-addr is of the form: [username/password@] hostname | ip-address [:<port>]
ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Tue Oct 7 14:47:35 2008
@@ -62,7 +62,7 @@
self.qmf.delBroker(self.broker)
def getLink (self):
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
for link in links:
if "%s:%d" % (link.host, link.port) == self.src.name ():
return link
@@ -74,7 +74,7 @@
print "Linking broker to itself is not permitted"
sys.exit(1)
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link != None:
@@ -92,7 +92,7 @@
def DelLink (self, srcBroker):
self.src = qmfconsole.BrokerURL(srcBroker)
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
@@ -103,7 +103,7 @@
print "Close method returned:", res.status, res.text
def ListLinks (self):
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
if len(links) == 0:
print "No Links Found"
else:
@@ -119,7 +119,7 @@
if self.dest.name() == self.src.name():
raise Exception("Linking broker to itself is not permitted")
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
@@ -140,7 +140,7 @@
if link == None:
raise Exception("Protocol Error - Missing link ID")
- bridges = self.qmf.getObjects(cls="bridge")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
bridge.dest == exchange and bridge.key == routingKey:
@@ -164,7 +164,7 @@
raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
sys.exit (0)
- bridges = self.qmf.getObjects(cls="bridge")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
if _verbose:
@@ -186,8 +186,8 @@
raise Exception("Route not found")
def ListRoutes (self):
- links = self.qmf.getObjects(cls="link")
- bridges = self.qmf.getObjects(cls="bridge")
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
myLink = None
@@ -199,8 +199,8 @@
print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
def ClearAllRoutes (self):
- links = self.qmf.getObjects(cls="link")
- bridges = self.qmf.getObjects(cls="bridge")
+ links = self.qmf.getObjects(_class="link")
+ bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if _verbose:
@@ -218,7 +218,7 @@
print "Ok"
if _dellink:
- links = self.qmf.getObjects(cls="link")
+ links = self.qmf.getObjects(_class="link")
for link in links:
if _verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Tue Oct 7 14:47:35 2008
@@ -285,7 +285,7 @@
ft = {}
ft["_class"] = className
codec.write_map (ft)
- msg = channel.message(codec.encoded, routing_key="agent.%d" % bank)
+ msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank)
channel.send ("qpid.management", msg)
def syncWaitForStable (self, channel):
@@ -398,7 +398,7 @@
""" Compose the header of a management message. """
codec.write_uint8 (ord ('A'))
codec.write_uint8 (ord ('M'))
- codec.write_uint8 (ord ('1'))
+ codec.write_uint8 (ord ('2'))
codec.write_uint8 (opcode)
codec.write_uint32 (seq)
@@ -412,7 +412,7 @@
if octet != 'M':
return None
octet = chr (codec.read_uint8 ())
- if octet != '1':
+ if octet != '2':
return None
opcode = chr (codec.read_uint8 ())
seq = codec.read_uint32 ()
@@ -433,7 +433,7 @@
elif typecode == 6:
codec.write_str8 (value)
elif typecode == 7:
- codec.write_vbin32 (value)
+ codec.write_str16 (value)
elif typecode == 8: # ABSTIME
codec.write_uint64 (long (value))
elif typecode == 9: # DELTATIME
@@ -476,7 +476,7 @@
elif typecode == 6:
data = str (codec.read_str8 ())
elif typecode == 7:
- data = codec.read_vbin32 ()
+ data = codec.read_str16 ()
elif typecode == 8: # ABSTIME
data = codec.read_uint64 ()
elif typecode == 9: # DELTATIME
@@ -604,6 +604,9 @@
ch.send ("qpid.management", smsg)
def handleClassInd (self, ch, codec):
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
pname = str (codec.read_str8())
cname = str (codec.read_str8())
hash = codec.read_bin128()
@@ -656,13 +659,15 @@
def parseSchema (self, ch, codec):
""" Parse a received schema-description message. """
self.decOutstanding (ch)
+ kind = codec.read_uint8()
+ if kind != 1: # This API doesn't handle new-style events
+ return
packageName = str (codec.read_str8 ())
className = str (codec.read_str8 ())
hash = codec.read_bin128 ()
configCount = codec.read_uint16 ()
instCount = codec.read_uint16 ()
methodCount = codec.read_uint16 ()
- eventCount = codec.read_uint16 ()
if packageName not in self.packages:
return
@@ -676,7 +681,6 @@
configs = []
insts = []
methods = {}
- events = {}
configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
insts.append (("id", 4, None, None))
@@ -765,42 +769,14 @@
args.append (arg)
methods[mname] = (mdesc, args)
- for idx in range (eventCount):
- ft = codec.read_map ()
- ename = str (ft["name"])
- argCount = ft["argCount"]
- if "desc" in ft:
- edesc = str (ft["desc"])
- else:
- edesc = None
-
- args = []
- for aidx in range (argCount):
- ft = codec.read_map ()
- name = str (ft["name"])
- type = ft["type"]
- unit = None
- desc = None
-
- for key, value in ft.items ():
- if key == "unit":
- unit = str (value)
- elif key == "desc":
- desc = str (value)
-
- arg = (name, type, unit, desc)
- args.append (arg)
- events[ename] = (edesc, args)
-
schemaClass = {}
schemaClass['C'] = configs
schemaClass['I'] = insts
schemaClass['M'] = methods
- schemaClass['E'] = events
self.schema[classKey] = schemaClass
if self.schemaCb != None:
- self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+ self.schemaCb (ch.context, classKey, configs, insts, methods, {})
def parsePresenceMasks(self, codec, schemaClass):
""" Generate a list of not-present properties """
@@ -896,7 +872,7 @@
codec.write_str8 (classId[1])
codec.write_bin128 (classId[2])
codec.write_str8 (methodName)
- bank = objId.getBank()
+ bank = "%d.%d" % (objId.getBroker(), objId.getBank())
# Encode args according to schema
if classId not in self.schema:
@@ -926,5 +902,5 @@
packageName = classId[0]
className = classId[1]
- msg = channel.message(codec.encoded, "agent." + str(bank))
+ msg = channel.message(codec.encoded, "agent." + bank)
channel.send ("qpid.management", msg)
Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Tue Oct 7 14:47:35 2008
@@ -546,10 +546,10 @@
for classKey in sorted:
tuple = self.schema[classKey]
row = (self.displayClassName(classKey), len (tuple[0]), len (tuple[1]),
- len (tuple[2]), len (tuple[3]))
+ len (tuple[2]))
rows.append (row)
self.disp.table ("Classes in Schema:",
- ("Class", "Properties", "Statistics", "Methods", "Events"),
+ ("Class", "Properties", "Statistics", "Methods"),
rows)
finally:
self.lock.release ()
Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Tue Oct 7 14:47:35 2008
@@ -49,7 +49,7 @@
""" Invoked when a QMF package is discovered. """
pass
- def newClass(self, classKey):
+ def newClass(self, kind, classKey):
""" Invoked when a new class is discovered. Session.getSchema can be
used to obtain details about the class."""
pass
@@ -158,7 +158,7 @@
raise Exception(broker.error)
self.brokers.append(broker)
- self.getObjects(broker=broker, cls="agent")
+ self.getObjects(broker=broker, _class="agent")
return broker
def delBroker(self, broker):
@@ -219,35 +219,36 @@
The class for queried objects may be specified in one of the following ways:
- schema = <schema> - supply a schema object returned from getSchema
- key = <key> - supply a classKey from the list returned by getClasses
- cls = <name> - supply a class name as a string
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
If objects should be obtained from only one agent, use the following argument.
Otherwise, the query will go to all agents.
- agent = <agent> - supply an agent from the list returned by getAgents
+ _agent = <agent> - supply an agent from the list returned by getAgents.
If the get query is to be restricted to one broker (as opposed to all connected brokers),
add the following argument:
- broker = <broker> - supply a broker as returned by addBroker
+ _broker = <broker> - supply a broker as returned by addBroker.
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
"""
- if "broker" in kwargs:
+ if "_broker" in kwargs:
brokerList = []
- brokerList.append(kwargs["broker"])
+ brokerList.append(kwargs["_broker"])
else:
brokerList = self.brokers
for broker in brokerList:
broker._waitForStable()
agentList = []
- if "agent" in kwargs:
- agent = kwargs["agent"]
+ if "_agent" in kwargs:
+ agent = kwargs["_agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
agentList.append(agent)
@@ -257,11 +258,14 @@
agentList.append(agent)
cname = None
- if "schema" in kwargs: pname, cname, hash = kwargs["schema"].getKey()
- elif "key" in kwargs: pname, cname, hash = kwargs["key"]
- elif "cls" in kwargs: pname, cname, hash = None, kwargs["cls"], None
+ if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
+ elif "_key" in kwargs: pname, cname, hash = kwargs["_key"]
+ elif "_class" in kwargs:
+ pname, cname, hash = None, kwargs["_class"], None
+ if "_package" in kwargs:
+ pname = kwargs["_package"]
if cname == None:
- raise Exception("No class supplied, use 'schema', 'key', or 'cls' argument")
+ raise Exception("No class supplied, use '_schema', '_key', or '_class' argument")
map = {}
map["_class"] = cname
if pname != None: map["_package"] = pname
@@ -269,7 +273,7 @@
self.getSelect = []
for item in kwargs:
- if item != "schema" and item != "key" and item != "cls":
+ if item[0] != '_':
self.getSelect.append((item, kwargs[item]))
self.getResult = []
@@ -282,7 +286,7 @@
self.cv.release()
broker._setHeader(sendCodec, 'G', seq)
sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d" % agent.bank)
+ smsg = broker._message(sendCodec.encoded, "agent.%s" % agent.bank)
broker._send(smsg)
starttime = time()
@@ -382,6 +386,7 @@
self.cv.release()
def _handleClassInd(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
@@ -431,17 +436,18 @@
self.console.event(broker, event)
def _handleSchemaResp(self, broker, codec, seq):
+ kind = codec.read_uint8()
pname = str(codec.read_str8())
cname = str(codec.read_str8())
hash = codec.read_bin128()
classKey = (pname, cname, hash)
- _class = SchemaClass(classKey, codec)
+ _class = SchemaClass(kind, classKey, codec)
self.cv.acquire()
self.packages[pname][(cname, hash)] = _class
self.cv.release()
broker._decOutstanding()
if self.console != None:
- self.console.newClass(classKey)
+ self.console.newClass(kind, classKey)
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
pname = str(codec.read_str8())
@@ -485,7 +491,7 @@
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
- for prop, propval in object.properties:
+ for prop, propval in object.getProperties():
if key == prop.name and value != propval:
return False
return True
@@ -497,7 +503,7 @@
elif typecode == 3: data = codec.read_uint32() # U32
elif typecode == 4: data = codec.read_uint64() # U64
elif typecode == 6: data = str(codec.read_str8()) # SSTR
- elif typecode == 7: data = codec.read_vbin32() # LSTR
+ elif typecode == 7: data = codec.read_str16() # LSTR
elif typecode == 8: data = codec.read_int64() # ABSTIME
elif typecode == 9: data = codec.read_uint64() # DELTATIME
elif typecode == 10: data = ObjectId(codec) # REF
@@ -521,7 +527,7 @@
elif typecode == 3: codec.write_uint32 (long(value)) # U32
elif typecode == 4: codec.write_uint64 (long(value)) # U64
elif typecode == 6: codec.write_str8 (value) # SSTR
- elif typecode == 7: codec.write_vbin32 (value) # LSTR
+ elif typecode == 7: codec.write_str16 (value) # LSTR
elif typecode == 8: codec.write_int64 (long(value)) # ABSTIME
elif typecode == 9: codec.write_uint64 (long(value)) # DELTATIME
elif typecode == 10: value.encode (codec) # REF
@@ -577,30 +583,42 @@
class SchemaClass:
""" """
- def __init__(self, key, codec):
+ CLASS_KIND_TABLE = 1
+ CLASS_KIND_EVENT = 2
+
+ def __init__(self, kind, key, codec):
+ self.kind = kind
self.classKey = key
self.properties = []
self.statistics = []
- self.methods = []
- self.events = []
+ self.methods = []
+ self.arguments = []
- propCount = codec.read_uint16()
- statCount = codec.read_uint16()
- methodCount = codec.read_uint16()
- eventCount = codec.read_uint16()
-
- for idx in range(propCount):
- self.properties.append(SchemaProperty(codec))
- for idx in range(statCount):
- self.statistics.append(SchemaStatistic(codec))
- for idx in range(methodCount):
- self.methods.append(SchemaMethod(codec))
- for idx in range(eventCount):
- self.events.append(SchemaEvent(codec))
+ if self.kind == self.CLASS_KIND_TABLE:
+ propCount = codec.read_uint16()
+ statCount = codec.read_uint16()
+ methodCount = codec.read_uint16()
+ for idx in range(propCount):
+ self.properties.append(SchemaProperty(codec))
+ for idx in range(statCount):
+ self.statistics.append(SchemaStatistic(codec))
+ for idx in range(methodCount):
+ self.methods.append(SchemaMethod(codec))
+
+ elif self.kind == self.CLASS_KIND_EVENT:
+ argCount = codec.read_uint16()
+ for idx in range(argCount):
+ self.arguments.append(SchemaArgument(codec, methodArg=False))
def __repr__(self):
pname, cname, hash = self.classKey
- result = "Class: %s:%s " % (pname, cname)
+ if self.kind == self.CLASS_KIND_TABLE:
+ kindStr = "Table"
+ elif self.kind == self.CLASS_KIND_EVENT:
+ kindStr = "Event"
+ else:
+ kindStr = "Unsupported"
+ result = "%s Class: %s:%s " % (kindStr, pname, cname)
result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
return result
@@ -620,9 +638,9 @@
""" Return the list of methods for the class. """
return self.methods
- def getEvents(self):
+ def getArguments(self):
""" Return the list of events for the class. """
- return self.events
+ return self.arguments
class SchemaProperty:
""" """
@@ -693,33 +711,6 @@
result += ")"
return result
-class SchemaEvent:
- """ """
- def __init__(self, codec):
- map = codec.read_map()
- self.name = str(map["name"])
- argCount = map["argCount"]
- if "desc" in map:
- self.desc = str(map["desc"])
- else:
- self.desc = None
- self.arguments = []
-
- for idx in range(argCount):
- self.arguments.append(SchemaArgument(codec, methodArg=False))
-
- def __repr__(self):
- result = self.name + "("
- first = True
- for arg in self.arguments:
- if first:
- first = False
- else:
- result += ", "
- result += arg.name
- result += ")"
- return result
-
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -743,7 +734,7 @@
elif key == "desc" : self.desc = str(value)
elif key == "default" : self.default = str(value)
-class ObjectId(object):
+class ObjectId:
""" Object that represents QMF object identifiers """
def __init__(self, codec, first=0, second=0):
if codec:
@@ -800,80 +791,86 @@
""" """
def __init__(self, session, broker, schema, codec, prop, stat):
""" """
- self.session = session
- self.broker = broker
- self.schema = schema
- self.currentTime = codec.read_uint64()
- self.createTime = codec.read_uint64()
- self.deleteTime = codec.read_uint64()
- self.objectId = ObjectId(codec)
- self.properties = []
- self.statistics = []
+ self._session = session
+ self._broker = broker
+ self._schema = schema
+ self._currentTime = codec.read_uint64()
+ self._createTime = codec.read_uint64()
+ self._deleteTime = codec.read_uint64()
+ self._objectId = ObjectId(codec)
+ self._properties = []
+ self._statistics = []
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
for property in schema.getProperties():
if property.name in notPresent:
- self.properties.append((property, None))
+ self._properties.append((property, None))
else:
- self.properties.append((property, self.session._decodeValue(codec, property.type)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type)))
if stat:
for statistic in schema.getStatistics():
- self.statistics.append((statistic, self.session._decodeValue(codec, statistic.type)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
def getObjectId(self):
""" Return the object identifier for this object """
- return self.objectId
+ return self._objectId
def getClassKey(self):
""" Return the class-key that references the schema describing this object. """
- return self.schema.getKey()
+ return self._schema.getKey()
def getSchema(self):
""" Return the schema that describes this object. """
- return self.schema
+ return self._schema
def getMethods(self):
""" Return a list of methods available for this object. """
- return self.schema.getMethods()
+ return self._schema.getMethods()
def getTimestamps(self):
""" Return the current, creation, and deletion times for this object. """
- return self.currentTime, self.createTime, self.deleteTime
+ return self._currentTime, self._createTime, self._deleteTime
def getIndex(self):
""" Return a string describing this object's primary key. """
result = ""
- for property, value in self.properties:
+ for property, value in self._properties:
if property.index:
if result != "":
result += ":"
result += str(value)
return result
+ def getProperties(self):
+ return self._properties
+
+ def getStatistics(self):
+ return self._statistics
+
def __repr__(self):
return self.getIndex()
def __getattr__(self, name):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self.properties:
+ for property, value in self._properties:
if name == property.name:
return value
- for statistic, value in self.statistics:
+ for statistic, value in self._statistics:
if name == statistic.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
def _invoke(self, name, args, kwargs):
- for method in self.schema.getMethods():
+ for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
- sendCodec = Codec(self.broker.conn.spec)
- seq = self.session.seqMgr._reserve((self, method))
- self.broker._setHeader(sendCodec, 'M', seq)
- self.objectId.encode(sendCodec)
- pname, cname, hash = self.schema.getKey()
+ sendCodec = Codec(self._broker.conn.spec)
+ seq = self._session.seqMgr._reserve((self, method))
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ pname, cname, hash = self._schema.getKey()
sendCodec.write_str8(pname)
sendCodec.write_str8(cname)
sendCodec.write_bin128(hash)
@@ -888,29 +885,30 @@
for arg in method.arguments:
if arg.dir.find("I") != -1:
- self.session._encodeValue(sendCodec, args[aIdx], arg.type)
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
- smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
- self.broker.cv.acquire()
- self.broker.syncInFlight = True
- self.broker.cv.release()
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+ (self._objectId.getBroker(), self._objectId.getBank()))
+ self._broker.cv.acquire()
+ self._broker.syncInFlight = True
+ self._broker.cv.release()
- self.broker._send(smsg)
+ self._broker._send(smsg)
- self.broker.cv.acquire()
+ self._broker.cv.acquire()
starttime = time()
- while self.broker.syncInFlight and self.broker.error == None:
- self.broker.cv.wait(self.broker.SYNC_TIME)
- if time() - starttime > self.broker.SYNC_TIME:
- self.broker.cv.release()
- self.session.seqMgr._release(seq)
+ while self._broker.syncInFlight and self._broker.error == None:
+ self._broker.cv.wait(self._broker.SYNC_TIME)
+ if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.release()
+ self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
- self.broker.cv.release()
- if self.broker.error != None:
- errorText = self.broker.error
- self.broker.error = None
+ self._broker.cv.release()
+ if self._broker.error != None:
+ errorText = self._broker.error
+ self._broker.error = None
raise Exception(errorText)
- return self.broker.syncResult
+ return self._broker.syncResult
raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):
@@ -954,7 +952,7 @@
self.authUser = authUser
self.authPass = authPass
self.agents = {}
- self.agents[0] = Agent(self, 0, "BrokerAgent")
+ self.agents[0] = Agent(self, "1.0", "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
@@ -1040,14 +1038,15 @@
self.error = "Connect Failed %d - %s" % (e[0], e[1])
def _updateAgent(self, obj):
- if obj.deleteTime == 0:
- if obj.objectIdBank not in self.agents:
- agent = Agent(self, obj.objectIdBank, obj.label)
- self.agents[obj.objectIdBank] = agent
+ bankKey = "%d.%d" % (obj.brokerBank, obj.agentBank)
+ if obj._deleteTime == 0:
+ if bankKey not in self.agents:
+ agent = Agent(self, bankKey, obj.label)
+ self.agents[bankKey] = agent
if self.session.console != None:
self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(obj.objectIdBank, None)
+ agent = self.agents.pop(bankKey, None)
if agent != None and self.session.console != None:
self.session.console.delAgent(agent)
@@ -1055,7 +1054,7 @@
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
codec.write_uint8(ord('M'))
- codec.write_uint8(ord('1'))
+ codec.write_uint8(ord('2'))
codec.write_uint8(ord(opcode))
codec.write_uint32(seq)
@@ -1068,7 +1067,7 @@
if octet != 'M':
return None, None
octet = chr(codec.read_uint8())
- if octet != '1':
+ if octet != '2':
return None, None
opcode = chr(codec.read_uint8())
seq = codec.read_uint32()
@@ -1164,28 +1163,24 @@
self.label = label
def __repr__(self):
- return "Agent at bank %d (%s)" % (self.bank, self.label)
+ return "Agent at bank %s (%s)" % (self.bank, self.label)
class Event:
""" """
def __init__(self, session, codec):
self.session = session
- self.timestamp = codec.read_int64()
- self.objectId = ObjectId(codec)
pname = codec.read_str8()
cname = codec.read_str8()
hash = codec.read_bin128()
self.classKey = (pname, cname, hash)
- self.name = codec.read_str8()
+ self.timestamp = codec.read_int64()
+ self.schema = None
if pname in session.packages:
if (cname, hash) in session.packages[pname]:
- schema = session.packages[pname][(cname, hash)]
- for event in schema.getEvents():
- if event.name == self.name:
- self.schemaEvent = event
- self.arguments = {}
- for arg in event.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+ self.schema = session.packages[pname][(cname, hash)]
+ self.arguments = {}
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = session._decodeValue(codec, arg.type)
def __repr__(self):
return self.getSyslogText()
@@ -1202,10 +1197,15 @@
def getName(self):
return self.name
+ def getSchema(self):
+ return self.schema
+
def getSyslogText(self):
+ if self.schema == None:
+ return "<uninterpretable>"
out = strftime("%c", gmtime(self.timestamp / 1000000000))
- out += " " + self.classKey[0] + ":" + self.classKey[1] + " " + self.name
- for arg in self.schemaEvent.arguments:
+ out += " " + self.classKey[0] + ":" + self.classKey[1]
+ for arg in self.schema.arguments:
out += " " + arg.name + "=" + self.session._displayValue(self.arguments[arg.name], arg.type)
return out
@@ -1247,8 +1247,8 @@
def newPackage(self, name):
print "newPackage:", name
- def newClass(self, classKey):
- print "newClass:", classKey
+ def newClass(self, kind, classKey):
+ print "newClass:", kind, classKey
def newAgent(self, agent):
print "newAgent:", agent
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/management.py?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/management.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/management.py Tue Oct 7 14:47:35 2008
@@ -59,7 +59,7 @@
session = self.session
self.startQmf()
- brokers = self.qmf.getObjects(cls="broker")
+ brokers = self.qmf.getObjects(_class="broker")
self.assertEqual (len(brokers), 1)
broker = brokers[0]
@@ -147,43 +147,43 @@
session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
session.exchange_bind(queue="dest-queue", exchange="amq.direct")
- queues = self.qmf.getObjects(cls="queue")
+ queues = self.qmf.getObjects(_class="queue")
"Move 10 messages from src-queue to dest-queue"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
self.assertEqual (result.status, 0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,10)
self.assertEqual (dq.msgDepth,10)
"Move all remaining messages to destination"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
self.assertEqual (result.status,0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,0)
self.assertEqual (dq.msgDepth,20)
"Use a bad source queue name"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
self.assertEqual (result.status,4)
"Use a bad destination queue name"
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
self.assertEqual (result.status,4)
" Use a large qty (40) to move from dest-queue back to "
" src-queue- should move all "
- result = self.qmf.getObjects(cls="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
self.assertEqual (result.status,0)
- sq = self.qmf.getObjects(cls="queue", name="src-queue")[0]
- dq = self.qmf.getObjects(cls="queue", name="dest-queue")[0]
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
self.assertEqual (sq.msgDepth,20)
self.assertEqual (dq.msgDepth,0)
@@ -216,23 +216,23 @@
msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=msg)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
"Purge top message from purge-queue"
result = pq.purge(1)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,19)
"Purge top 9 messages from purge-queue"
result = pq.purge(9)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,10)
"Purge all messages from purge-queue"
result = pq.purge(0)
self.assertEqual (result.status, 0)
- pq = self.qmf.getObjects(cls="queue", name="purge-queue")[0]
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=702651&r1=702650&r2=702651&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue Oct 7 14:47:35 2008
@@ -86,17 +86,6 @@
<arg name="password" dir="I" type="sstr"/>
</method>
- <event name="agentConnect" desc="QMF Management Agent has connected to the broker">
- <arg name="remoteAddress" type="sstr"/>
- <arg name="label" type="sstr"/>
- <arg name="brokerBank" type="uint32"/>
- <arg name="agentBank" type="uint32"/>
- </event>
-
- <event name="agentDisconnect" desc="QMF Management Agent has disconnected from the broker">
- <arg name="remoteAddress" type="sstr"/>
- </event>
-
<method name="queueMoveMessages" desc="Move messages from one queue to another">
<arg name="srcQueue" dir="I" type="sstr" desc="Source queue"/>
<arg name="destQueue" dir="I" type="sstr" desc="Destination queue"/>
@@ -115,7 +104,8 @@
<property name="label" type="sstr" access="RO" desc="Label for agent"/>
<property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
<property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/>
- <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/>
+ <property name="brokerBank" type="uint32" access="RO" desc="Assigned object-id broker bank"/>
+ <property name="agentBank" type="uint32" access="RO" desc="Assigned object-id agent bank"/>
</class>
<!--
@@ -218,7 +208,7 @@
<property name="vhostRef" type="objId" references="Vhost" access="RC" index="y" parentRef="y"/>
<property name="address" type="sstr" access="RC" index="y"/>
<property name="incoming" type="bool" access="RC"/>
- <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation ,...)"/>
+ <property name="SystemConnection" type="bool" access="RC" desc="Infrastucture/ Inter-system connection (Cluster, Federation, ...)"/>
<statistic name="closing" type="bool" desc="This client is closing by management request"/>
<statistic name="federationLink" type="bool" desc="Is this a federation link"/>
@@ -305,5 +295,36 @@
<method name="resetLifespan"/>
<method name="close"/>
</class>
+
+ <eventArguments>
+ <arg name="altEx" type="sstr" desc="Name of the alternate exchange"/>
+ <arg name="args" type="map" desc="Supplemental arguments or parameters supplied"/>
+ <arg name="autoDel" type="bool" desc="Created object is automatically deleted when no longer in use"/>
+ <arg name="dest" type="sstr" desc="Destination tag for a subscription"/>
+ <arg name="disp" type="sstr" desc="Disposition of a declaration: 'created' if object was created, 'existing' if object already existed"/>
+ <arg name="durable" type="bool" desc="Created object is durable"/>
+ <arg name="exName" type="sstr" desc="Name of an exchange"/>
+ <arg name="exType" type="sstr" desc="Type of an exchange"/>
+ <arg name="excl" type="bool" desc="Created object is exclusive for the use of the owner only"/>
+ <arg name="key" type="sstr" desc="Key text used for routing or binding"/>
+ <arg name="qName" type="sstr" desc="Name of a queue"/>
+ <arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
+ <arg name="user" type="sstr" desc="Authentication identity"/>
+ </eventArguments>
+
+ <event name="clientConnect" args="rhost, user"/>
+ <event name="clientDisconnect" args="rhost, user"/>
+ <event name="agentConnect" args="rhost, user"/>
+ <event name="agentDisconnect" args="rhost, user"/>
+ <event name="brokerConnect" args="rhost, user"/>
+ <event name="brokerDisconnect" args="rhost, user"/>
+ <event name="queueDeclare" args="rhost, user, qName, durable, excl, autoDel, args, disp"/>
+ <event name="queueDelete" args="rhost, user, qName"/>
+ <event name="exchangeDeclare" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/>
+ <event name="exchangeDelete" args="rhost, user, exName"/>
+ <event name="bind" args="rhost, user, exName, qName, key, args"/>
+ <event name="unbind" args="rhost, user, exName, qName, key"/>
+ <event name="subscribe" args="rhost, user, qName, dest, excl, args"/>
+ <event name="unsubscribe" args="rhost, user, dest"/>
</schema>