You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/07/11 22:14:08 UTC

svn commit: r676067 - in /incubator/qpid/trunk/qpid: cpp/examples/qmf-agent/ cpp/managementgen/ cpp/managementgen/templates/ cpp/src/ cpp/src/qpid/agent/ cpp/src/qpid/management/ python/qpid/ specs/

Author: tross
Date: Fri Jul 11 13:14:07 2008
New Revision: 676067

URL: http://svn.apache.org/viewvc?rev=676067&view=rev
Log:
QPID-1174 Remote Management Agent for management of external components

Added:
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
    incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/main.py
    incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Package.cpp
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/python/qpid/managementdata.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Added: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile?rev=676067&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/Makefile Fri Jul 11 13:14:07 2008
@@ -0,0 +1,85 @@
+# 
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#  
+#    http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+# 
+
+SRC_DIR     = .
+QPID_DIR    = ../../..
+SCHEMA_FILE = $(SRC_DIR)/schema.xml
+GEN_DIR     = $(SRC_DIR)/gen
+OUT_FILE    = $(SRC_DIR)/qmf-agent
+
+CC           = g++
+LIB_DIR      = $(QPID_DIR)/cpp/src/.libs
+CC_INCLUDES  = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR)
+CC_FLAGS     = -g -O2
+LD_FLAGS     = -lqpidclient -lqpidcommon -L$(LIB_DIR)
+SPEC_DIR     = $(QPID_DIR)/specs
+MGEN_DIR     = $(QPID_DIR)/cpp/managementgen
+TEMPLATE_DIR = $(MGEN_DIR)/templates
+MGEN         = $(MGEN_DIR)/main.py
+OBJ_DIR      = $(SRC_DIR)/.libs
+
+vpath %.cpp $(SRC_DIR):$(GEN_DIR)
+vpath %.d   $(OBJ_DIR)
+vpath %.o   $(OBJ_DIR)
+
+cpps    = $(wildcard $(SRC_DIR)/*.cpp)
+cpps   += $(wildcard $(GEN_DIR)/*.cpp)
+deps    = $(addsuffix .d, $(basename $(cpps)))
+objects = $(addsuffix .o, $(basename $(cpps)))
+
+.PHONY: all clean
+
+#==========================================================
+# Pass 0: generate source files from schema
+ifeq ($(MAKELEVEL), 0)
+
+all:
+	$(MGEN) $(SCHEMA_FILE) $(SPEC_DIR)/management-types.xml $(TEMPLATE_DIR) $(GEN_DIR)
+	$(MAKE)
+
+clean:
+	rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o
+
+
+#==========================================================
+# Pass 1: generate dependencies
+else ifeq ($(MAKELEVEL), 1)
+
+all: $(deps)
+	$(MAKE)
+
+%.d : %.cpp
+	$(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@
+
+
+#==========================================================
+# Pass 2: build project
+else ifeq ($(MAKELEVEL), 2)
+
+$(OUT_FILE) : $(objects)
+	$(CC) -o $(OUT_FILE) $(CC_FLAGS) $(objects) $(LD_FLAGS)
+
+include $(deps)
+
+%.o : %.cpp
+	$(CC) -c $(CC_FLAGS) $(CC_INCLUDES) -o $@ $<
+
+endif
+
+

Added: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp?rev=676067&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/example.cpp Fri Jul 11 13:14:07 2008
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/management/Manageable.h>
+#include <qpid/management/ManagementObject.h>
+#include <qpid/agent/ManagementAgent.h>
+#include <qpid/agent/ManagementAgentImpl.h>
+#include "Parent.h"
+#include "PackageQmf_example.h"
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::management;
+using namespace std;
+
+//==============================================================
+// CoreClass is the operational class that corresponds to the
+// "Parent" class in the management schema.
+//==============================================================
+class CoreClass : public Manageable
+{
+    string  name;         // passed to the Parent management object in the constructor
+    Parent* mgmtObject;   // allocated with new in the constructor
+
+public:
+
+    CoreClass(ManagementAgent* agent, string _name);
+    ~CoreClass() {}   // NOTE(review): mgmtObject is not deleted here - confirm who owns it
+
+    void bumpCounter() { mgmtObject->inc_count(); }   // forwards to Parent::inc_count()
+
+    ManagementObject* GetManagementObject(void) const
+    { return mgmtObject; }   // exposes the management object created in the constructor
+};
+
+CoreClass::CoreClass(ManagementAgent* agent, string _name) : name(_name)
+{
+    mgmtObject = new Parent(agent, this, name);   // management object tied to this instance
+
+    agent->addObject(mgmtObject);     // registers the object with the agent
+    mgmtObject->set_state("IDLE");    // initial state reported for this object
+}
+
+
+//==============================================================
+// Main program
+//==============================================================
+int main(int argc, char** argv) {
+    const char* host = argc>1 ? argv[1] : "127.0.0.1";   // argv[1]: broker host, default 127.0.0.1
+    int port = argc>2 ? atoi(argv[2]) : 5672;            // argv[2]: broker port, default 5672; atoi does no validation
+
+    // Create the qmf management agent
+    ManagementAgent* agent = new ManagementAgentImpl();   // never deleted; the loop below does not exit
+
+    // Register the Qmf_example schema with the agent
+    PackageQmf_example packageInit(agent);
+
+    // Start the agent.  It will attempt to make a connection to the
+    // management broker
+    agent->init (string(host), port);
+
+    // Allocate some core objects
+    CoreClass core1(agent, "Example Core Object #1");
+    CoreClass core2(agent, "Example Core Object #2");
+    CoreClass core3(agent, "Example Core Object #3");
+
+    // Periodically bump a counter in core1 to provide a changing statistical value
+    while (1)   // runs until the process is terminated; main never returns
+    {
+        sleep(1);
+        core1.bumpCounter();
+    }
+}
+
+

Added: incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml?rev=676067&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/qmf-agent/schema.xml Fri Jul 11 13:14:07 2008
@@ -0,0 +1,57 @@
+<schema package="qmf_example">
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  
+    http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+  <!--
+  ===============================================================
+  Parent
+  ===============================================================
+  -->
+  <class name="Parent">
+
+    This class represents a parent object
+
+    <property name="name"      type="sstr" access="RC" index="y"/>
+
+    <statistic name="state" type="sstr"                desc="Operational state of the parent object"/>
+    <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
+
+    <method name="create_child" desc="Create child object">
+      <arg name="name"     dir="I" type="sstr"/>
+      <arg name="childRef" dir="O" type="objId"/>
+    </method>
+  </class>
+
+
+  <!--
+  ===============================================================
+  Child
+  ===============================================================
+  -->
+  <class name="Child">
+    <property name="ParentRef" type="objId"  references="Parent" access="RC" index="y" parentRef="y"/>
+    <property name="name"      type="sstr"                       access="RC" index="y"/>
+
+    <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
+
+    <method name="delete"/> 
+  </class>
+</schema>
+

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/main.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/main.py?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/main.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/main.py Fri Jul 11 13:14:07 2008
@@ -28,9 +28,6 @@
 parser = OptionParser (usage=usage)
 parser.add_option ("-m", "--makefile", dest="makefile", metavar="FILE",
                    help="Makefile fragment")
-parser.add_option ("-i", "--include-prefix", dest="include_prefix", metavar="PATH",
-                   default="qpid/management/",
-                   help="Prefix for #include of generated headers in generated source, default: qpid/management/")
 
 (opts, args) = parser.parse_args ()
 
@@ -42,9 +39,6 @@
 templatedir = args[2]
 outdir      = args[3]
 
-if opts.include_prefix == ".":
-  opts.include_prefix = None
-
 gen    = Generator     (outdir,   templatedir)
 schema = PackageSchema (typefile, schemafile, opts)
 

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/schema.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/schema.py?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/schema.py (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/schema.py Fri Jul 11 13:14:07 2008
@@ -890,8 +890,7 @@
   def genMethodArgIncludes (self, stream, variables):
     for method in self.methods:
       if method.getArgCount () > 0:
-        stream.write ("#include \"" + (self.options.include_prefix or "") +\
-                      "Args" + method.getFullName () + ".h\"\n")
+        stream.write ("#include \"Args" + method.getFullName () + ".h\"\n")
 
   def genMethodCount (self, stream, variables):
     stream.write ("%d" % len (self.methods))
@@ -1040,7 +1039,7 @@
 
   def genClassIncludes (self, stream, variables):
     for _class in self.classes:
-      stream.write ("#include \"qpid/management/")
+      stream.write ("#include \"")
       _class.genNameCap (stream, variables)
       stream.write (".h\"\n")
 

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Package.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Package.cpp?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Package.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Package.cpp Fri Jul 11 13:14:07 2008
@@ -20,7 +20,7 @@
 
 /*MGEN:Root.Disclaimer*/
 
-#include "qpid/management/Package/*MGEN:Schema.PackageNameCap*/.h"
+#include "Package/*MGEN:Schema.PackageNameCap*/.h"
 /*MGEN:Schema.ClassIncludes*/
 
 using namespace qpid::management;

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jul 11 13:14:07 2008
@@ -221,6 +221,8 @@
   qpid/Plugin.cpp \
   qpid/StringUtils.cpp \
   qpid/Url.cpp \
+  qpid/management/Manageable.cpp \
+  qpid/management/ManagementObject.cpp \
   qpid/sys/AggregateOutput.cpp \
   qpid/sys/AsynchIOHandler.cpp \
   qpid/sys/Dispatcher.cpp \
@@ -304,10 +306,8 @@
   qpid/broker/TxBuffer.cpp \
   qpid/broker/TxPublish.cpp \
   qpid/broker/Vhost.cpp \
-  qpid/management/Manageable.cpp \
   qpid/management/ManagementBroker.cpp \
   qpid/management/ManagementExchange.cpp \
-  qpid/management/ManagementObject.cpp \
   qpid/sys/TCPIOPlugin.cpp
 
 if HAVE_XML
@@ -319,6 +319,7 @@
 
 libqpidclient_la_SOURCES =			\
   $(rgen_client_srcs)				\
+  qpid/agent/ManagementAgentImpl.cpp \
   qpid/client/AckPolicy.cpp			\
   qpid/client/Bounds.cpp			\
   qpid/client/ConnectionImpl.cpp		\
@@ -367,6 +368,7 @@
   qpid/memory.h \
   qpid/shared_ptr.h \
   qpid/agent/ManagementAgent.h \
+  qpid/agent/ManagementAgentImpl.h \
   qpid/broker/Broker.h \
   qpid/broker/SessionAdapter.h \
   qpid/broker/Exchange.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=676067&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Jul 11 13:14:07 2008
@@ -0,0 +1,426 @@
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/management/Manageable.h"
+#include "qpid/management/ManagementObject.h"
+#include "ManagementAgentImpl.h"
+#include <list>
+#include <unistd.h>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::sys;
+using std::stringstream;
+using std::string;
+using std::cout;
+using std::endl;
+
+ManagementAgent* ManagementAgent::getAgent()
+{
+    //static ManagementAgent* agent = 0;
+
+    //if (agent == 0)
+    //    agent = new ManagementAgentImpl();
+    //return agent;
+    return 0;   // the singleton creation above is commented out; callers always receive a null pointer
+}
+
+ManagementAgentImpl::ManagementAgentImpl() :
+    clientWasAdded(true), objIdPrefix(0), bgThread(*this), thread(bgThread)
+{
+    // TODO: Establish system ID
+    dispatcher = 0;   // assigned in init(); null keeps the destructor safe if init() never ran or threw early
+}
+
+void ManagementAgentImpl::init (std::string brokerHost,
+                                uint16_t    brokerPort,
+                                uint16_t    intervalSeconds,
+                                bool        useExternalThread)
+{
+    interval     = intervalSeconds;
+    extThread    = useExternalThread;
+    nextObjectId = 1;
+
+    sessionId.generate();
+    queueName << "qmfagent-" << sessionId;   // reply queue name, made unique by the generated session id
+    string dest = "qmfagent";
+
+    connection.open(brokerHost.c_str(), brokerPort);
+    session = connection.newSession (queueName.str());
+    dispatcher = new client::Dispatcher(session);
+
+    session.queueDeclare (arg::queue=queueName.str());
+    session.exchangeBind (arg::exchange="amq.direct", arg::queue=queueName.str(),
+                          arg::bindingKey=queueName.str ());
+    session.messageSubscribe (arg::queue=queueName.str(),
+                              arg::destination=dest);
+    session.messageFlow (arg::destination=dest, arg::unit=0, arg::value=0xFFFFFFFF);   // presumably message credit - confirm against AMQP 0-10
+    session.messageFlow (arg::destination=dest, arg::unit=1, arg::value=0xFFFFFFFF);   // presumably byte credit - confirm against AMQP 0-10
+
+    Message attachRequest;
+    char    rawbuffer[512];   // TODO: Modify Buffer so it can use stringstream
+    Buffer  buffer (rawbuffer, sizeof(rawbuffer));
+
+    attachRequest.getDeliveryProperties().setRoutingKey("agent");
+    attachRequest.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+
+    EncodeHeader (buffer, 'A');   // 'A': attach request; the reply ('a') is handled in received()
+    buffer.putShortString ("RemoteAgent [C++]");
+    buffer.putShortString (queueName.str());
+    systemId.encode  (buffer);
+    buffer.putLong (11);   // NOTE(review): meaning of the literal 11 is not evident here - confirm against the broker side
+
+    size_t length = sizeof(rawbuffer) - buffer.available ();   // number of bytes encoded so far
+    string stringBuffer (rawbuffer, length);
+    attachRequest.setData (stringBuffer);
+
+    session.messageTransfer (arg::content=attachRequest, arg::destination="qpid.management");
+
+    dispatcher->listen (dest, this);   // incoming messages are delivered to received()
+    dispatcher->start ();
+}
+
+ManagementAgentImpl::~ManagementAgentImpl ()
+{
+    if (dispatcher != 0) dispatcher->stop ();   // dispatcher exists only after init() created it
+    delete dispatcher;                          // deleting a null pointer is a no-op
+}
+
+void ManagementAgentImpl::RegisterClass (std::string packageName,
+                                         std::string className,
+                                         uint8_t*    md5Sum,
+                                         management::ManagementObject::writeSchemaCall_t schemaCall)
+{ 
+    Mutex::ScopedLock lock(agentLock);   // the package/class maps are accessed under agentLock
+    PackageMap::iterator pIter = FindOrAddPackage (packageName);   // creates the package entry on first use
+    AddClassLocal (pIter, className, md5Sum, schemaCall);          // no-op if the class key is already present
+}
+
+uint64_t ManagementAgentImpl::addObject (ManagementObject* object,
+                                         uint32_t          /*persistId*/,
+                                         uint32_t          /*persistBank*/)
+{
+    Mutex::ScopedLock lock(addLock);   // guards nextObjectId and newManagementObjects
+    uint64_t objectId;
+
+    // TODO: fix object-id handling
+    objectId = objIdPrefix | ((nextObjectId++) & 0x00FFFFFF);   // low 24 bits: counter; prefix stays 0 until handleAttachResponse() sets it
+    object->setObjectId (objectId);
+    newManagementObjects[objectId] = object;   // staged here; moved to managementObjects by moveNewObjectsLH()
+    return objectId;
+}
+
+uint32_t ManagementAgentImpl::pollCallbacks (uint32_t /*callLimit*/)
+{
+    return 0;   // stub: the argument is ignored and 0 is always returned
+}
+
+int ManagementAgentImpl::getSignalFd (void)
+{
+    return -1;   // stub: always returns -1
+}
+
+void ManagementAgentImpl::handleAttachResponse (Buffer& inBuffer)
+{
+    Mutex::ScopedLock lock(agentLock);
+    uint32_t assigned;
+
+    assigned = inBuffer.getLong();
+    objIdPrefix = ((uint64_t) assigned) << 24;   // becomes the high bits of the ids built in addObject()
+
+    // Send package indications for all local packages
+    for (PackageMap::iterator pIter = packages.begin();
+         pIter != packages.end();
+         pIter++) {
+        Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
+        uint32_t outLen;
+
+        EncodeHeader(outBuffer, 'p');
+        EncodePackageIndication(outBuffer, pIter);
+        outLen = MA_BUFFER_SIZE - outBuffer.available ();   // number of bytes encoded
+        outBuffer.reset();
+        SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+
+        // Send class indications for all local classes
+        ClassMap& cMap = pIter->second;   // iterate the stored map in place instead of copying it per package
+        for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); cIter++) {
+            outBuffer.reset();
+            EncodeHeader(outBuffer, 'q');
+            EncodeClassIndication(outBuffer, pIter, cIter);
+            outLen = MA_BUFFER_SIZE - outBuffer.available ();
+            outBuffer.reset();
+            SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+        }
+    }
+}
+
+void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence)
+{
+    Mutex::ScopedLock lock(agentLock);
+    string packageName;
+    SchemaClassKey key;
+
+    inBuffer.getShortString(packageName);
+    inBuffer.getShortString(key.name);
+    inBuffer.getBin128(key.hash);
+
+    PackageMap::iterator pIter = packages.find(packageName);
+    if (pIter != packages.end()) {   // unknown packages are ignored without a reply
+        ClassMap& cMap = pIter->second;   // look up in place; no copy of the class map
+        ClassMap::iterator cIter = cMap.find(key);
+        if (cIter != cMap.end()) {   // unknown classes are ignored without a reply
+            SchemaClass& schema = cIter->second;
+            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
+            uint32_t outLen;
+
+            EncodeHeader(outBuffer, 's', sequence);   // echo the request's sequence number
+            schema.writeSchemaCall(outBuffer);        // callback registered through RegisterClass()
+            outLen = MA_BUFFER_SIZE - outBuffer.available ();
+            outBuffer.reset();
+            SendBuffer(outBuffer, outLen, "qpid.management", "agent");
+        }
+    }
+}
+
+void ManagementAgentImpl::handleConsoleAddedIndication()
+{
+    Mutex::ScopedLock lock(agentLock);
+    clientWasAdded = true;   // makes the next PeriodicProcessing() call setAllChanged() on every object
+}
+
+void ManagementAgentImpl::received (Message& msg)
+{
+    string   payload = msg.getData ();   // private copy that the decoder reads from
+    Buffer   decoder (const_cast<char*>(payload.c_str()), payload.size());
+    uint8_t  op;
+    uint32_t seq;
+    if (!CheckHeader (decoder, &op, &seq))
+        return;   // header rejected: nothing is dispatched
+    switch (op) {   // opcodes other than the three below are ignored
+      case 'a': handleAttachResponse(decoder);     break;
+      case 'S': handleSchemaRequest(decoder, seq); break;
+      case 'x': handleConsoleAddedIndication();    break;
+    }
+}
+
+void ManagementAgentImpl::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
+{
+    buf.putOctet ('A');      // three magic octets "AM1", checked by CheckHeader()
+    buf.putOctet ('M');
+    buf.putOctet ('1');
+    buf.putOctet (opcode);   // one-octet message type
+    buf.putLong  (seq);      // 32-bit sequence number
+}
+
+bool ManagementAgentImpl::CheckHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
+{
+    if (buf.getSize() < 8)   // header written by EncodeHeader(): 3 magic octets + opcode + 32-bit sequence
+        return false;        // *opcode and *seq are left untouched in this case
+
+    uint8_t h1 = buf.getOctet ();
+    uint8_t h2 = buf.getOctet ();
+    uint8_t h3 = buf.getOctet ();
+
+    *opcode = buf.getOctet ();
+    *seq    = buf.getLong ();
+
+    return h1 == 'A' && h2 == 'M' && h3 == '1';   // outputs are written even when the magic does not match
+}
+
+void ManagementAgentImpl::SendBuffer (Buffer&  buf,
+                                      uint32_t length,
+                                      string   exchange,
+                                      string   routingKey)
+{
+    Message msg;
+    string  data;
+
+    if (objIdPrefix == 0)   // still the constructor's initial value, or 0 was assigned in handleAttachResponse()
+        return;             // the message is dropped silently
+
+    buf.getRawData(data, length);   // copy `length` bytes of encoded content into the message body
+    msg.getDeliveryProperties().setRoutingKey(routingKey);
+    msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
+    msg.setData (data);
+    session.messageTransfer (arg::content=msg, arg::destination=exchange);
+}
+
+ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::FindOrAddPackage (std::string name)
+{
+    PackageMap::iterator pIter = packages.find (name);
+    if (pIter != packages.end ())
+        return pIter;   // already known: no indication is published
+
+    // No such package found, create a new map entry.
+    std::pair<PackageMap::iterator, bool> result =
+        packages.insert (std::pair<string, ClassMap> (name, ClassMap ()));
+
+    // Publish a package-indication message
+    Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+    uint32_t outLen;
+
+    EncodeHeader (outBuffer, 'p');
+    EncodePackageIndication (outBuffer, result.first);
+    outLen = MA_BUFFER_SIZE - outBuffer.available ();   // number of bytes encoded
+    outBuffer.reset ();
+    SendBuffer (outBuffer, outLen, "qpid.management", "mgmt.schema.package");   // dropped by SendBuffer() while objIdPrefix is 0
+
+    return result.first;
+}
+
+void ManagementAgentImpl::moveNewObjectsLH()
+{
+    // Drain the staging map filled by addObject() into the live object map.
+    Mutex::ScopedLock guard (addLock);
+    ManagementObjectMap::iterator pending = newManagementObjects.begin ();
+    for (; pending != newManagementObjects.end (); pending++)
+        managementObjects[pending->first] = pending->second;
+    newManagementObjects.clear();
+}
+
+void ManagementAgentImpl::AddClassLocal (PackageMap::iterator  pIter,
+                                         string                className,
+                                         uint8_t*              md5Sum,
+                                         management::ManagementObject::writeSchemaCall_t schemaCall)
+{
+    SchemaClassKey key;
+    ClassMap&      cMap = pIter->second;   // reference: the insertion below lands in the package's own map
+
+    key.name = className;
+    memcpy (&key.hash, md5Sum, 16);   // md5Sum must point to at least 16 readable bytes
+
+    ClassMap::iterator cIter = cMap.find (key);
+    if (cIter != cMap.end ())
+        return;   // same name and hash already registered
+
+    // No such class found, create a new class with local information.
+    SchemaClass classInfo;
+
+    classInfo.writeSchemaCall = schemaCall;   // invoked later by handleSchemaRequest()
+    cMap[key] = classInfo;
+
+    // TODO: Publish a class-indication message
+}
+
+void ManagementAgentImpl::EncodePackageIndication (Buffer&              buf,
+                                                   PackageMap::iterator pIter)
+{
+    buf.putShortString ((*pIter).first);   // the map key is the package name
+}
+
+void ManagementAgentImpl::EncodeClassIndication (Buffer&              buf,
+                                                 PackageMap::iterator pIter,
+                                                 ClassMap::iterator   cIter)
+{
+    // Wire order: package name, class name, 128-bit schema hash.
+    SchemaClassKey classKey = cIter->first;
+    buf.putShortString (pIter->first);
+    buf.putShortString (classKey.name);
+    buf.putBin128      (classKey.hash);
+}
+
+void ManagementAgentImpl::PeriodicProcessing()
+{
+#define BUFSIZE   65536
+    Mutex::ScopedLock lock(agentLock);
+    char                msgChars[BUFSIZE];   // scratch encode buffer, reused for every message below
+    uint32_t            contentSize;
+    string              routingKey;
+    std::list<uint64_t> deleteList;          // ids of objects found deleted during this pass
+
+    {
+        Buffer msgBuffer(msgChars, BUFSIZE);
+        EncodeHeader(msgBuffer, 'h');   // 'h' message, sent with the ".heartbeat" routing key
+        msgBuffer.putLongLong(uint64_t(Duration(now())));
+
+        contentSize = BUFSIZE - msgBuffer.available ();
+        msgBuffer.reset ();
+        routingKey = "mgmt." + systemId.str() + ".heartbeat";
+        SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+    }
+
+    moveNewObjectsLH();   // takes addLock while agentLock is already held
+
+    if (clientWasAdded)   // set by the constructor and by handleConsoleAddedIndication()
+    {
+        clientWasAdded = false;
+        for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+             iter != managementObjects.end ();
+             iter++)
+        {
+            ManagementObject* object = iter->second;
+            object->setAllChanged ();
+        }
+    }
+
+    if (managementObjects.empty ())
+        return;
+        
+    for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+         iter != managementObjects.end ();
+         iter++)
+    {
+        ManagementObject* object = iter->second;
+
+        if (object->getConfigChanged () || object->isDeleted ())
+        {
+            Buffer msgBuffer (msgChars, BUFSIZE);
+            EncodeHeader (msgBuffer, 'c');   // 'c' message: output of writeProperties()
+            object->writeProperties(msgBuffer);
+
+            contentSize = BUFSIZE - msgBuffer.available ();
+            msgBuffer.reset ();
+            routingKey = "mgmt." + systemId.str() + ".prop." + object->getClassName ();
+            SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+        }
+        
+        if (object->getInstChanged ())
+        {
+            Buffer msgBuffer (msgChars, BUFSIZE);
+            EncodeHeader (msgBuffer, 'i');   // 'i' message: output of writeStatistics()
+            object->writeStatistics(msgBuffer);
+
+            contentSize = BUFSIZE - msgBuffer.available ();
+            msgBuffer.reset ();
+            routingKey = "mgmt." + systemId.str () + ".stat." + object->getClassName ();
+            SendBuffer (msgBuffer, contentSize, "qpid.management", routingKey);
+        }
+
+        if (object->isDeleted ())
+            deleteList.push_back (iter->first);   // erased after the loop so the map is not modified while iterating
+    }
+
+    // Delete flagged objects
+    for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+         iter != deleteList.rend ();
+         iter++)
+        managementObjects.erase (*iter);   // NOTE(review): only the map entry is removed; the object is not deleted here - confirm ownership
+
+    deleteList.clear ();
+}
+
+void ManagementAgentImpl::BackgroundThread::run()
+{
+    while (true) {   // no exit condition: the loop never returns
+        ::sleep(5);  // NOTE(review): fixed 5 s period; the interval stored by init() is not consulted here
+        agent.PeriodicProcessing();
+    }
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=676067&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Fri Jul 11 13:14:07 2008
@@ -0,0 +1,157 @@
+#ifndef _qpid_agent_ManagementAgentImpl_
+#define _qpid_agent_ManagementAgentImpl_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "ManagementAgent.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Dispatcher.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/framing/Uuid.h"
+#include <iostream>
+#include <sstream>
+
+namespace qpid { 
+namespace management {
+
+class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
+{
+  public:
+    // Agent implementation: a ManagementAgent that is also a client::MessageListener.
+    ManagementAgentImpl ();
+    virtual ~ManagementAgentImpl ();
+
+    int getMaxThreads() { return 1; }  // this implementation always reports 1
+    void init(std::string brokerHost        = "localhost",
+              uint16_t    brokerPort        = 5672,
+              uint16_t    intervalSeconds   = 10,
+              bool        useExternalThread = false);
+    void RegisterClass(std::string packageName,
+                       std::string className,
+                       uint8_t*    md5Sum,
+                       management::ManagementObject::writeSchemaCall_t schemaCall);
+    uint64_t addObject     (management::ManagementObject* objectPtr,
+                            uint32_t          persistId   = 0,
+                            uint32_t          persistBank = 4);
+    uint32_t pollCallbacks (uint32_t callLimit = 0);
+    int      getSignalFd   (void);
+    // Called by BackgroundThread::run() after each five-second sleep.
+    void PeriodicProcessing();
+
+  private:
+    // Map key for a schema class: its name plus a 16-byte hash.
+    struct SchemaClassKey
+    {
+        std::string name;
+        uint8_t     hash[16];  // presumably the md5Sum handed to RegisterClass - TODO confirm
+    };
+    // Ordering for SchemaClassKey: by name first, then by the hash bytes from index 0 upward.
+    struct SchemaClassKeyComp
+    {
+        bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+        {
+            if (lhs.name != rhs.name)
+                return lhs.name < rhs.name;
+            else
+                for (int i = 0; i < 16; i++)
+                    if (lhs.hash[i] != rhs.hash[i])
+                        return lhs.hash[i] < rhs.hash[i];
+            return false;  // names and all 16 hash bytes are equal
+        }
+    };
+    // Per-class record; holds only the schema-writer callback, which starts out as 0.
+    struct SchemaClass
+    {
+        management::ManagementObject::writeSchemaCall_t writeSchemaCall;
+
+        SchemaClass () : writeSchemaCall(0) {}
+    };
+    // PackageMap: package name -> ClassMap; ClassMap: SchemaClassKey -> SchemaClass.
+    typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+    typedef std::map<std::string, ClassMap> PackageMap;
+
+    PackageMap                       packages;
+    management::ManagementObjectMap  managementObjects;
+    management::ManagementObjectMap  newManagementObjects;
+
+    void received (client::Message& msg);  // NOTE(review): presumably the client::MessageListener callback - confirm
+
+    uint16_t          interval;
+    bool              extThread;
+    uint64_t          nextObjectId;
+    sys::Mutex        agentLock;
+    sys::Mutex        addLock;
+    framing::Uuid     sessionId;
+    framing::Uuid     systemId;
+
+    int signalFdIn, signalFdOut;
+    client::Connection   connection;
+    client::Session      session;
+    client::Dispatcher*  dispatcher;
+    bool                 clientWasAdded;
+    uint64_t    objIdPrefix;
+    std::stringstream queueName;
+#   define MA_BUFFER_SIZE 65536
+    char outputBuffer[MA_BUFFER_SIZE];  // MA_BUFFER_SIZE is a preprocessor macro, so it is not scoped to this class
+    // Runnable whose run() loops forever: sleep five seconds, then agent.PeriodicProcessing().
+    class BackgroundThread : public sys::Runnable
+    {
+        ManagementAgentImpl& agent;
+        void run();
+    public:
+        BackgroundThread(ManagementAgentImpl& _agent) : agent(_agent) {}
+    };
+
+    BackgroundThread bgThread;  // declared before 'thread', so it is initialized first
+    sys::Thread      thread;    // NOTE(review): presumably the thread that runs bgThread - confirm in the constructor
+    // NOTE(review): the "LH" suffix below presumably means "lock held" - confirm.
+    PackageMap::iterator FindOrAddPackage (std::string name);
+    void moveNewObjectsLH();
+    void AddClassLocal (PackageMap::iterator  pIter,
+                        std::string           className,
+                        uint8_t*              md5Sum,
+                        management::ManagementObject::writeSchemaCall_t schemaCall);
+    void EncodePackageIndication (qpid::framing::Buffer& buf,
+                                  PackageMap::iterator   pIter);
+    void EncodeClassIndication (qpid::framing::Buffer& buf,
+                                PackageMap::iterator   pIter,
+                                ClassMap::iterator     cIter);
+    void EncodeHeader (qpid::framing::Buffer& buf, uint8_t  opcode, uint32_t  seq = 0);
+    bool CheckHeader  (qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+    void SendBuffer         (qpid::framing::Buffer&  buf,
+                             uint32_t                length,
+                             std::string             exchange,
+                             std::string             routingKey);
+    void handleAttachResponse (qpid::framing::Buffer& inBuffer);
+    void handlePackageRequest (qpid::framing::Buffer& inBuffer);
+    void handleClassQuery     (qpid::framing::Buffer& inBuffer);
+    void handleSchemaRequest  (qpid::framing::Buffer& inBuffer, uint32_t sequence);
+    void handleConsoleAddedIndication();
+};
+
+}}
+
+#endif  /*!_qpid_agent_ManagementAgentImpl_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Fri Jul 11 13:14:07 2008
@@ -55,6 +55,7 @@
     nextObjectId   = 1;
     bootSequence   = 1;
     nextRemoteBank = 10;
+    nextRequestSequence = 1;
     clientWasAdded = false;
 
     // Get from file or generate and save to file.
@@ -155,8 +156,8 @@
                                       ManagementObject::writeSchemaCall_t schemaCall)
 {
     Mutex::ScopedLock lock (userLock);
-    PackageMap::iterator pIter = FindOrAddPackage (packageName);
-    AddClassLocal (pIter, className, md5Sum, schemaCall);
+    PackageMap::iterator pIter = FindOrAddPackageLH(packageName);
+    AddClass(pIter, className, md5Sum, schemaCall);
 }
 
 uint64_t ManagementBroker::addObject (ManagementObject* object,
@@ -200,6 +201,17 @@
     Mutex::ScopedLock lock (userLock);
 
     clientWasAdded = true;
+    for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
+         aIter != remoteAgents.end();
+         aIter++) {
+        Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
+        uint32_t outLen;
+
+        EncodeHeader (outBuffer, 'x');
+        outLen = MA_BUFFER_SIZE - outBuffer.available ();
+        outBuffer.reset ();
+        SendBuffer (outBuffer, outLen, dExchange, aIter->second->routingKey);
+    }
 }
 
 void ManagementBroker::EncodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
