You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/07/31 15:15:17 UTC

svn commit: r681362 - in /incubator/qpid/trunk/qpid: cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/templates/ cpp/src/qpid/agent/ cpp/src/qpid/broker/ cpp/src/qpid/management/ python/commands/ python/qpid/ specs/

Author: tross
Date: Thu Jul 31 06:15:16 2008
New Revision: 681362

URL: http://svn.apache.org/viewvc?rev=681362&view=rev
Log:
QPID-1174 - Management updates for remote agents

Modified:
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
    incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/python/commands/qpid-tool
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/python/qpid/managementdata.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile Thu Jul 31 06:15:16 2008
@@ -26,13 +26,13 @@
 CC           = gcc
 LIB_DIR      = $(QPID_DIR)/cpp/src/.libs
 CC_INCLUDES  = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR)
-CC_FLAGS     = -g -O2
+CC_FLAGS     = -g -O3
 LD_FLAGS     = -lqpidclient -lqpidcommon -L$(LIB_DIR)
 SPEC_DIR     = $(QPID_DIR)/specs
+TYPE_FILE    = $(SPEC_DIR)/management-types.xml
 MGEN_DIR     = $(QPID_DIR)/cpp/managementgen
 TEMPLATE_DIR = $(MGEN_DIR)/templates
 MGEN         = $(MGEN_DIR)/main.py
-OBJ_DIR      = $(SRC_DIR)/.libs
 
 vpath %.cpp $(SRC_DIR):$(GEN_DIR)
 vpath %.d   $(OBJ_DIR)
@@ -43,15 +43,17 @@
 deps    = $(addsuffix .d, $(basename $(cpps)))
 objects = $(addsuffix .o, $(basename $(cpps)))
 
-.PHONY: all clean
+.PHONY: all clean gen
 
 #==========================================================
 # Pass 0: generate source files from schema
 ifeq ($(MAKELEVEL), 0)
 
-all:
-	$(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR)
-	$(MAKE)
+all: gen
+	@$(MAKE)
+
+gen:
+	$(MGEN) $(SCHEMA_FILE) $(TYPE_FILE) $(TEMPLATE_DIR) $(GEN_DIR)
 
 clean:
 	rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o
@@ -62,7 +64,7 @@
 else ifeq ($(MAKELEVEL), 1)
 
 all: $(deps)
-	$(MAKE)
+	@$(MAKE)
 
 %.d : %.cpp
 	$(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@

Modified: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp Thu Jul 31 06:15:16 2008
@@ -22,7 +22,10 @@
 #include <qpid/management/Manageable.h>
 #include <qpid/management/ManagementObject.h>
 #include <qpid/agent/ManagementAgent.h>
+#include <qpid/sys/Mutex.h>
 #include "Parent.h"
+#include "Child.h"
+#include "ArgsParentCreate_child.h"
 #include "PackageQmf_example.h"
 
 #include <unistd.h>
@@ -32,7 +35,14 @@
 #include <sstream>
 
 using namespace qpid::management;
+using namespace qpid::sys;
 using namespace std;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+using qpid::sys::Mutex;
+
+class ChildClass;
 
 //==============================================================
 // CoreClass is the operational class that corresponds to the
@@ -40,21 +50,44 @@
 //==============================================================
 class CoreClass : public Manageable
 {
-    string  name;
+    string           name;
+    ManagementAgent* agent;
     Parent* mgmtObject;
+    std::vector<ChildClass*> children;
+    Mutex vectorLock;
 
 public:
 
     CoreClass(ManagementAgent* agent, string _name);
-    ~CoreClass() {}
+    ~CoreClass() { mgmtObject->resourceDestroy(); }
+
+    ManagementObject* GetManagementObject(void) const
+    { return mgmtObject; }
+
+    void doLoop();
+    status_t ManagementMethod (uint32_t methodId, Args& args);
+};
+
+class ChildClass : public Manageable
+{
+    string name;
+    Child* mgmtObject;
+
+public:
 
-    void bumpCounter() { mgmtObject->inc_count(); }
+    ChildClass(ManagementAgent* agent, CoreClass* parent, string name);
+    ~ChildClass() { mgmtObject->resourceDestroy(); }
 
     ManagementObject* GetManagementObject(void) const
     { return mgmtObject; }
+
+    void doWork()
+    {
+        mgmtObject->inc_count(2);
+    }
 };
 
-CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name)
+CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
 {
     mgmtObject = new Parent(agent, this, name);
 
@@ -62,6 +95,52 @@
     mgmtObject->set_state("IDLE");
 }
 
