You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/07/31 15:15:17 UTC
svn commit: r681362 - in /incubator/qpid/trunk/qpid: cpp/examples/qmf-agent/
cpp/managementgen/ cpp/managementgen/templates/ cpp/src/qpid/agent/
cpp/src/qpid/broker/ cpp/src/qpid/management/ python/commands/ python/qpid/
specs/
Author: tross
Date: Thu Jul 31 06:15:16 2008
New Revision: 681362
URL: http://svn.apache.org/viewvc?rev=681362&view=rev
Log:
QPID-1174 - Management updates for remote agents
Modified:
incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile
incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
incubator/qpid/trunk/qpid/python/commands/qpid-tool
incubator/qpid/trunk/qpid/python/qpid/management.py
incubator/qpid/trunk/qpid/python/qpid/managementdata.py
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile Thu Jul 31 06:15:16 2008
@@ -26,13 +26,13 @@
CC = gcc
LIB_DIR = $(QPID_DIR)/cpp/src/.libs
CC_INCLUDES = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR)
-CC_FLAGS = -g -O2
+CC_FLAGS = -g -O3
LD_FLAGS = -lqpidclient -lqpidcommon -L$(LIB_DIR)
SPEC_DIR = $(QPID_DIR)/specs
+TYPE_FILE = $(SPEC_DIR)/management-types.xml
MGEN_DIR = $(QPID_DIR)/cpp/managementgen
TEMPLATE_DIR = $(MGEN_DIR)/templates
MGEN = $(MGEN_DIR)/main.py
-OBJ_DIR = $(SRC_DIR)/.libs
vpath %.cpp $(SRC_DIR):$(GEN_DIR)
vpath %.d $(OBJ_DIR)
@@ -43,15 +43,17 @@
deps = $(addsuffix .d, $(basename $(cpps)))
objects = $(addsuffix .o, $(basename $(cpps)))
-.PHONY: all clean
+.PHONY: all clean gen
#==========================================================
# Pass 0: generate source files from schema
ifeq ($(MAKELEVEL), 0)
-all:
- $(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR)
- $(MAKE)
+all: gen
+ @$(MAKE)
+
+gen:
+ $(MGEN) $(SCHEMA_FILE) $(TYPE_FILE) $(TEMPLATE_DIR) $(GEN_DIR)
clean:
rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o
@@ -62,7 +64,7 @@
else ifeq ($(MAKELEVEL), 1)
all: $(deps)
- $(MAKE)
+ @$(MAKE)
%.d : %.cpp
$(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@
Modified: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp Thu Jul 31 06:15:16 2008
@@ -22,7 +22,10 @@
#include <qpid/management/Manageable.h>
#include <qpid/management/ManagementObject.h>
#include <qpid/agent/ManagementAgent.h>
+#include <qpid/sys/Mutex.h>
#include "Parent.h"
+#include "Child.h"
+#include "ArgsParentCreate_child.h"
#include "PackageQmf_example.h"
#include <unistd.h>
@@ -32,7 +35,14 @@
#include <sstream>
using namespace qpid::management;
+using namespace qpid::sys;
using namespace std;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+class ChildClass;
//==============================================================
// CoreClass is the operational class that corresponds to the
@@ -40,21 +50,44 @@
//==============================================================
class CoreClass : public Manageable
{
- string name;
+ string name;
+ ManagementAgent* agent;
Parent* mgmtObject;
+ std::vector<ChildClass*> children;
+ Mutex vectorLock;
public:
CoreClass(ManagementAgent* agent, string _name);
- ~CoreClass() {}
+ ~CoreClass() { mgmtObject->resourceDestroy(); }
+
+ ManagementObject* GetManagementObject(void) const
+ { return mgmtObject; }
+
+ void doLoop();
+ status_t ManagementMethod (uint32_t methodId, Args& args);
+};
+
+class ChildClass : public Manageable
+{
+ string name;
+ Child* mgmtObject;
+
+public:
- void bumpCounter() { mgmtObject->inc_count(); }
+ ChildClass(ManagementAgent* agent, CoreClass* parent, string name);
+ ~ChildClass() { mgmtObject->resourceDestroy(); }
ManagementObject* GetManagementObject(void) const
{ return mgmtObject; }
+
+ void doWork()
+ {
+ mgmtObject->inc_count(2);
+ }
};
-CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name)
+CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
{
mgmtObject = new Parent(agent, this, name);
@@ -62,6 +95,52 @@
mgmtObject->set_state("IDLE");
}
+void CoreClass::doLoop()
+{
+ // Periodically bump a counter to provide a changing statistical value
+ while (1) {
+ sleep(1);
+ mgmtObject->inc_count();
+ mgmtObject->set_state("IN LOOP");
+
+ {
+ Mutex::ScopedLock _lock(vectorLock);
+
+ for (std::vector<ChildClass*>::iterator iter = children.begin();
+ iter != children.end();
+ iter++) {
+ (*iter)->doWork();
+ }
+ }
+ }
+}
+
+Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args)
+{
+ Mutex::ScopedLock _lock(vectorLock);
+
+ switch (methodId) {
+ case Parent::METHOD_CREATE_CHILD:
+ ArgsParentCreate_child& ioArgs = (ArgsParentCreate_child&) args;
+
+ ChildClass *child = new ChildClass(agent, this, ioArgs.i_name);
+ ioArgs.o_childRef = child->GetManagementObject()->getObjectId();
+
+ children.push_back(child);
+
+ return STATUS_OK;
+ }
+
+ return STATUS_NOT_IMPLEMENTED;
+}
+
+ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name)
+{
+ mgmtObject = new Child(agent, this, parent, name);
+
+ agent->addObject(mgmtObject);
+}
+
//==============================================================
// Main program
@@ -79,19 +158,14 @@
// Start the agent. It will attempt to make a connection to the
// management broker
- agent->init (string(host), port);
+ agent->init(string(host), port);
// Allocate some core objects
CoreClass core1(agent, "Example Core Object #1");
CoreClass core2(agent, "Example Core Object #2");
CoreClass core3(agent, "Example Core Object #3");
- // Periodically bump a counter in core1 to provide a changing statistical value
- while (1)
- {
- sleep(1);
- core1.bumpCounter();
- }
+ core1.doLoop();
}
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/schema.py?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/schema.py Thu Jul 31 06:15:16 2008
@@ -577,7 +577,7 @@
return self.name
def getFullName (self):
- return self.parent.getName().capitalize() + self.name[0:1].upper() +\
+ return capitalize(self.parent.getName()) + self.name[0:1].upper() +\
self.name[1:]
def getArgCount (self):
@@ -644,7 +644,7 @@
return self.name
def getFullName (self):
- return self.parent.getName ().capitalize() + self.name.capitalize ()
+ return capitalize(self.parent.getName()) + capitalize(self.name)
def getArgCount (self):
return len (self.args)
@@ -938,7 +938,7 @@
method.genSchema (stream, variables)
def genNameCap (self, stream, variables):
- stream.write (self.name.capitalize ())
+ stream.write (capitalize(self.name))
def genNameLower (self, stream, variables):
stream.write (self.name.lower ())
Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Thu Jul 31 06:15:16 2008
@@ -86,9 +86,9 @@
/*MGEN:Class.SetGeneralReferenceDeclaration*/
- std::string getPackageName (void) { return packageName; }
- std::string getClassName (void) { return className; }
- uint8_t* getMd5Sum (void) { return md5Sum; }
+ std::string& getPackageName (void) { return packageName; }
+ std::string& getClassName (void) { return className; }
+ uint8_t* getMd5Sum (void) { return md5Sum; }
// Method IDs
/*MGEN:Class.MethodIdDeclarations*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Jul 31 06:15:16 2008
@@ -67,16 +67,21 @@
}
ManagementAgentImpl::ManagementAgentImpl() :
- clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread)
+ clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
{
// TODO: Establish system ID
}
-void ManagementAgentImpl::init (std::string brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread)
+void ManagementAgentImpl::init(std::string brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread)
{
+ {
+ Mutex::ScopedLock lock(agentLock);
+ startupWait = true;
+ }
+
interval = intervalSeconds;
extThread = useExternalThread;
nextObjectId = 1;
@@ -92,17 +97,17 @@
session.queueDeclare (arg::queue=queueName.str());
session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str ());
+ arg::bindingKey=queueName.str());
session.messageSubscribe (arg::queue=queueName.str(),
arg::destination=dest);
session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
Message attachRequest;
- char rawbuffer[512]; // TODO: Modify Buffer so it can use stringstream
+ char rawbuffer[512];
Buffer buffer (rawbuffer, 512);
- attachRequest.getDeliveryProperties().setRoutingKey("agent");
+ attachRequest.getDeliveryProperties().setRoutingKey("broker");
attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
EncodeHeader (buffer, 'A');
@@ -115,15 +120,22 @@
string stringBuffer (rawbuffer, length);
attachRequest.setData (stringBuffer);
- session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management");
+ session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
- dispatcher->listen (dest, this);
- dispatcher->start ();
+ dispatcher->listen(dest, this);
+ dispatcher->start();
+
+ {
+ Mutex::ScopedLock lock(agentLock);
+ if (startupWait)
+ startupCond.wait(agentLock);
+ }
}
-ManagementAgentImpl::~ManagementAgentImpl ()
+ManagementAgentImpl::~ManagementAgentImpl()
{
- dispatcher->stop ();
+ dispatcher->stop();
+ session.close();
delete dispatcher;
}
@@ -151,24 +163,33 @@
return objectId;
}
-uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/)
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
{
return 0;
}
-int ManagementAgentImpl::getSignalFd (void)
+int ManagementAgentImpl::getSignalFd(void)
{
return -1;
}
-void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
+void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
{
Mutex::ScopedLock lock(agentLock);
uint32_t assigned;
+ stringstream key;
assigned = inBuffer.getLong();
objIdPrefix = ((uint64_t) assigned) << 24;
+ startupWait = false;
+ startupCond.notify();
+
+ // Bind to qpid.management to receive commands
+ key << "agent." << assigned;
+ session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
+ arg::bindingKey=key.str());
+
// Send package indications for all local packages
for (PackageMap::iterator pIter = packages.begin();
pIter != packages.end();
@@ -180,7 +201,7 @@
EncodePackageIndication(outBuffer, pIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ SendBuffer(outBuffer, outLen, "qpid.management", "broker");
// Send class indications for all local classes
ClassMap cMap = pIter->second;
@@ -190,7 +211,7 @@
EncodeClassIndication(outBuffer, pIter, cIter);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ SendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -218,7 +239,7 @@
schema.writeSchemaCall(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available ();
outBuffer.reset();
- SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+ SendBuffer(outBuffer, outLen, "qpid.management", "broker");
}
}
}
@@ -229,18 +250,50 @@
clientWasAdded = true;
}
-void ManagementAgentImpl::received (Message& msg)
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+ string methodName;
+ Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ uint64_t objId = inBuffer.getLongLong();
+ inBuffer.getShortString(methodName);
+
+ EncodeHeader(outBuffer, 'm', sequence);
+
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
+ outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+ } else {
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
+ }
+
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::received(Message& msg)
{
- string data = msg.getData ();
- Buffer inBuffer (const_cast<char*>(data.c_str()), data.size());
+ string data = msg.getData();
+ Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
uint8_t opcode;
uint32_t sequence;
+ string replyToKey;
+
+ framing::MessageProperties p = msg.getMessageProperties();
+ if (p.hasReplyTo()) {
+ const framing::ReplyTo& rt = p.getReplyTo();
+ replyToKey = rt.getRoutingKey();
+ }
if (CheckHeader (inBuffer, &opcode, &sequence))
{
if (opcode == 'a') handleAttachResponse(inBuffer);
else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
else if (opcode == 'x') handleConsoleAddedIndication();
+ else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Jul 31 06:15:16 2008
@@ -30,6 +30,7 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
@@ -127,6 +128,8 @@
BackgroundThread bgThread;
sys::Thread thread;
+ sys::Condition startupCond;
+ bool startupWait;
PackageMap::iterator FindOrAddPackage (std::string name);
void moveNewObjectsLH();
@@ -149,6 +152,7 @@
void handlePackageRequest (qpid::framing::Buffer& inBuffer);
void handleClassQuery (qpid::framing::Buffer& inBuffer);
void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+ void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
void handleConsoleAddedIndication();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Jul 31 06:15:16 2008
@@ -335,8 +335,7 @@
return Manageable::STATUS_OK;
case management::Link::METHOD_BRIDGE :
- management::ArgsLinkBridge iargs =
- dynamic_cast<const management::ArgsLinkBridge&>(args);
+ management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args;
// Durable bridges are only valid on durable links
if (iargs.i_durable && !durable)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Thu Jul 31 06:15:16 2008
@@ -97,6 +97,7 @@
// objects that will be invalid.
dExchange.reset();
mExchange.reset();
+ timer.stop();
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -117,33 +118,33 @@
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
// Get from file or generate and save to file.
- if (dataDir.empty ())
+ if (dataDir.empty())
{
- uuid.generate ();
+ uuid.generate();
QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
<< uuid);
}
else
{
- string filename (dataDir + "/.mbrokerdata");
- ifstream inFile (filename.c_str ());
+ string filename(dataDir + "/.mbrokerdata");
+ ifstream inFile(filename.c_str ());
- if (inFile.good ())
+ if (inFile.good())
{
inFile >> uuid;
inFile >> bootSequence;
inFile >> nextRemoteBank;
- inFile.close ();
+ inFile.close();
QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
bootSequence++;
- writeData ();
+ writeData();
}
else
{
- uuid.generate ();
+ uuid.generate();
QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
- writeData ();
+ writeData();
}
QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
@@ -155,10 +156,10 @@
string filename (dataDir + "/.mbrokerdata");
ofstream outFile (filename.c_str ());
- if (outFile.good ())
+ if (outFile.good())
{
outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
- outFile.close ();
+ outFile.close();
}
}
@@ -174,7 +175,7 @@
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock (userLock);
+ Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
AddClass(pIter, className, md5Sum, schemaCall);
}
@@ -391,124 +392,64 @@
SendBuffer (outBuffer, outLen, dExchange, replyToKey);
}
-void ManagementBroker::dispatchCommand (Deliverable& deliverable,
+bool ManagementBroker::dispatchCommand (Deliverable& deliverable,
const string& routingKey,
const FieldTable* /*args*/)
{
Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
- if (routingKey.compare (0, 13, "agent.method.") == 0)
- dispatchMethodLH (msg, routingKey, 13);
+ // Parse the routing key. This management broker should act as though it
+ // is bound to the exchange to match the following keys:
+ //
+ // agent.<X>.#
+ // broker.#
+ //
+ // where <X> is any non-negative decimal integer less than the lowest remote
+ // object-id bank.
- else if (routingKey.length () == 5 &&
- routingKey.compare (0, 5, "agent") == 0)
+ if (routingKey == "broker") {
dispatchAgentCommandLH (msg);
-
- else
- {
- QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
- return;
- }
-}
-
-void ManagementBroker::dispatchMethodLH (Message& msg,
- const string& routingKey,
- size_t first)
-{
- size_t pos, start = first;
- uint32_t contentSize;
-
- if (routingKey.length () == start)
- {
- QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
- return;
- }
-
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
- return;
+ return false;
}
- string packageName = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- pos = routingKey.find ('.', start);
- if (pos == string::npos || routingKey.length () == pos + 1)
- {
- QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
- return;
+ else if (routingKey.compare(0, 6, "agent.") == 0) {
+ uint32_t delim = routingKey.find('.', 6);
+ if (delim == string::npos)
+ delim = routingKey.length();
+ string bank = routingKey.substr(6, delim - 6);
+ if ((uint32_t) atoi(bank.c_str()) <= localBank) {
+ dispatchAgentCommandLH (msg);
+ return false;
+ }
}
- string className = routingKey.substr (start, pos - start);
-
- start = pos + 1;
- string methodName = routingKey.substr (start, routingKey.length () - start);
-
- contentSize = msg.encodedContentSize ();
- if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
- return;
+ return true;
+}
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+ string methodName;
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen, sequence;
- uint8_t opcode;
-
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " <<
- msg.encodedSize());
- return;
- }
-
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
-
- if (!CheckHeader (inBuffer, &opcode, &sequence))
- {
- QPID_LOG (debug, " Invalid content header");
- return;
- }
-
- if (opcode != 'M')
- {
- QPID_LOG (debug, " Unexpected opcode " << opcode);
- return;
- }
-
- uint64_t objId = inBuffer.getLongLong ();
- string replyToKey;
+ uint32_t outLen;
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
- }
- else
- {
- QPID_LOG (debug, " Reply-to missing");
- return;
- }
+ uint64_t objId = inBuffer.getLongLong();
+ inBuffer.getShortString(methodName);
- EncodeHeader (outBuffer, 'm', sequence);
+ std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl;
+ EncodeHeader(outBuffer, 'm', sequence);
- ManagementObjectMap::iterator iter = managementObjects.find (objId);
- if (iter == managementObjects.end () || iter->second->isDeleted ())
- {
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter == managementObjects.end() || iter->second->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- }
- else
- {
- iter->second->doMethod (methodName, inBuffer, outBuffer);
+ } else {
+ iter->second->doMethod(methodName, inBuffer, outBuffer);
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ SendBuffer(outBuffer, outLen, dExchange, replyToKey);
}
void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -737,6 +678,8 @@
requestedBank = inBuffer.getLong ();
assignedBank = assignBankLH (requestedBank);
+ // TODO: Make a pass over the agents and delete any that no longer have a session.
+
RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
if (aIter != remoteAgents.end())
{
@@ -755,6 +698,7 @@
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId (systemId);
+ agent->mgmtObject->set_objectIdBank (assignedBank);
addObject (agent->mgmtObject);
remoteAgents[sessionName] = agent;
@@ -818,10 +762,9 @@
const framing::MessageProperties* p =
msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo())
- {
- const framing::ReplyTo& rt = p->getReplyTo ();
- replyToKey = rt.getRoutingKey ();
+ if (p && p->hasReplyTo()) {
+ const framing::ReplyTo& rt = p->getReplyTo();
+ replyToKey = rt.getRoutingKey();
}
else
return;
@@ -832,10 +775,10 @@
return;
}
- msg.encodeContent (inBuffer);
- inBuffer.reset ();
+ msg.encodeContent(inBuffer);
+ inBuffer.reset();
- if (!CheckHeader (inBuffer, &opcode, &sequence))
+ if (!CheckHeader(inBuffer, &opcode, &sequence))
return;
if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
@@ -847,6 +790,7 @@
else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence);
+ else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence);
}
ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Thu Jul 31 06:15:16 2008
@@ -59,7 +59,7 @@
uint32_t persistId = 0,
uint32_t persistBank = 4);
void clientAdded (void);
- void dispatchCommand (broker::Deliverable& msg,
+ bool dispatchCommand (broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
@@ -177,9 +177,6 @@
std::string routingKey);
void moveNewObjectsLH();
- void dispatchMethodLH (broker::Message& msg,
- const std::string& routingKey,
- size_t first);
void dispatchAgentCommandLH (broker::Message& msg);
PackageMap::iterator FindOrAddPackageLH(std::string name);
@@ -206,6 +203,7 @@
void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
size_t ValidateSchema(framing::Buffer&);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp Thu Jul 31 06:15:16 2008
@@ -40,17 +40,16 @@
const string& routingKey,
const FieldTable* args)
{
+ bool routeIt = true;
+
// Intercept management agent commands
- if ((routingKey.length () > 6 &&
- routingKey.substr (0, 6).compare ("agent.") == 0) ||
- (routingKey.length () == 5 &&
- routingKey.substr (0, 5).compare ("agent") == 0))
- {
- managementAgent->dispatchCommand (msg, routingKey, args);
- return;
- }
+ if ((routingKey.length() > 6 &&
+ routingKey.substr(0, 6).compare("agent.") == 0) ||
+ (routingKey == "broker"))
+ routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
- TopicExchange::route (msg, routingKey, args);
+ if (routeIt)
+ TopicExchange::route(msg, routingKey, args);
}
bool ManagementExchange::bind (Queue::shared_ptr queue,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Thu Jul 31 06:15:16 2008
@@ -98,8 +98,8 @@
qpid::framing::Buffer& outBuf) = 0;
virtual void setReference (uint64_t objectId);
- virtual std::string getClassName (void) = 0;
- virtual std::string getPackageName (void) = 0;
+ virtual std::string& getClassName (void) = 0;
+ virtual std::string& getPackageName (void) = 0;
virtual uint8_t* getMd5Sum (void) = 0;
void setObjectId (uint64_t oid) { objectId = oid; }
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-tool
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-tool?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-tool (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-tool Thu Jul 31 06:15:16 2008
@@ -65,6 +65,7 @@
print " schema <className> - Print details of an object class"
print " set time-format short - Select short timestamp format (default)"
print " set time-format long - Select long timestamp format"
+ print " id [<ID>] - Display translations of display object ids"
print " quit or ^D - Exit the program"
print
@@ -93,6 +94,9 @@
except:
pass
+ def do_id (self, data):
+ self.dataObject.do_id(data)
+
def complete_schema (self, text, line, begidx, endidx):
tokens = split (line)
if len (tokens) > 2:
Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu Jul 31 06:15:16 2008
@@ -154,7 +154,7 @@
def accept (self, msg):
self.qpidChannel.message_accept(RangedSet(msg.id))
- def message (self, body, routing_key="agent"):
+ def message (self, body, routing_key="broker"):
dp = self.qpidChannel.delivery_properties()
dp.routing_key = routing_key
mp = self.qpidChannel.message_properties()
@@ -227,14 +227,14 @@
""" Invoke a method on a managed object. """
self.method (channel, userSequence, objId, className, methodName, args)
- def getObjects (self, channel, userSequence, className):
+ def getObjects (self, channel, userSequence, className, bank=0):
""" Request immediate content from broker """
codec = Codec (self.spec)
self.setHeader (codec, ord ('G'), userSequence)
ft = {}
ft["_class"] = className
codec.write_map (ft)
- msg = channel.message(codec.encoded)
+ msg = channel.message(codec.encoded, routing_key="agent.%d" % bank)
channel.send ("qpid.management", msg)
def syncWaitForStable (self, channel):
@@ -273,14 +273,14 @@
self.cv.release ()
return result
- def syncGetObjects (self, channel, className):
+ def syncGetObjects (self, channel, className, bank=0):
""" Synchronous (blocking) get call """
self.cv.acquire ()
self.syncInFlight = True
self.syncResult = []
self.syncSequence = self.seqMgr.reserve ("sync")
self.cv.release ()
- self.getObjects (channel, self.syncSequence, className)
+ self.getObjects (channel, self.syncSequence, className, bank)
self.cv.acquire ()
starttime = time ()
while self.syncInFlight:
@@ -748,6 +748,8 @@
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'), sequence)
codec.write_uint64 (objId) # ID of object
+ codec.write_str8 (methodName)
+ bank = (objId & 0x0000FFFFFF000000) >> 24
# Encode args according to schema
if classId not in self.schema:
@@ -777,6 +779,5 @@
packageName = classId[0]
className = classId[1]
- msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \
- className + "." + methodName)
+ msg = channel.message(codec.encoded, "agent." + str(bank))
channel.send ("qpid.management", msg)
Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Thu Jul 31 06:15:16 2008
@@ -167,10 +167,14 @@
if self.cli != None:
self.cli.setPromptMessage ("Broker Disconnected")
- def schemaHandler (self, context, className, configs, insts, methods, events):
+ def schemaHandler (self, context, classKey, configs, insts, methods, events):
""" Callback for schema updates """
- if className not in self.schema:
- self.schema[className] = (configs, insts, methods, events)
+ if classKey not in self.schema:
+ schemaRev = 0
+ for key in self.schema:
+ if classKey[0] == key[0] and classKey[1] == key[1]:
+ schemaRev += 1
+ self.schema[classKey] = (configs, insts, methods, events, schemaRev)
def setCli (self, cliobj):
self.cli = cliobj
@@ -248,17 +252,17 @@
return str (value)
return "*type-error*"
- def getObjIndex (self, className, config):
+ def getObjIndex (self, classKey, config):
""" Concatenate the values from index columns to form a unique object name """
result = ""
- schemaConfig = self.schema[className][0]
+ schemaConfig = self.schema[classKey][0]
for item in schemaConfig:
if item[5] == 1 and item[0] != "id":
if result != "":
result = result + "."
for key,val in config:
if key == item[0]:
- result = result + self.valueDisplay (className, key, val)
+ result = result + self.valueDisplay (classKey, key, val)
return result
def getClassKey (self, className):
@@ -268,11 +272,17 @@
if key[1] == className:
return key
else:
- package = className[0:dotPos]
- name = className[dotPos + 1:]
+ package = className[0:dotPos]
+ name = className[dotPos + 1:]
+ schemaRev = 0
+ delim = name.find(".")
+ if delim != -1:
+ schemaRev = int(name[delim + 1:])
+ name = name[0:delim]
for key in self.schema:
if key[0] == package and key[1] == name:
- return key
+ if self.schema[key][4] == schemaRev:
+ return key
return None
def classCompletions (self, prefix):
@@ -508,7 +518,11 @@
sorted.sort ()
for classKey in sorted:
tuple = self.schema[classKey]
- className = classKey[0] + "." + classKey[1]
+ if tuple[4] == 0:
+ suffix = ""
+ else:
+ suffix = ".%d" % tuple[4]
+ className = classKey[0] + "." + classKey[1] + suffix
row = (className, len (tuple[0]), len (tuple[1]), len (tuple[2]), len (tuple[3]))
rows.append (row)
self.disp.table ("Classes in Schema:",
@@ -527,6 +541,7 @@
raise ValueError ()
rows = []
+ schemaRev = self.schema[classKey][4]
for config in self.schema[classKey][0]:
name = config[0]
if name != "id":
@@ -554,7 +569,7 @@
rows.append ((name, typename, unit, "", "", desc))
titles = ("Element", "Type", "Unit", "Access", "Notes", "Description")
- self.disp.table ("Schema for class '%s.%s':" % (classKey[0], classKey[1]), titles, rows)
+ self.disp.table ("Schema for class '%s.%s.%d':" % (classKey[0], classKey[1], schemaRev), titles, rows)
for mname in self.schema[classKey][2]:
(mdesc, args) = self.schema[classKey][2][mname]
@@ -603,13 +618,20 @@
raise ValueError ()
schemaMethod = self.schema[classKey][2][methodName]
- if len (args) != len (schemaMethod[1]):
- print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args))
+ count = 0
+ for arg in range(len(schemaMethod[1])):
+ if schemaMethod[1][arg][2].find("I") != -1:
+ count += 1
+ if len (args) != count:
+ print "Wrong number of method args: Need %d, Got %d" % (count, len (args))
raise ValueError ()
namedArgs = {}
- for idx in range (len (args)):
- namedArgs[schemaMethod[1][idx][0]] = args[idx]
+ idx = 0
+ for arg in range(len(schemaMethod[1])):
+ if schemaMethod[1][arg][2].find("I") != -1:
+ namedArgs[schemaMethod[1][arg][0]] = args[idx]
+ idx += 1
self.methodSeq = self.methodSeq + 1
self.methodsPending[self.methodSeq] = methodName
@@ -623,6 +645,35 @@
# except ValueError, e:
# print "Error invoking method:", e
+ def makeIdRow (self, displayId):
+ if displayId in self.idMap:
+ rawId = self.idMap[displayId]
+ else:
+ return None
+ return (displayId,
+ rawId,
+ (rawId & 0x7FFF000000000000) >> 48,
+ (rawId & 0x0000FFFFFF000000) >> 24,
+ (rawId & 0x0000000000FFFFFF))
+
+ def listIds (self, select):
+ rows = []
+ if select == 0:
+ sorted = self.idMap.keys()
+ sorted.sort()
+ for displayId in sorted:
+ row = self.makeIdRow (displayId)
+ rows.append(row)
+ else:
+ row = self.makeIdRow (select)
+ if row == None:
+ print "Display Id %d not known" % select
+ return
+ rows.append(row)
+ self.disp.table("Translation of Display IDs:",
+ ("DisplayID", "RawID", "BootSequence", "Bank", "Object"),
+ rows)
+
def do_list (self, data):
tokens = data.split ()
if len (tokens) == 0:
@@ -644,10 +695,17 @@
print "Not enough arguments supplied"
return
- userOid = long (tokens[0])
+ displayId = long (tokens[0])
methodName = tokens[1]
args = tokens[2:]
- self.callMethod (userOid, methodName, args)
+ self.callMethod (displayId, methodName, args)
+
+ def do_id (self, data):
+ if data == "":
+ select = 0
+ else:
+ select = int(data)
+ self.listIds(select)
def do_exit (self):
self.mclient.removeChannel (self.mch)
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Jul 31 06:15:16 2008
@@ -102,10 +102,11 @@
===============================================================
-->
<class name="Agent">
- <property name="sessionName" type="sstr" access="RO" index="y" desc="Session ID for Agent"/>
- <property name="label" type="sstr" access="RO" desc="Label for agent"/>
- <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
- <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/>
+ <property name="sessionName" type="sstr" access="RO" index="y" desc="Session ID for Agent"/>
+ <property name="label" type="sstr" access="RO" desc="Label for agent"/>
+ <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
+ <property name="systemId" type="uuid" access="RO" desc="Identifier of system where agent resides"/>
+ <property name="objectIdBank" type="uint32" access="RO" desc="Assigned object-id bank"/>
</class>
<!--