@@ -512,8 +524,12 @@
     sendCommandComplete (replyToKey, sequence);
 }
 
-void ManagementBroker::handlePackageIndLH (Buffer& /*inBuffer*/, string /*replyToKey*/, uint32_t /*sequence*/)
+void ManagementBroker::handlePackageIndLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t /*sequence*/)
 {
+    std::string packageName;
+
+    inBuffer.getShortString(packageName);
+    FindOrAddPackageLH(packageName);
 }
 
 void ManagementBroker::handleClassQueryLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -529,7 +545,7 @@
              cIter != cMap.end ();
              cIter++)
         {
-            if (cIter->second.hasSchema ())
+            if (cIter->second->hasSchema ())
             {
                 Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
                 uint32_t outLen;
@@ -546,16 +562,46 @@
     sendCommandComplete (replyToKey, sequence);
 }
 
-void ManagementBroker::SchemaClass::appendSchema (Buffer& buf)
+void ManagementBroker::handleClassIndLH (Buffer& inBuffer, string replyToKey, uint32_t)
+{
+    // For an indicated class that is not yet in the class map, send a schema request to replyToKey.
+    std::string    pkgName;
+    SchemaClassKey classKey;
+    inBuffer.getShortString(pkgName);
+    inBuffer.getShortString(classKey.name);
+    inBuffer.getBin128(classKey.hash);
+
+    PackageMap::iterator pkgPos  = FindOrAddPackageLH(pkgName);
+    ClassMap&            classes = pkgPos->second;
+    if (classes.find(classKey) != classes.end())
+        return;  // class already recorded, nothing to request
+
+    // Encode the request (opcode 'S') under a fresh sequence number and send it.
+    const uint32_t requestSeq = nextRequestSequence++;
+    Buffer         request (outputBuffer, MA_BUFFER_SIZE);
+    EncodeHeader (request, 'S', requestSeq);
+    request.putShortString(pkgName);
+    request.putShortString(classKey.name);
+    request.putBin128(classKey.hash);
+    const uint32_t requestLen = MA_BUFFER_SIZE - request.available ();
+    request.reset ();
+    SendBuffer (request, requestLen, dExchange, replyToKey);
+
+    SchemaClass* placeholder = new SchemaClass;
+    placeholder->pendingSequence = requestSeq;  // the schema response is matched on this value
+    classes[classKey] = placeholder;
+}
+
+void ManagementBroker::SchemaClass::appendSchema(Buffer& buf)
 {
     // If the management package is attached locally (embedded in the broker or
     // linked in via plug-in), call the schema handler directly.  If the package
     // is from a remote management agent, send the stored schema information.
 
     if (writeSchemaCall != 0)
-        writeSchemaCall (buf);
+        writeSchemaCall(buf);
     else
-        buf.putRawData (buffer, bufferLen);
+        buf.putRawData(buffer, bufferLen);
 }
 
 void ManagementBroker::handleSchemaRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence)