+void CoreClass::doLoop()
+{
+    // Periodically bump a counter to provide a changing statistical value
+    while (1) {
+        sleep(1);
+        mgmtObject->inc_count();
+        mgmtObject->set_state("IN LOOP");
+
+        {
+            Mutex::ScopedLock _lock(vectorLock);
+
+            for (std::vector<ChildClass*>::iterator iter = children.begin();
+                 iter != children.end();
+                 iter++) {
+                (*iter)->doWork();
+            }
+        }
+    }
+}
+
+Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args)
+{
+    Mutex::ScopedLock _lock(vectorLock);
+
+    switch (methodId) {
+    case Parent::METHOD_CREATE_CHILD:
+        ArgsParentCreate_child& ioArgs = (ArgsParentCreate_child&) args;
+
+        ChildClass *child = new ChildClass(agent, this, ioArgs.i_name);
+        ioArgs.o_childRef = child->GetManagementObject()->getObjectId();
+
+        children.push_back(child);
+
+        return STATUS_OK;
+    }
+
+    return STATUS_NOT_IMPLEMENTED;
+}
+
+ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name)
+{
+    mgmtObject = new Child(agent, this, parent, name);
+
+    agent->addObject(mgmtObject);
+}
+
 
 //==============================================================
 // Main program
@@ -79,19 +158,14 @@
 
     // Start the agent.  It will attempt to make a connection to the
     // management broker
-    agent->init (string(host), port);
+    agent->init(string(host), port);
 
     // Allocate some core objects
     CoreClass core1(agent, "Example Core Object #1");
     CoreClass core2(agent, "Example Core Object #2");
     CoreClass core3(agent, "Example Core Object #3");
 
-    // Periodically bump a counter in core1 to provide a changing statistical value
-    while (1)
-    {
-        sleep(1);
-        core1.bumpCounter();
-    }
+    core1.doLoop();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/schema.py?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/schema.py Thu Jul 31 06:15:16 2008
@@ -577,7 +577,7 @@
     return self.name
 
   def getFullName (self):
-    return self.parent.getName().capitalize() + self.name[0:1].upper() +\
+    return capitalize(self.parent.getName()) + self.name[0:1].upper() +\
            self.name[1:]
 
   def getArgCount (self):
@@ -644,7 +644,7 @@
     return self.name
 
   def getFullName (self):
-    return self.parent.getName ().capitalize() + self.name.capitalize ()
+    return capitalize(self.parent.getName()) + capitalize(self.name)
 
   def getArgCount (self):
     return len (self.args)
@@ -938,7 +938,7 @@
       method.genSchema (stream, variables)
 
   def genNameCap (self, stream, variables):
-    stream.write (self.name.capitalize ())
+    stream.write (capitalize(self.name))
 
   def genNameLower (self, stream, variables):
     stream.write (self.name.lower ())

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Thu Jul 31 06:15:16 2008
@@ -86,9 +86,9 @@
 
     /*MGEN:Class.SetGeneralReferenceDeclaration*/
 
-    std::string getPackageName (void) { return packageName; }
-    std::string getClassName   (void) { return className; }
-    uint8_t*    getMd5Sum      (void) { return md5Sum; }
+    std::string& getPackageName (void) { return packageName; }
+    std::string& getClassName   (void) { return className; }
+    uint8_t*     getMd5Sum      (void) { return md5Sum; }
 
     // Method IDs
 /*MGEN:Class.MethodIdDeclarations*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Jul 31 06:15:16 2008
@@ -67,16 +67,21 @@
 }
 
 ManagementAgentImpl::ManagementAgentImpl() :
-    clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread)
+    clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread), startupWait(false)
 {
     // TODO: Establish system ID
 }
 
-void ManagementAgentImpl::init (std::string brokerHost,
-                                uint16_t    brokerPort,
-                                uint16_t    intervalSeconds,
-                                bool        useExternalThread)
+void ManagementAgentImpl::init(std::string brokerHost,
+                               uint16_t    brokerPort,
+                               uint16_t    intervalSeconds,
+                               bool        useExternalThread)
 {
+    {
+        Mutex::ScopedLock lock(agentLock);
+        startupWait = true;
+    }
+
     interval     = intervalSeconds;
     extThread    = useExternalThread;
     nextObjectId = 1;
@@ -92,17 +97,17 @@
 
     session.queueDeclare (arg::queue=queueName.str());
     session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
-                          arg::bindingKey=queueName.str ());
+                          arg::bindingKey=queueName.str());
     session.messageSubscribe (arg::queue=queueName.str(),
                               arg::destination=dest);
     session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);
     session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);
 
     Message attachRequest;
-    char    rawbuffer[512];   // TODO: Modify Buffer so it can use stringstream
+    char    rawbuffer[512];
     Buffer  buffer (rawbuffer, 512);
 
-    attachRequest.getDeliveryProperties().setRoutingKey("agent");
+    attachRequest.getDeliveryProperties().setRoutingKey("broker");
     attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
 
     EncodeHeader (buffer, 'A');
@@ -115,15 +120,22 @@
     string stringBuffer (rawbuffer, length);
     attachRequest.setData (stringBuffer);
 
-    session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management");
+    session.messageTransfer(arg::content=attachRequest, arg::destination="qpid.management");
 
-    dispatcher->listen (dest, this);
-    dispatcher->start ();
+    dispatcher->listen(dest, this);
+    dispatcher->start();
+
+    {
+        Mutex::ScopedLock lock(agentLock);
+        if (startupWait)
+            startupCond.wait(agentLock);
+    }
 }
 
-ManagementAgentImpl::~ManagementAgentImpl ()
+ManagementAgentImpl::~ManagementAgentImpl()
 {
-    dispatcher->stop ();
+    dispatcher->stop();
+    session.close();
     delete dispatcher;
 }
 
@@ -151,24 +163,33 @@
     return objectId;
 }
 
-uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/)
+uint32_t ManagementAgentImpl::pollCallbacks(uint32_t /*callLimit*/)
 {
     return 0;
 }
 