@@ -568,22 +614,19 @@
     inBuffer.getBin128      (key.hash);
 
     PackageMap::iterator pIter = packages.find (packageName);
-    if (pIter != packages.end ())
-    {
+    if (pIter != packages.end()) {
         ClassMap cMap = pIter->second;
         ClassMap::iterator cIter = cMap.find (key);
-        if (cIter != cMap.end ())
-        {
+        if (cIter != cMap.end()) {
             Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
-            SchemaClass classInfo = cIter->second;
+            SchemaClass* classInfo = cIter->second;
 
-            if (classInfo.hasSchema())
-            {
-                EncodeHeader (outBuffer, 's', sequence);
-                classInfo.appendSchema (outBuffer);
+            if (classInfo->hasSchema()) {
+                EncodeHeader(outBuffer, 's', sequence);
+                classInfo->appendSchema (outBuffer);
                 outLen = MA_BUFFER_SIZE - outBuffer.available ();
-                outBuffer.reset ();
+                outBuffer.reset();
                 SendBuffer (outBuffer, outLen, dExchange, replyToKey);
             }
             else
@@ -596,6 +639,44 @@
         sendCommandComplete (replyToKey, sequence, 1, "Package not found");
 }
 
+void ManagementBroker::handleSchemaResponseLH (Buffer& inBuffer, string /*replyToKey*/, uint32_t sequence)
+{
+    // Stores the schema answering one of our pending requests, then publishes a class indication.
+    string         packageName;
+    SchemaClassKey key;
+    inBuffer.record();                      // peek at the class key, then rewind so that
+    inBuffer.getShortString (packageName);  // ValidateSchema starts at the same position
+    inBuffer.getShortString (key.name);
+    inBuffer.getBin128      (key.hash);
+    inBuffer.restore();
+
+    PackageMap::iterator pIter = packages.find(packageName);
+    if (pIter == packages.end()) return;
+
+    // Bind by reference: erasing from a copy of the map would leave 'packages' untouched.
+    ClassMap&          cMap  = pIter->second;
+    ClassMap::iterator cIter = cMap.find(key);
+    if (cIter == cMap.end() || cIter->second->pendingSequence != sequence || cIter->second->hasSchema())
+        return;  // unknown class, unexpected sequence, or a schema is already held
+
+    size_t   length     = ValidateSchema(inBuffer);
+    uint8_t* schemaData = (length == 0) ? 0 : (uint8_t*) malloc(length);
+    if (schemaData == 0) {  // rejected schema or failed allocation: free and drop the entry
+        delete cIter->second;
+        cMap.erase(cIter);
+        return;
+    }
+    inBuffer.getRawData(schemaData, length);
+    cIter->second->buffer    = schemaData;
+    cIter->second->bufferLen = length;
+    Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);  // publish a class-indication message
+    EncodeHeader (outBuffer, 'q');
+    EncodeClassIndication (outBuffer, pIter, cIter);
+    uint32_t outLen = MA_BUFFER_SIZE - outBuffer.available ();
+    outBuffer.reset ();
+    SendBuffer (outBuffer, outLen, mExchange, "mgmt." + uuid.str() + ".schema");
+}
+
 bool ManagementBroker::bankInUse (uint32_t bank)
 {
     for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
@@ -628,16 +709,16 @@
     string   label;
     uint32_t requestedBank;
     uint32_t assignedBank;
-    Uuid     sessionId;
+    string   sessionName;
     Uuid     systemId;
 
     inBuffer.getShortString (label);
-    sessionId.decode (inBuffer);
+    inBuffer.getShortString (sessionName);
     systemId.decode  (inBuffer);
     requestedBank = inBuffer.getLong ();
     assignedBank  = assignBankLH (requestedBank);
 
-    RemoteAgentMap::iterator aIter = remoteAgents.find (sessionId);
+    RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName);
     if (aIter != remoteAgents.end())
     {
         // There already exists an agent on this session.  Reject the request.
@@ -645,17 +726,21 @@
         return;
     }
 
+    // TODO: Reject requests for which the session name does not match an existing session.
+
     RemoteAgent* agent = new RemoteAgent;
     agent->objIdBank  = assignedBank;
+    agent->routingKey = replyToKey;
     agent->mgmtObject = new management::Agent (this, agent);
-    agent->mgmtObject->set_sessionId    (sessionId);
+    agent->mgmtObject->set_sessionName  (sessionName);
     agent->mgmtObject->set_label        (label);
     agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
     agent->mgmtObject->set_systemId     (systemId);
     addObject (agent->mgmtObject);
 
-    remoteAgents[sessionId] = agent;
+    remoteAgents[sessionName] = agent;
 
+    // Send an Attach Response
     Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
     uint32_t outLen;
 
@@ -734,16 +819,18 @@
     if (!CheckHeader (inBuffer, &opcode, &sequence))
         return;
 
-    if      (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence);
-    else if (opcode == 'P') handlePackageQueryLH  (inBuffer, replyToKey, sequence);
-    else if (opcode == 'p') handlePackageIndLH    (inBuffer, replyToKey, sequence);
-    else if (opcode == 'Q') handleClassQueryLH    (inBuffer, replyToKey, sequence);
-    else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence);
-  //else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence);
-    else if (opcode == 'G') handleGetQueryLH      (inBuffer, replyToKey, sequence);
+    if      (opcode == 'B') handleBrokerRequestLH  (inBuffer, replyToKey, sequence);
+    else if (opcode == 'P') handlePackageQueryLH   (inBuffer, replyToKey, sequence);
+    else if (opcode == 'p') handlePackageIndLH     (inBuffer, replyToKey, sequence);
+    else if (opcode == 'Q') handleClassQueryLH     (inBuffer, replyToKey, sequence);
+    else if (opcode == 'q') handleClassIndLH       (inBuffer, replyToKey, sequence);
+    else if (opcode == 'S') handleSchemaRequestLH  (inBuffer, replyToKey, sequence);
+    else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence);
+    else if (opcode == 'A') handleAttachRequestLH  (inBuffer, replyToKey, sequence);
+    else if (opcode == 'G') handleGetQueryLH       (inBuffer, replyToKey, sequence);
 }
 
-ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackage (std::string name)
+ManagementBroker::PackageMap::iterator ManagementBroker::FindOrAddPackageLH(std::string name)
 {
     PackageMap::iterator pIter = packages.find (name);
     if (pIter != packages.end ())
@@ -767,10 +854,10 @@
     return result.first;
 }
 
-void ManagementBroker::AddClassLocal (PackageMap::iterator  pIter,
-                                      string                className,
-                                      uint8_t*              md5Sum,
-                                      ManagementObject::writeSchemaCall_t schemaCall)
+void ManagementBroker::AddClass(PackageMap::iterator  pIter,
+                                string                className,
+                                uint8_t*              md5Sum,
+                                ManagementObject::writeSchemaCall_t schemaCall)
 {
     SchemaClassKey key;
     ClassMap&      cMap = pIter->second;
@@ -785,12 +872,11 @@
     // No such class found, create a new class with local information.
     QPID_LOG (debug, "ManagementBroker added class " << pIter->first << "." <<
               key.name);
-    SchemaClass classInfo;
+    SchemaClass* classInfo = new SchemaClass;
 
-    classInfo.writeSchemaCall = schemaCall;
+    classInfo->writeSchemaCall = schemaCall;
     cMap[key] = classInfo;
-
-    // TODO: Publish a class-indication message
+    cIter     = cMap.find (key);
 }
 
 void ManagementBroker::EncodePackageIndication (Buffer&              buf,
@@ -810,3 +896,42 @@
     buf.putBin128      (key.hash);
 }
 