-int ManagementAgentImpl::getSignalFd (void)
+int ManagementAgentImpl::getSignalFd(void)
 {
     return -1;
 }
 
-void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
+void ManagementAgentImpl::handleAttachResponse(Buffer& inBuffer)
 {
     Mutex::ScopedLock lock(agentLock);
     uint32_t assigned;
+    stringstream key;
 
     assigned = inBuffer.getLong();
     objIdPrefix = ((uint64_t) assigned) << 24;
 
+    startupWait = false;
+    startupCond.notify();
+
+    // Bind to qpid.management to receive commands
+    key << "agent." << assigned;
+    session.exchangeBind (arg::exchange="qpid.management", arg::queue=queueName.str(),
+                          arg::bindingKey=key.str());
+
     // Send package indications for all local packages
     for (PackageMap::iterator pIter = packages.begin();
          pIter != packages.end();
@@ -180,7 +201,7 @@
         EncodePackageIndication(outBuffer, pIter);
         outLen = MA_BUFFER_SIZE - outBuffer.available ();
         outBuffer.reset();
-        SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+        SendBuffer(outBuffer, outLen, "qpid.management", "broker");
 
         // Send class indications for all local classes
         ClassMap cMap = pIter->second;
@@ -190,7 +211,7 @@
             EncodeClassIndication(outBuffer, pIter, cIter);
             outLen = MA_BUFFER_SIZE - outBuffer.available ();
             outBuffer.reset();
-            SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+            SendBuffer(outBuffer, outLen, "qpid.management", "broker");
         }
     }
 }
@@ -218,7 +239,7 @@
              schema.writeSchemaCall(outBuffer);
              outLen = MA_BUFFER_SIZE - outBuffer.available ();
              outBuffer.reset();
-             SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+             SendBuffer(outBuffer, outLen, "qpid.management", "broker");
         }
     }
 }
@@ -229,18 +250,50 @@
     clientWasAdded = true;
 }
 