+size_t ManagementBroker::ValidateSchema(Buffer& inBuffer)
+{
+    // Decodes one encoded schema to measure it.  Returns its length in bytes, or 0 when
+    // its event count is nonzero.  The read position is put back on both paths.
+    uint32_t start = inBuffer.getPosition();
+    string   text;
+    uint8_t  hash[16];
+
+    inBuffer.record();
+    inBuffer.getShortString(text);  // package name, value not needed
+    inBuffer.getShortString(text);  // class name, value not needed
+    inBuffer.getBin128(hash);
+
+    uint16_t propCount = inBuffer.getShort();
+    uint16_t statCount = inBuffer.getShort();
+    uint16_t methCount = inBuffer.getShort();
+    uint16_t evntCount = inBuffer.getShort();
+
+    // 32-bit bound and index: the sum can exceed 65535, which a uint16_t index never reaches.
+    uint32_t fieldCount = uint32_t(propCount) + uint32_t(statCount);
+    for (uint32_t idx = 0; idx < fieldCount; idx++) {
+        FieldTable ft;
+        ft.decode(inBuffer);
+    }
+
+    for (uint16_t idx = 0; idx < methCount; idx++) {
+        FieldTable ft;
+        ft.decode(inBuffer);
+        int argCount = ft.getInt("argCount");
+        for (int mIdx = 0; mIdx < argCount; mIdx++) {
+            FieldTable aft;
+            aft.decode(inBuffer);
+        }
+    }
+
+    uint32_t end = inBuffer.getPosition();
+    inBuffer.restore(); // back to the recorded start, whether or not the schema is accepted
+    return (evntCount != 0) ? 0 : end - start;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Fri Jul 11 13:14:07 2008
@@ -89,6 +89,7 @@
     struct RemoteAgent : public Manageable
     {
         uint32_t          objIdBank;
+        std::string       routingKey;
         Agent*            mgmtObject;
         ManagementObject* GetManagementObject (void) const { return mgmtObject; }
         virtual ~RemoteAgent ();
@@ -97,8 +98,8 @@
     // TODO: Eventually replace string with entire reply-to structure.  reply-to
     //       currently assumes that the exchange is "amq.direct" even though it could
     //       in theory be specified differently.
-    typedef std::map<framing::Uuid, RemoteAgent*> RemoteAgentMap;
-    typedef std::vector<std::string>              ReplyToVector;
+    typedef std::map<std::string, RemoteAgent*> RemoteAgentMap;
+    typedef std::vector<std::string>            ReplyToVector;
 
     //  Storage for known schema classes:
     //
@@ -129,16 +130,16 @@
     struct SchemaClass
     {
         ManagementObject::writeSchemaCall_t writeSchemaCall;
-        ReplyToVector                       remoteAgents;
-        size_t                              bufferLen;
-        uint8_t*                            buffer;
+        uint32_t pendingSequence;
+        size_t   bufferLen;
+        uint8_t* buffer;
 
-        SchemaClass () : writeSchemaCall(0), bufferLen(0), buffer(0) {}
+        SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
         bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
         void appendSchema (framing::Buffer& buf);
     };
 
-    typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+    typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
     typedef std::map<std::string, ClassMap> PackageMap;
 
     RemoteAgentMap               remoteAgents;
@@ -162,6 +163,7 @@
     uint32_t                     localBank;
     uint32_t                     nextObjectId;
     uint32_t                     nextRemoteBank;
+    uint32_t                     nextRequestSequence;
     bool                         clientWasAdded;
 
 #   define MA_BUFFER_SIZE 65536
@@ -183,11 +185,11 @@
                            size_t             first);
     void dispatchAgentCommandLH (broker::Message& msg);
 
-    PackageMap::iterator FindOrAddPackage (std::string name);
-    void AddClassLocal (PackageMap::iterator         pIter,
-                        std::string                  className,
-                        uint8_t*                     md5Sum,
-                        ManagementObject::writeSchemaCall_t schemaCall);
+    PackageMap::iterator FindOrAddPackageLH(std::string name);
+    void AddClass(PackageMap::iterator         pIter,
+                  std::string                  className,
+                  uint8_t*                     md5Sum,
+                  ManagementObject::writeSchemaCall_t schemaCall);
     void EncodePackageIndication (framing::Buffer&     buf,
                                   PackageMap::iterator pIter);
     void EncodeClassIndication (framing::Buffer&     buf,
@@ -198,13 +200,17 @@
     uint32_t assignBankLH (uint32_t requestedPrefix);
     void sendCommandComplete (std::string replyToKey, uint32_t sequence,
                               uint32_t code = 0, std::string text = std::string("OK"));
-    void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handlePackageQueryLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handlePackageIndLH    (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handleClassQueryLH    (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
-    void handleGetQueryLH      (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleBrokerRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handlePackageQueryLH   (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handlePackageIndLH     (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleClassQueryLH     (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleClassIndLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleSchemaRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleAttachRequestLH  (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+    void handleGetQueryLH       (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+
+    size_t ValidateSchema(framing::Buffer&);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/managementdata.py?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/managementdata.py Fri Jul 11 13:14:07 2008
@@ -71,24 +71,22 @@
   #
 
   def registerObjId (self, objId):
-    boot = objId & 0x7FFF000000000000L
-    if boot == 0:
-      return
-    self.bootSequence = boot
+    if not objId in self.idBackMap:
+      self.idBackMap[objId]   = self.nextId
+      self.idMap[self.nextId] = objId
+      self.nextId += 1
 
   def displayObjId (self, objId):
-    bank = (objId & 0x0000FFFFFF000000L) >> 24
-    id   =  objId & 0x0000000000FFFFFFL
-    return bank * 10000 + id
+    if objId in self.idBackMap:
+      return self.idBackMap[objId]
+    else:
+      return 0
 
   def rawObjId (self, displayId):
-    bank  = displayId / 10000
-    id    = displayId % 10000
-    if bank < 5:
-      objId = (bank << 24) + id
+    if displayId in self.idMap:
+      return self.idMap[displayId]
     else:
-      objId = self.bootSequence + (bank << 24) + id
-    return objId
+      return 0
 
   def displayClassName (self, cls):
     (packageName, className, hash) = cls
@@ -201,6 +199,9 @@
     self.mclient.schemaListener (self.schemaHandler)
     self.mch = self.mclient.addChannel (self.conn.session(self.sessionId))
     self.operational = True
+    self.idMap = {}
+    self.idBackMap = {}
+    self.nextId = 101
 
   def close (self):
     pass

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=676067&r1=676066&r2=676067&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Fri Jul 11 13:14:07 2008
@@ -102,7 +102,7 @@
   ===============================================================
   -->
   <class name="Agent">
-    <property name="sessionId"    type="uuid"  access="RO" index="y" desc="Session ID for Agent"/>
+    <property name="sessionName"  type="sstr"  access="RO" index="y" desc="Session ID for Agent"/>
     <property name="label"        type="sstr"  access="RO"           desc="Label for agent"/>
     <property name="registeredTo" type="objId" references="Broker" access="RO" desc="Broker agent is registered to"/>
     <property name="systemId"     type="uuid"  access="RO"           desc="Identifier of system where agent resides"/>