-void ManagementAgentImpl::received (Message& msg)
+void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequence, string replyTo)
+{
+    string   methodName;
+    Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+    uint32_t outLen;
+
+    uint64_t objId = inBuffer.getLongLong();
+    inBuffer.getShortString(methodName);
+
+    EncodeHeader(outBuffer, 'm', sequence);
+
+    ManagementObjectMap::iterator iter = managementObjects.find(objId);
+    if (iter == managementObjects.end() || iter->second->isDeleted()) {
+        outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
+        outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
+    } else {
+        iter->second->doMethod(methodName, inBuffer, outBuffer);
+    }
+
+    outLen = MA_BUFFER_SIZE - outBuffer.available();
+    outBuffer.reset();
+    SendBuffer(outBuffer, outLen, "amq.direct", replyTo);
+}
+
+void ManagementAgentImpl::received(Message& msg)
 {
-    string   data = msg.getData ();
-    Buffer   inBuffer (const_cast<char*>(data.c_str()), data.size());
+    string   data = msg.getData();
+    Buffer   inBuffer(const_cast<char*>(data.c_str()), data.size());
     uint8_t  opcode;
     uint32_t sequence;
+    string   replyToKey;
+
+    framing::MessageProperties p = msg.getMessageProperties();
+    if (p.hasReplyTo()) {
+        const framing::ReplyTo& rt = p.getReplyTo();
+        replyToKey = rt.getRoutingKey();
+    }
 
     if (CheckHeader (inBuffer, &opcode, &sequence))
     {
         if      (opcode == 'a') handleAttachResponse(inBuffer);
         else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence);
         else if (opcode == 'x') handleConsoleAddedIndication();
+        else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Jul 31 06:15:16 2008
@@ -30,6 +30,7 @@
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
 #include "qpid/framing/Uuid.h"
 #include <iostream>
 #include <sstream>
@@ -127,6 +128,8 @@
 
     BackgroundThread bgThread;
     sys::Thread      thread;
+    sys::Condition   startupCond;
+    bool             startupWait;
 
     PackageMap::iterator FindOrAddPackage (std::string name);
     void moveNewObjectsLH();
@@ -149,6 +152,7 @@
     void handlePackageRequest (qpid::framing::Buffer& inBuffer);
     void handleClassQuery     (qpid::framing::Buffer& inBuffer);
     void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+    void handleMethodRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
     void handleConsoleAddedIndication();
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Jul 31 06:15:16 2008
@@ -335,8 +335,7 @@
         return Manageable::STATUS_OK;
 
     case management::Link::METHOD_BRIDGE :
-        management::ArgsLinkBridge iargs =
-            dynamic_cast<const management::ArgsLinkBridge&>(args);
+        management::ArgsLinkBridge& iargs = (management::ArgsLinkBridge&) args;
 
         // Durable bridges are only valid on durable links
         if (iargs.i_durable && !durable)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Thu Jul 31 06:15:16 2008
@@ -97,6 +97,7 @@
     // objects that will be invalid.
     dExchange.reset();
     mExchange.reset();
+    timer.stop();
 
     moveNewObjectsLH();
     for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -117,33 +118,33 @@
     timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
 
     // Get from file or generate and save to file.
-    if (dataDir.empty ())
+    if (dataDir.empty())
     {
-        uuid.generate ();
+        uuid.generate();
         QPID_LOG (info, "ManagementBroker has no data directory, generated new broker ID: "
                   << uuid);
     }
     else
     {
-        string   filename (dataDir + "/.mbrokerdata");
-        ifstream inFile   (filename.c_str ());
+        string   filename(dataDir + "/.mbrokerdata");
+        ifstream inFile(filename.c_str ());
 
-        if (inFile.good ())
+        if (inFile.good())
         {
             inFile >> uuid;
             inFile >> bootSequence;
             inFile >> nextRemoteBank;
-            inFile.close ();
+            inFile.close();
             QPID_LOG (debug, "ManagementBroker restored broker ID: " << uuid);
 
             bootSequence++;
-            writeData ();
+            writeData();
         }
         else
         {
-            uuid.generate ();
+            uuid.generate();
             QPID_LOG (info, "ManagementBroker generated broker ID: " << uuid);
-            writeData ();
+            writeData();
         }
 
         QPID_LOG (debug, "ManagementBroker boot sequence: " << bootSequence);
@@ -155,10 +156,10 @@
     string   filename (dataDir + "/.mbrokerdata");
     ofstream outFile (filename.c_str ());
 
-    if (outFile.good ())
+    if (outFile.good())
     {
         outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
-        outFile.close ();
+        outFile.close();
     }
 }
 
@@ -174,7 +175,7 @@
                                       uint8_t* md5Sum,
                                       ManagementObject::writeSchemaCall_t schemaCall)
 {
-    Mutex::ScopedLock lock (userLock);
+    Mutex::ScopedLock lock(userLock);
     PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
     AddClass(pIter, className, md5Sum, schemaCall);
 }
@@ -391,124 +392,64 @@
     SendBuffer (outBuffer, outLen, dExchange, replyToKey);
 }
 
-void ManagementBroker::dispatchCommand (Deliverable&      deliverable,
+bool ManagementBroker::dispatchCommand (Deliverable&      deliverable,
                                         const string&     routingKey,
                                         const FieldTable* /*args*/)
 {
     Mutex::ScopedLock lock (userLock);
     Message&  msg = ((DeliverableMessage&) deliverable).getMessage ();
 
-    if (routingKey.compare (0, 13, "agent.method.") == 0)
-        dispatchMethodLH (msg, routingKey, 13);
+    // Parse the routing key.  This management broker should act as though it
+    // is bound to the exchange to match the following keys:
+    //
+    //    agent.<X>.#
+    //    broker.#
+    //
+    // where <X> is any non-negative decimal integer less than the lowest remote
+    // object-id bank.
 
-    else if (routingKey.length () == 5 &&
-        routingKey.compare (0, 5, "agent") == 0)
+    if (routingKey == "broker") {
         dispatchAgentCommandLH (msg);
-
-    else
-    {
-        QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
-        return;
-    }
-}
-
-void ManagementBroker::dispatchMethodLH (Message&      msg,
-                                         const string& routingKey,
-                                         size_t        first)
-{
-    size_t    pos, start = first;
-    uint32_t  contentSize;
-
-    if (routingKey.length () == start)
-    {
-        QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
-        return;
-    }
-
-    pos = routingKey.find ('.', start);
-    if (pos == string::npos || routingKey.length () == pos + 1)
-    {
-        QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
-        return;
+        return false;
     }
 
-    string packageName = routingKey.substr (start, pos - start);
-
-    start = pos + 1;
-    pos = routingKey.find ('.', start);
-    if (pos == string::npos || routingKey.length () == pos + 1)
-    {
-        QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
-        return;
+    else if (routingKey.compare(0, 6, "agent.") == 0) {
+        uint32_t delim = routingKey.find('.', 6);
+        if (delim == string::npos)
+            delim = routingKey.length();
+        string bank = routingKey.substr(6, delim - 6);
+        if ((uint32_t) atoi(bank.c_str()) <= localBank) {
+            dispatchAgentCommandLH (msg);
+            return false;
+        }
     }
 
-    string className = routingKey.substr (start, pos - start);
-
-    start = pos + 1;
-    string methodName = routingKey.substr (start, routingKey.length () - start);
-
-    contentSize = msg.encodedContentSize ();
-    if (contentSize < 8 || contentSize > MA_BUFFER_SIZE)
-        return;
+    return true;
+}
 
-    Buffer   inBuffer  (inputBuffer,  MA_BUFFER_SIZE);
+void ManagementBroker::handleMethodRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
+{
+    string   methodName;
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
-    uint32_t outLen, sequence;
-    uint8_t  opcode;
-
-    if (msg.encodedSize() > MA_BUFFER_SIZE) {
-        QPID_LOG(debug, "ManagementBroker::dispatchMethodLH: Message too large: " <<
-                 msg.encodedSize());
-        return;
-    }
-
-    msg.encodeContent (inBuffer);
-    inBuffer.reset ();
-
-    if (!CheckHeader (inBuffer, &opcode, &sequence))
-    {
-        QPID_LOG (debug, "    Invalid content header");
-        return;
-    }
-
-    if (opcode != 'M')
-    {
-        QPID_LOG (debug, "    Unexpected opcode " << opcode);
-        return;
-    }
-
-    uint64_t objId = inBuffer.getLongLong ();
-    string   replyToKey;
+    uint32_t outLen;
 
-    const framing::MessageProperties* p =
-        msg.getFrames().getHeaders()->get<framing::MessageProperties>();
-    if (p && p->hasReplyTo())
-    {
-        const framing::ReplyTo& rt = p->getReplyTo ();
-        replyToKey = rt.getRoutingKey ();
-    }
-    else
-    {
-        QPID_LOG (debug, "    Reply-to missing");
-        return;
-    }
+    uint64_t objId = inBuffer.getLongLong();
+    inBuffer.getShortString(methodName);
 
-    EncodeHeader (outBuffer, 'm', sequence);
+    std::cout << "ManagementBroker::handleMethodRequest (" << objId << ", " << methodName << ")" << std::endl;
+    EncodeHeader(outBuffer, 'm', sequence);
 
-    ManagementObjectMap::iterator iter = managementObjects.find (objId);
-    if (iter == managementObjects.end () || iter->second->isDeleted ())
-    {
+    ManagementObjectMap::iterator iter = managementObjects.find(objId);
+    if (iter == managementObjects.end() || iter->second->isDeleted()) {
         outBuffer.putLong        (Manageable::STATUS_UNKNOWN_OBJECT);
         outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
-    }
-    else
-    {
-        iter->second->doMethod (methodName, inBuffer, outBuffer);
+    } else {
+        iter->second->doMethod(methodName, inBuffer, outBuffer);
     }
 
-    outLen = MA_BUFFER_SIZE - outBuffer.available ();
-    outBuffer.reset ();
-    SendBuffer (outBuffer, outLen, dExchange, replyToKey);
+    outLen = MA_BUFFER_SIZE - outBuffer.available();
+    outBuffer.reset();
+    SendBuffer(outBuffer, outLen, dExchange, replyToKey);
 }
 
 void ManagementBroker::handleBrokerRequestLH (Buffer&, string replyToKey, uint32_t sequence)
@@ -737,6 +678,8 @@
     requestedBank = inBuffer.getLong ();
     assignedBank  = assignBankLH (requestedBank);
 
+    // TODO: Make a pass over the agents and delete any that no longer have a session.
+
     RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
     if (aIter != remoteAgents.end())
     {
@@ -755,6 +698,7 @@
     agent->mgmtObject->set_label        (label);
     agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
     agent->mgmtObject->set_systemId     (systemId);
+    agent->mgmtObject->set_objectIdBank (assignedBank);
     addObject (agent->mgmtObject);
 
     remoteAgents[sessionName] = agent;
@@ -818,10 +762,9 @@
 
     const framing::MessageProperties* p =
         msg.getFrames().getHeaders()->get<framing::MessageProperties>();
-    if (p && p->hasReplyTo())
-    {
-        const framing::ReplyTo& rt = p->getReplyTo ();
-        replyToKey = rt.getRoutingKey ();
+    if (p && p->hasReplyTo()) {
+        const framing::ReplyTo& rt = p->getReplyTo();
+        replyToKey = rt.getRoutingKey();
     }
     else
         return;
@@ -832,10 +775,10 @@
         return;
     }
 
-    msg.encodeContent (inBuffer);
-    inBuffer.reset ();
+    msg.encodeContent(inBuffer);
+    inBuffer.reset();
 
-    if (!CheckHeader (inBuffer, &opcode, &sequence))
+    if (!CheckHeader(inBuffer, &opcode, &sequence))
         return;
 
     if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, replyToKey, sequence);
@@ -847,6 +790,7 @@
     else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
     else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence);
     else if (opcode == 'G') handleGetQueryLH       (inBuffer, replyToKey, sequence);
+    else if (opcode == 'M') handleMethodRequestLH  (inBuffer, replyToKey, sequence);
 }
 
 ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Thu Jul 31 06:15:16 2008
@@ -59,7 +59,7 @@
                           uint32_t          persistId   = 0,
                           uint32_t          persistBank = 4);
     void clientAdded     (void);
-    void dispatchCommand (broker::Deliverable&       msg,
+    bool dispatchCommand (broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args);
 
@@ -177,9 +177,6 @@
                              std::string                  routingKey);
     void moveNewObjectsLH();
 
-    void dispatchMethodLH (broker::Message&   msg,
-                           const std::string& routingKey,
-                           size_t             first);
     void dispatchAgentCommandLH (broker::Message& msg);
 
     PackageMap::iterator FindOrAddPackageLH(std::string name);
@@ -206,6 +203,7 @@
     void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
     void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleMethodRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
 
     size_t ValidateSchema(framing::Buffer&);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp Thu Jul 31 06:15:16 2008
@@ -40,17 +40,16 @@
                                 const string&     routingKey,
                                 const FieldTable* args)
 {
+    bool routeIt = true;
+
     // Intercept management agent commands
-    if ((routingKey.length () > 6 &&
-         routingKey.substr (0, 6).compare ("agent.") == 0) ||
-        (routingKey.length () == 5 &&
-         routingKey.substr (0, 5).compare ("agent") == 0))
-    {
-        managementAgent->dispatchCommand (msg, routingKey, args);
-        return;
-    }
+    if ((routingKey.length() > 6 &&
+         routingKey.substr(0, 6).compare("agent.") == 0) ||
+        (routingKey == "broker"))
+        routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
 
-    TopicExchange::route (msg, routingKey, args);
+    if (routeIt)
+        TopicExchange::route(msg, routingKey, args);
 }
 
 bool ManagementExchange::bind (Queue::shared_ptr queue,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Thu Jul 31 06:15:16 2008
@@ -98,8 +98,8 @@
                                  qpid::framing::Buffer& outBuf) = 0;
     virtual void setReference   (uint64_t objectId);
 
-    virtual std::string  getClassName   (void) = 0;
-    virtual std::string  getPackageName (void) = 0;
+    virtual std::string& getClassName   (void) = 0;
+    virtual std::string& getPackageName (void) = 0;
     virtual uint8_t*     getMd5Sum      (void) = 0;
 
     void         setObjectId      (uint64_t oid) { objectId = oid; }

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-tool
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-tool?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-tool (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-tool Thu Jul 31 06:15:16 2008
@@ -65,6 +65,7 @@
     print "    schema <className>              - Print details of an object class"
     print "    set time-format short           - Select short timestamp format (default)"
     print "    set time-format long            - Select long timestamp format"
+    print "    id [<ID>]                       - Display translations of display object ids"
     print "    quit or ^D                      - Exit the program"
     print
 
@@ -93,6 +94,9 @@
     except:
       pass
 
+  def do_id (self, data):
+    self.dataObject.do_id(data)
+
   def complete_schema (self, text, line, begidx, endidx):
     tokens = split (line)
     if len (tokens) > 2:

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu Jul 31 06:15:16 2008
@@ -154,7 +154,7 @@
   def accept (self, msg):
     self.qpidChannel.message_accept(RangedSet(msg.id))
 
-  def message (self, body, routing_key="agent"):
+  def message (self, body, routing_key="broker"):
     dp = self.qpidChannel.delivery_properties()
     dp.routing_key  = routing_key
     mp = self.qpidChannel.message_properties()
@@ -227,14 +227,14 @@
     """ Invoke a method on a managed object. """
     self.method (channel, userSequence, objId, className, methodName, args)
 
-  def getObjects (self, channel, userSequence, className):
+  def getObjects (self, channel, userSequence, className, bank=0):
     """ Request immediate content from broker """
     codec = Codec (self.spec)
     self.setHeader (codec, ord ('G'), userSequence)
     ft = {}
     ft["_class"] = className
     codec.write_map (ft)
-    msg = channel.message(codec.encoded)
+    msg = channel.message(codec.encoded, routing_key="agent.%d" % bank)
     channel.send ("qpid.management", msg)
 
   def syncWaitForStable (self, channel):
@@ -273,14 +273,14 @@
     self.cv.release ()
     return result
 
-  def syncGetObjects (self, channel, className):
+  def syncGetObjects (self, channel, className, bank=0):
     """ Synchronous (blocking) get call """
     self.cv.acquire ()
     self.syncInFlight = True
     self.syncResult   = []
     self.syncSequence = self.seqMgr.reserve ("sync")
     self.cv.release ()
-    self.getObjects (channel, self.syncSequence, className)
+    self.getObjects (channel, self.syncSequence, className, bank)
     self.cv.acquire ()
     starttime = time ()
     while self.syncInFlight:
@@ -748,6 +748,8 @@
     sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
     self.setHeader (codec, ord ('M'), sequence)
     codec.write_uint64 (objId)       # ID of object
+    codec.write_str8 (methodName)
+    bank = (objId & 0x0000FFFFFF000000) >> 24
 
     # Encode args according to schema
     if classId not in self.schema:
@@ -777,6 +779,5 @@
 
     packageName = classId[0]
     className   = classId[1]
-    msg = channel.message(codec.encoded, "agent.method." + packageName + "." + \
-                            className + "." + methodName)
+    msg = channel.message(codec.encoded, "agent." + str(bank))
     channel.send ("qpid.management", msg)

Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Thu Jul 31 06:15:16 2008
@@ -167,10 +167,14 @@
     if self.cli != None:
       self.cli.setPromptMessage ("Broker Disconnected")
 
-  def schemaHandler (self, context, className, configs, insts, methods, events):
+  def schemaHandler (self, context, classKey, configs, insts, methods, events):
     """ Callback for schema updates """
-    if className not in self.schema:
-      self.schema[className] = (configs, insts, methods, events)
+    if classKey not in self.schema:
+      schemaRev = 0
+      for key in self.schema:
+        if classKey[0] == key[0] and classKey[1] == key[1]:
+          schemaRev += 1
+      self.schema[classKey] = (configs, insts, methods, events, schemaRev)
 
   def setCli (self, cliobj):
     self.cli = cliobj
@@ -248,17 +252,17 @@
             return str (value)
     return "*type-error*"
 
-  def getObjIndex (self, className, config):
+  def getObjIndex (self, classKey, config):
     """ Concatenate the values from index columns to form a unique object name """
     result = ""
-    schemaConfig = self.schema[className][0]
+    schemaConfig = self.schema[classKey][0]
     for item in schemaConfig:
       if item[5] == 1 and item[0] != "id":
         if result != "":
           result = result + "."
         for key,val in config:
           if key == item[0]:
-            result = result + self.valueDisplay (className, key, val)
+            result = result + self.valueDisplay (classKey, key, val)
     return result
 
   def getClassKey (self, className):
@@ -268,11 +272,17 @@
         if key[1] == className:
           return key
     else:
-      package = className[0:dotPos]
-      name    = className[dotPos + 1:]
+      package   = className[0:dotPos]
+      name      = className[dotPos + 1:]
+      schemaRev = 0
+      delim = name.find(".")
+      if delim != -1:
+        schemaRev = int(name[delim + 1:])
+        name      = name[0:delim]
       for key in self.schema:
         if key[0] == package and key[1] == name:
-          return key
+          if self.schema[key][4] == schemaRev:
+            return key
     return None
 
   def classCompletions (self, prefix):
@@ -508,7 +518,11 @@
       sorted.sort ()
       for classKey in sorted:
         tuple = self.schema[classKey]
-        className = classKey[0] + "." + classKey[1]
+        if tuple[4] == 0:
+          suffix = ""
+        else:
+          suffix = ".%d" % tuple[4]
+        className = classKey[0] + "." + classKey[1] + suffix
         row = (className, len (tuple[0]), len (tuple[1]), len (tuple[2]), len (tuple[3]))
         rows.append (row)
       self.disp.table ("Classes in Schema:",
@@ -527,6 +541,7 @@
         raise ValueError ()
 
       rows = []
+      schemaRev =  self.schema[classKey][4]
       for config in self.schema[classKey][0]:
         name     = config[0]
         if name != "id":
@@ -554,7 +569,7 @@
           rows.append ((name, typename, unit, "", "", desc))
 
       titles = ("Element", "Type", "Unit", "Access", "Notes", "Description")
-      self.disp.table ("Schema for class '%s.%s':" % (classKey[0], classKey[1]), titles, rows)
+      self.disp.table ("Schema for class '%s.%s.%d':" % (classKey[0], classKey[1], schemaRev), titles, rows)
 
       for mname in self.schema[classKey][2]:
         (mdesc, args) = self.schema[classKey][2][mname]
@@ -603,13 +618,20 @@
         raise ValueError ()
 
       schemaMethod = self.schema[classKey][2][methodName]
-      if len (args) != len (schemaMethod[1]):
-        print "Wrong number of method args: Need %d, Got %d" % (len (schemaMethod[1]), len (args))
+      count = 0
+      for arg in range(len(schemaMethod[1])):
+        if schemaMethod[1][arg][2].find("I") != -1:
+          count += 1
+      if len (args) != count:
+        print "Wrong number of method args: Need %d, Got %d" % (count, len (args))
         raise ValueError ()
 
       namedArgs = {}
-      for idx in range (len (args)):
-        namedArgs[schemaMethod[1][idx][0]] = args[idx]
+      idx = 0
+      for arg in range(len(schemaMethod[1])):
+        if schemaMethod[1][arg][2].find("I") != -1:
+          namedArgs[schemaMethod[1][arg][0]] = args[idx]
+          idx += 1
 
       self.methodSeq = self.methodSeq + 1
       self.methodsPending[self.methodSeq] = methodName
@@ -623,6 +645,35 @@
 #      except ValueError, e:
 #        print "Error invoking method:", e
 
+  def makeIdRow (self, displayId):
+    if displayId in self.idMap:
+      rawId = self.idMap[displayId]
+    else:
+      return None
+    return (displayId,
+            rawId,
+            (rawId & 0x7FFF000000000000) >> 48,
+            (rawId & 0x0000FFFFFF000000) >> 24,
+            (rawId & 0x0000000000FFFFFF))
+
+  def listIds (self, select):
+    rows = []
+    if select == 0:
+      sorted = self.idMap.keys()
+      sorted.sort()
+      for displayId in sorted:
+        row = self.makeIdRow (displayId)
+        rows.append(row)
+    else:
+      row = self.makeIdRow (select)
+      if row == None:
+        print "Display Id %d not known" % select
+        return
+      rows.append(row)
+    self.disp.table("Translation of Display IDs:",
+                    ("DisplayID", "RawID", "BootSequence", "Bank", "Object"),
+                    rows)
+
   def do_list (self, data):
     tokens = data.split ()
     if len (tokens) == 0:
@@ -644,10 +695,17 @@
       print "Not enough arguments supplied"
       return
     
-    userOid    = long (tokens[0])
+    displayId  = long (tokens[0])
     methodName = tokens[1]
     args       = tokens[2:]
-    self.callMethod (userOid, methodName, args)
+    self.callMethod (displayId, methodName, args)
+
+  def do_id (self, data):
+    if data == "":
+      select = 0
+    else:
+      select = int(data)
+    self.listIds(select)
 
   def do_exit (self):
     self.mclient.removeChannel (self.mch)

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=681362&r1=681361&r2=681362&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Jul 31 06:15:16 2008
@@ -102,10 +102,11 @@
   ===============================================================
   -->
   <class name="Agent">
-    <property name="sessionName"  type="sstr"  access="RO" index="y" desc="Session ID for Agent"/>
-    <property name="label"        type="sstr"  access="RO"           desc="Label for agent"/>
-    <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
-    <property name="systemId"     type="uuid"  access="RO"           desc="Identifier of system where agent resides"/>
+    <property name="sessionName"  type="sstr"   access="RO" index="y" desc="Session ID for Agent"/>
+    <property name="label"        type="sstr"   access="RO"           desc="Label for agent"/>
+    <property name="registeredTo" type="objId"  references="Broker" access="RO" desc="Broker agent is registered to"/>
+    <property name="systemId"     type="uuid"   access="RO"           desc="Identifier of system where agent resides"/>
+    <property name="objectIdBank" type="uint32" access="RO"           desc="Assigned object-id bank"/>
   </class>
 
   <!--