You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/12/10 21:22:29 UTC

svn commit: r603034 - in /incubator/qpid/trunk/qpid: cpp/managementgen/templates/ cpp/src/qpid/broker/ cpp/src/qpid/management/ python/mgmt-cli/ python/qpid/ specs/

Author: aconway
Date: Mon Dec 10 12:22:23 2007
New Revision: 603034

URL: http://svn.apache.org/viewvc?rev=603034&view=rev
Log:
Patches from Ted Ross <tr...@redhat.com>

QPID-697

Fixed access-rights constants for management schema.
Added mutex to fix problems associated with concurrent invocation of accessors for queue statistics.
Removed queue schema content that is not relevant to QPID. 

QPID-698

This patch creates a new subdirectory in python called "mgmt-cli".
python/mgmt-cli/main.py can be executed from the shell. If no arguments are supplied, it attempts to connect to the broker at localhost:5672. The first argument is the hostname for the target broker and the second (optional) argument is the TCP port (defaults to 5672).
It is assumed that the AMQP spec file is in the following location:
        /usr/share/amqp/amqp.0-10-preview.xml
It is also required that the qpid/python directory be in the PYTHONPATH environment variable.

Added:
    incubator/qpid/trunk/qpid/python/mgmt-cli/
    incubator/qpid/trunk/qpid/python/mgmt-cli/disp.py   (with props)
    incubator/qpid/trunk/qpid/python/mgmt-cli/main.py   (with props)
    incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py   (with props)
    incubator/qpid/trunk/qpid/python/qpid/management.py.rej
Modified:
    incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
    incubator/qpid/trunk/qpid/python/qpid/management.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h?rev=603034&r1=603033&r2=603034&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h (original)
+++ incubator/qpid/trunk/qpid/cpp/managementgen/templates/Class.h Mon Dec 10 12:22:23 2007
@@ -22,6 +22,7 @@
 
 /*MGEN:Root.Disclaimer*/
 
+#include "qpid/sys/Mutex.h"
 #include "qpid/management/ManagementObject.h"
 
 namespace qpid { 
@@ -52,6 +53,7 @@
   public:
 
     typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
+    qpid::sys::Mutex accessorLock;
 
     /*MGEN:Class.NameCap*/ (Manageable* coreObject, Manageable* parentObject,
         /*MGEN:Class.ConstructorArgs*/);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=603034&r1=603033&r2=603034&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Dec 10 12:22:23 2007
@@ -97,6 +97,7 @@
             push(msg);
             msg->enqueueComplete();
             if (mgmtObject.get() != 0) {
+                Mutex::ScopedLock alock(mgmtObject->accessorLock);
                 mgmtObject->inc_msgTotalEnqueues ();
                 mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
                 mgmtObject->inc_msgDepth ();
@@ -104,6 +105,7 @@
             }
         }else {
             if (mgmtObject.get() != 0) {
+                Mutex::ScopedLock alock(mgmtObject->accessorLock);
                 mgmtObject->inc_msgTotalEnqueues ();
                 mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
                 mgmtObject->inc_msgDepth ();
@@ -122,6 +124,7 @@
     push(msg);
     msg->enqueueComplete(); // mark the message as enqueued
     if (mgmtObject.get() != 0) {
+        Mutex::ScopedLock alock(mgmtObject->accessorLock);
         mgmtObject->inc_msgTotalEnqueues ();
         mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
         mgmtObject->inc_msgPersistEnqueues ();
@@ -140,6 +143,7 @@
 void Queue::process(intrusive_ptr<Message>& msg){
     push(msg);
     if (mgmtObject.get() != 0) {
+        Mutex::ScopedLock alock(mgmtObject->accessorLock);
         mgmtObject->inc_msgTotalEnqueues ();
         mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
         mgmtObject->inc_msgTxnEnqueues ();
@@ -323,6 +327,7 @@
     consumerCount++;
 
     if (mgmtObject.get() != 0){
+        Mutex::ScopedLock alock(mgmtObject->accessorLock);
         mgmtObject->inc_consumers ();
     }
 }
@@ -333,6 +338,7 @@
     consumerCount--;
     if(exclusive) exclusive = false;
     if (mgmtObject.get() != 0){
+        Mutex::ScopedLock alock(mgmtObject->accessorLock);
         mgmtObject->dec_consumers ();
     }
 }
@@ -363,6 +369,7 @@
 
     if (policy.get()) policy->dequeued(msg.payload->contentSize());
     if (mgmtObject.get() != 0){
+        Mutex::ScopedLock alock(mgmtObject->accessorLock);
         mgmtObject->inc_msgTotalDequeues  ();
         mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
         mgmtObject->dec_msgDepth ();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h?rev=603034&r1=603033&r2=603034&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementObject.h Mon Dec 10 12:22:23 2007
@@ -54,8 +54,8 @@
     static const uint8_t TYPE_LSTR = 7;
 
     static const uint8_t ACCESS_RC = 1;
-    static const uint8_t ACCESS_RW = 1;
-    static const uint8_t ACCESS_RO = 1;
+    static const uint8_t ACCESS_RW = 2;
+    static const uint8_t ACCESS_RO = 3;
 
     static const uint8_t DIR_I     = 1;
     static const uint8_t DIR_O     = 2;

Added: incubator/qpid/trunk/qpid/python/mgmt-cli/disp.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/disp.py?rev=603034&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/disp.py (added)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/disp.py Mon Dec 10 12:22:23 2007
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import strftime, gmtime
+
+class Display:
+  """ Output formatting helpers for the QPID Management CLI """
+
+  def __init__ (self):
+    # Defaults; do_setTimeFormat () is the only method that changes one.
+    self.timestampFormat = "%X"    # strftime format used by timestamp ()
+    self.tablePrefix     = "    "  # left margin of every table line
+    self.tableSpacing    = 2       # blank columns added after each column
+
+  def table (self, title, heads, rows):
+    """ Print title, then heads and rows as a table with autosized columns.
+        Cells are converted with str ().  Only the title is printed when
+        rows is empty. """
+    print title
+    if len (rows) == 0:
+      return
+
+    # A column is as wide as its widest entry (heading included) plus the
+    # inter-column spacing.
+    colWidth = []
+    for idx, head in enumerate (heads):
+      widest = len (head)
+      for row in rows:
+        widest = max (widest, len (str (row[idx])))
+      colWidth.append (widest + self.tableSpacing)
+
+    # Heading line, then a rule of '=' spanning every column.
+    headLine = self.tablePrefix
+    for idx, head in enumerate (heads):
+      headLine = headLine + head.ljust (colWidth[idx])
+    print headLine
+    print self.tablePrefix + "=" * sum (colWidth)
+
+    # One line per row; each cell is left-justified within its column.
+    for row in rows:
+      cells = []
+      for idx, width in enumerate (colWidth):
+        cells.append (str (row[idx]).ljust (width))
+      print self.tablePrefix + "".join (cells)
+
+  def do_setTimeFormat (self, fmt):
+    """ Select the timestamp format: "short" or "long".  Any other value
+        leaves the current format unchanged. """
+    if fmt == "short":
+      self.timestampFormat = "%X"
+    elif fmt == "long":
+      self.timestampFormat = "%c"
+
+  def timestamp (self, nsec):
+    """ Format a nanoseconds-since-the-epoch timestamp for printing.
+        The time is broken down with gmtime, i.e. shown as UTC. """
+    # Convert to seconds, which is what gmtime expects
+    seconds = nsec / 1000000000
+    return strftime (self.timestampFormat, gmtime (seconds))

Propchange: incubator/qpid/trunk/qpid/python/mgmt-cli/disp.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/mgmt-cli/main.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/main.py?rev=603034&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/main.py (added)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/main.py Mon Dec 10 12:22:23 2007
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import socket
+from cmd            import Cmd
+from managementdata import ManagementData
+from shlex          import split
+from disp           import Display
+from qpid.peer      import Closed
+
+class Mcli (Cmd):
+  """ Management Command Interpreter """
+  prompt = "qpid: "
+
+  def __init__ (self, dataObject, dispObject):
+    Cmd.__init__ (self)
+    self.dataObject = dataObject  # serves list/schema/call and completions
+    self.dispObject = dispObject  # receives 'set time-format' changes
+
+  def emptyline (self):
+    """ Ignore empty input (the Cmd default repeats the last command) """
+    pass
+
+  def do_help (self, data):
+    """ Print the command summary; the argument is ignored """
+    print "Management Tool for QPID"
+    print
+    print "Commands:"
+    print "    list                            - Print summary of existing objects by class"
+    print "    list <className>                - Print list of objects of the specified class"
+    print "    list <className> all            - Print contents of all objects of specified class"
+    print "    list <className> <list-of-IDs>  - Print contents of one or more objects"
+    print "        list is space-separated, ranges may be specified (i.e. 1004-1010)"
+    print "    call <ID> <methodName> [<args>] - Invoke a method on an object"
+    print "    schema                          - Print summary of object classes seen on the target"
+    print "    schema [className]              - Print details of an object class"
+    print "    set time-format short           - Select short timestamp format (default)"
+    print "    set time-format long            - Select long timestamp format"
+    print "    quit or ^D                      - Exit the program"
+    print
+
+  def complete_set (self, text, line, begidx, endidx):
+    """ Command completion for the 'set' command """
+    tokens = split (line)
+    if len (tokens) < 2:
+      return ["time-format "]
+    if tokens[1] == "time-format":
+      if len (tokens) == 2:
+        return ["long", "short"]
+      if len (tokens) == 3:
+        return [c for c in ("long", "short") if c.find (text) == 0][:1]
+    elif "time-format".find (text) == 0:
+      return ["time-format "]
+    return []
+
+  def do_set (self, data):
+    """ Handle 'set time-format short|long'; bad input prints a usage hint """
+    try:
+      tokens = split (data)  # shlex raises ValueError on unbalanced quotes
+      if tokens[0] == "time-format":
+        self.dispObject.do_setTimeFormat (tokens[1])
+    except (ValueError, IndexError):
+      print "Usage: set time-format short|long"
+
+  def complete_schema (self, text, line, begidx, endidx):  # class-name completion
+    tokens = split (line)
+    if len (tokens) > 2:
+      return []
+    return self.dataObject.classCompletions (text)
+
+  def do_schema (self, data):  # 'schema' is delegated to the data object
+    self.dataObject.do_schema (data)
+
+  def complete_list (self, text, line, begidx, endidx):  # class-name completion
+    tokens = split (line)
+    if len (tokens) > 2:
+      return []
+    return self.dataObject.classCompletions (text)
+
+  def do_list (self, data):  # 'list' is delegated to the data object
+    self.dataObject.do_list (data)
+
+  def do_call (self, data):  # 'call' is delegated to the data object
+    self.dataObject.do_call (data)
+
+  def do_EOF (self, data):  # ^D: echo "quit" and stop the command loop
+    print "quit"
+    return True
+
+  def do_quit (self, data):  # a true return value stops the command loop
+    return True
+
+  def postcmd (self, stop, line):  # pass the command's stop flag through unchanged
+    return stop
+
+  def postloop (self):  # runs once when the command loop ends
+    print "Exiting..."
+    self.dataObject.close ()
+
+def Usage ():  # print the command-line synopsis and exit with status 1
+  print sys.argv[0], "[-s <amqp-spec-file>] [<target-host> [<tcp-port>]]"
+  print
+  sys.exit (1)
+
+#=========================================================
+# Main Program
+#=========================================================
+
+# Get spec file, host name and port if specified on the command line
+try:
+  (optlist, cargs) = getopt.getopt (sys.argv[1:], 's:')
+except getopt.GetoptError:
+  Usage ()
+
+specpath = "/usr/share/amqp/amqp.0-10-preview.xml"
+host     = "localhost"
+port     = 5672
+
+for (opt, value) in optlist:  # getopt yields (option, value) pairs, e.g. ('-s', path)
+  if opt == "-s":
+    specpath = value
+
+if len (cargs) > 0:
+  host = cargs[0]
+if len (cargs) > 1:
+  port = int (cargs[1])
+
+print ("Management Tool for QPID")
+disp = Display ()
+
+# Attempt to make a connection to the target broker
+try:
+  data = ManagementData (disp, host, port, spec=specpath)
+except socket.error, e:
+  sys.exit ("Unable to connect to %s:%d: %s" % (host, port, e))  # to stderr, status 1
+except Closed, e:
+  if str(e).find ("Exchange not found") != -1:
+    print "Management not enabled on broker:  Use '-m yes' option on broker startup."
+  sys.exit (0)
+
+# Instantiate the CLI interpreter and launch it.
+cli = Mcli (data, disp)
+cli.cmdloop ()

Propchange: incubator/qpid/trunk/qpid/python/mgmt-cli/main.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py?rev=603034&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py (added)
+++ incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py Mon Dec 10 12:22:23 2007
@@ -0,0 +1,417 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.management import ManagedBroker
+from threading       import Lock
+from disp            import Display
+from shlex           import split
+
+class ManagementData:
+
+  #
+  # Data Structure:
+  #
+  # Please note that this data structure holds only the most recent
+  # configuration and instrumentation data for each object.  It does
+  # not hold the detailed historical data that is sent from the broker.
+  # The only historical data it keeps are the high and low watermarks
+  # for hi-lo statistics.
+  #
+  #    tables        :== {<class-name>}
+  #                        {<obj-id>}
+  #                          (timestamp, config-record, inst-record)
+  #    timestamp     :== (<last-interval-time>, <create-time>, <delete-time>)
+  #    config-record :== [element]
+  #    inst-record   :== [element]
+  #    element       :== (<element-name>, <element-value>)
+  #
+
+  def dataHandler (self, context, className, list, timestamps):
+    """ Callback for configuration and instrumentation data updates.
+        context is 0 for config data and 1 for instrumentation data; list
+        holds (name, value) elements whose first value is the object id. """
+    self.lock.acquire ()
+    try:
+      # If this class has not been seen before, create an empty dictionary to
+      # hold objects of this class
+      if className not in self.tables:
+        self.tables[className] = {}
+
+      # Calculate a base-id so displayed IDs are reasonable 4-digit numbers
+      id = long (list[0][1])
+      if self.baseId == 0:
+        self.baseId = id - 1000
+
+      # If this object hasn't been seen before, create a new object record with
+      # the timestamps and empty lists for configuration and instrumentation data.
+      if id not in self.tables[className]:
+        self.tables[className][id] = (timestamps, [], [])
+
+      (unused, oldConf, oldInst) = self.tables[className][id]
+
+      # For config updates, simply replace old config list with the new one.
+      if   context == 0: #config
+        self.tables[className][id] = (timestamps, list, oldInst)
+
+      # For instrumentation updates, carry the minimum and maximum values for
+      # "hi-lo" stats forward.
+      elif context == 1: #inst
+        if len (oldInst) == 0:
+          newInst = list
+        else:
+          newInst = []
+          for idx in range (len (list)):
+            (key, value) = list[idx]
+            # endswith: find () == len - n also matched keys shorter than the suffix
+            if key.endswith ("High"):
+              value = max (value, oldInst[idx][1])
+            if key.endswith ("Low"):
+              value = min (value, oldInst[idx][1])
+            newInst.append ((key, value))
+        self.tables[className][id] = (timestamps, oldConf, newInst)
+    finally:
+      self.lock.release ()
+
+  def methodReply (self, broker, methodId, status, sText, args):
+    """ Callback for method-reply messages """
+    pass  # replies are currently ignored: none of the arguments is used
+
+  def schemaHandler (self, context, className, configs, insts, methods, events):
+    """ Callback for schema updates: record the schema of a class the first
+        time it is reported; later reports for that class are ignored """
+    self.schema.setdefault (className, (configs, insts, methods, events))
+
+  def __init__ (self, disp, host, port=5672, username="guest", password="guest",
+                spec="../../specs/amqp.0-10-preview.xml"):
+    self.broker = ManagedBroker (host, port, username, password, spec)
+    self.broker.configListener          (0,    self.dataHandler)   # dataHandler treats context 0 as config
+    self.broker.instrumentationListener (1,    self.dataHandler)   # dataHandler treats context 1 as inst
+    self.broker.methodListener          (None, self.methodReply)
+    self.broker.schemaListener          (None, self.schemaHandler)
+    self.lock   = Lock ()  # held by dataHandler and by the display methods
+    self.tables = {}       # class name -> {object id -> (timestamps, config, inst)}
+    self.schema = {}       # class name -> (configs, insts, methods, events)
+    self.baseId = 0        # subtracted from object ids for display; 0 until dataHandler sets it
+    self.disp   = disp     # object whose table () and timestamp () produce the output
+    self.broker.start ()   # called last, once the handlers and state above are in place
+
+  def close (self):  # stops the ManagedBroker that __init__ created and started
+    self.broker.stop ()
+
+  def getObjIndex (self, className, config):
+    """ Join the values of the index elements (except "id") with "." to name an object """
+    name = ""
+    for item in self.schema[className][0]:
+      if item[5] != 1 or item[0] == "id":
+        continue
+      if name != "":
+        name = name + "."
+      for key, val in config:
+        if key == item[0]:
+          if "Ref" in key:
+            val = val - self.baseId
+          name = name + str (val)
+    return name
+
+  def classCompletions (self, prefix):
+    """ Return the known class names that start with prefix, for use as
+        command-completion candidates """
+    self.lock.acquire ()
+    try:
+      # tables is keyed by class name; an empty prefix matches every class
+      matches = [name for name in self.tables if name.startswith (prefix)]
+      return matches
+    finally:
+      # runs on the way out of the return above as well
+      self.lock.release ()
+
+  def typeName (self, typecode):
+    """ Convert type-codes to printable strings.  A numeric code with no
+        entry in the table below raises ValueError. """
+    # Type codes and the names shown for them in schema tables
+    names = {
+      1: "uint8",
+      2: "uint16",
+      3: "uint32",
+      4: "uint64",
+      5: "bool",
+      6: "short-string",
+      7: "long-string",
+    }
+    if typecode in names:
+      return names[typecode]
+
+    # Not one of the codes above
+    raise ValueError ("Invalid type code: %d" % typecode)
+
+  def accessName (self, code):
+    """ Convert element access codes (1, 2 or 3) to printable strings;
+        any other numeric code raises ValueError """
+    if code == 1:
+      return "ReadCreate"
+    if code == 2:
+      return "ReadWrite"
+    if code == 3:
+      return "ReadOnly"
+    raise ValueError ("Invalid access code: %d" % code)
+
+  def notNone (self, text):
+    """ Return text unchanged, except that None becomes the empty string """
+    if text == None:
+      return ""
+    return text
+
+  def listOfIds (self, className, tokens):
+    """ Generate a sorted tuple of displayed object ids for className from
+        command tokens: "all", single ids, or "lo-hi" ranges.  Ids are returned
+        as integers so they sort numerically; a non-numeric id raises ValueError. """
+    if len (tokens) == 0:
+      return ()
+    idList = []
+    if tokens[0] == "all":
+      for objId in self.tables[className]:
+        idList.append (objId - self.baseId)
+    else:
+      for token in tokens:
+        if token.find ("-") != -1:
+          bounds = token.split ("-", 2)
+          for userId in range (int (bounds[0]), int (bounds[1]) + 1):
+            if self.getClassForId (long (userId) + self.baseId) == className:
+              idList.append (userId)
+        else:
+          idList.append (int (token))
+
+    idList.sort ()
+    return tuple (idList)
+
+  def listClasses (self):
+    """ Print a table with one row per known class, giving the number of
+        active objects and of deleted objects (delete-time > 0) """
+    self.lock.acquire ()
+    try:
+      classNames = self.tables.keys ()
+      classNames.sort ()
+
+      rows = []
+      for name in classNames:
+        objects = self.tables[name]
+        deleted = 0
+        for objId in objects:
+          # record element 0 holds the timestamps; element 2 of those is
+          # the delete-time, and a value above 0 marks the object deleted
+          if objects[objId][0][2] > 0:
+            deleted = deleted + 1
+        active = len (objects) - deleted
+        rows.append ((name, active, deleted))
+
+      heads = ("ObjectType", "Active", "Deleted")
+      self.disp.table ("Management Object Types:", heads, rows)
+    finally:
+      self.lock.release ()
+
+  def listObjects (self, className):
+    """ Print a table of the objects of className: displayed id, creation
+        time, destruction time ("-" if not destroyed) and index name """
+    self.lock.acquire ()
+    try:
+      if className not in self.tables:
+        print ("Object type %s not known" % className)
+        return
+      objects = self.tables[className]
+      objIds  = objects.keys ()
+      objIds.sort ()
+      rows = []
+      for objId in objIds:
+        (ts, config, inst) = objects[objId]
+        createTime  = self.disp.timestamp (ts[1])
+        destroyTime = "-"
+        if ts[2] > 0:
+          destroyTime = self.disp.timestamp (ts[2])
+        objIndex = self.getObjIndex (className, config)
+        rows.append ((objId - self.baseId, createTime, destroyTime, objIndex))
+      title = "Objects of type %s" % className
+      self.disp.table (title, ("ID", "Created", "Destroyed", "Index"), rows)
+    finally:
+      self.lock.release ()
+
+  def showObjects (self, tokens):
+    """ Generate a display of object data for a particular class.
+        tokens[0] is the class name; the rest select object ids (see
+        listOfIds).  Problems are reported on the console, never raised. """
+    self.lock.acquire ()
+    try:
+      try:
+        className = tokens[0]
+        if className not in self.tables:
+          print "Class not known: %s" % className
+          return
+
+        userIds = self.listOfIds (className, tokens[1:])
+        if len (userIds) == 0:
+          print "No object IDs supplied"
+          return
+
+        # Keep only the ids that really belong to this class
+        ids = []
+        for id in userIds:
+          if self.getClassForId (long (id) + self.baseId) == className:
+            ids.append (long (id) + self.baseId)
+        if len (ids) == 0:
+          print "No matching objects of type %s" % className
+          return
+
+        # One row per element, one value column per selected object; the
+        # element names are taken from the first selected object
+        rows = []
+        for (kind, slot) in (("config", 1), ("inst", 2)):
+          elements = self.tables[className][ids[0]][slot]
+          for eIdx in range (len (elements)):
+            key = elements[eIdx][0]
+            if key == "id":
+              continue
+            isRef = key.endswith ("Ref")
+            row   = (kind, key)
+            for id in ids:
+              value = self.tables[className][id][slot][eIdx][1]
+              if isRef:
+                value = value - self.baseId
+              row = row + (value,)
+            rows.append (row)
+
+        titleRow = ("Type", "Element")
+        for id in ids:
+          titleRow = titleRow + (str (id - self.baseId),)
+        self.disp.table ("Object of type %s:" % className, titleRow, rows)
+
+      except Exception, e:
+        # e.g. a non-numeric id, or objects whose element lists differ
+        print "Unable to display objects: %s" % e
+    finally:
+      self.lock.release ()
+
+  def schemaSummary (self):
+    """ Print a table giving, for every class in the schema, the number of
+        config elements, inst elements, methods and events it declares """
+    self.lock.acquire ()
+    try:
+      classNames = self.schema.keys ()
+      classNames.sort ()
+      rows = []
+      for className in classNames:
+        entry  = self.schema[className]
+        counts = (len (entry[0]), len (entry[1]), len (entry[2]), len (entry[3]))
+        rows.append ((className,) + counts)
+      heads = ("Class", "ConfigElements", "InstElements", "Methods", "Events")
+      self.disp.table ("Classes in Schema:", heads, rows)
+    finally:
+      self.lock.release ()
+
+  def schemaTable (self, className):
+    """ Generate a display of details of the schema of a particular class:
+        one table of config and inst elements, then one table of arguments
+        per method.  Problems are reported on the console, never raised. """
+    self.lock.acquire ()
+    try:
+      try:
+        if className not in self.schema:
+          print ("Class name %s not known" % className)
+          return
+
+        # Config elements: notes are collected and joined with single spaces
+        rows = []
+        for config in self.schema[className][0]:
+          name = config[0]
+          if name != "id":
+            typename = self.typeName(config[1])
+            unit     = self.notNone (config[2])
+            desc     = self.notNone (config[3])
+            access   = self.accessName (config[4])
+            notes    = []
+            if config[5] == 1:
+              notes.append ("index")
+            if config[6] != None:
+              notes.append ("Min: " + str (config[6]))
+            if config[7] != None:
+              notes.append ("Max: " + str (config[7]))
+            if config[8] != None:
+              notes.append ("MaxLen: " + str (config[8]))
+            rows.append ((name, typename, unit, access, " ".join (notes), desc))
+
+        # Inst elements share the table, with empty Access and Notes cells
+        for inst in self.schema[className][1]:
+          name = inst[0]
+          if name != "id":
+            typename = self.typeName(inst[1])
+            unit     = self.notNone (inst[2])
+            desc     = self.notNone (inst[3])
+            rows.append ((name, typename, unit, "", "", desc))
+
+        titles = ("Element", "Type", "Unit", "Access", "Notes", "Description")
+        self.disp.table ("Schema for class '%s':" % className, titles, rows)
+
+        # One argument table per method, captioned with the method name
+        for method in self.schema[className][2]:
+          (mname, mdesc, args) = (method[0], method[1], method[2])
+          caption = "\nMethod '%s' %s" % (mname, self.notNone (mdesc))
+          rows = []
+          for arg in args:
+            notes = []
+            # arg[5..8] are the optional min, max, maxlen and default
+            for (label, limit) in (("Min: ", arg[5]), ("Max: ", arg[6]),
+                                   ("MaxLen: ", arg[7]), ("Default: ", arg[8])):
+              if limit != None:
+                notes.append (label + str (limit))
+            rows.append ((arg[0], self.typeName (arg[1]), arg[2],
+                          self.notNone (arg[3]), " ".join (notes),
+                          self.notNone (arg[4])))
+          titles = ("Argument", "Type", "Direction", "Unit", "Notes", "Description")
+          self.disp.table (caption, titles, rows)
+
+      except Exception, e:
+        # e.g. an unknown type or access code in the schema
+        print "Unable to display schema for %s: %s" % (className, e)
+    finally:
+      self.lock.release ()
+
+  def getClassForId (self, objId):
+    """ Return the name of the class holding object objId, or None if unknown """
+    for (className, objects) in self.tables.items ():
+      if objId in objects:
+        return className
+    return None
+
+  def do_list (self, data):  # 'list' command: dispatch on the number of arguments
+    count = len (data.split ())
+    if count > 1:
+      self.showObjects (data.split ())
+    elif count == 1:
+      self.listObjects (data)
+    else:
+      self.listClasses ()
+
+  def do_schema (self, data):  # 'schema' command: summary, or details of one class
+    if data != "":
+      self.schemaTable (data)
+    else:
+      self.schemaSummary ()
+
+  def do_call (self, data):  # placeholder for the 'call' command; data is ignored
+    print "Not yet implemented"

Propchange: incubator/qpid/trunk/qpid/python/mgmt-cli/managementdata.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=603034&r1=603033&r2=603034&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Mon Dec 10 12:22:23 2007
@@ -31,17 +31,74 @@
 from qpid.content import Content
 from cStringIO    import StringIO
 from codec        import Codec, EOF
+from threading    import Lock
+
+
+class SequenceManager:  # hands out sequence numbers and tracks data pending on each
+  def __init__ (self):
+    self.lock     = Lock ()
+    self.sequence = 0    # next sequence number to hand out
+    self.pending  = {}   # sequence number -> data not yet released
+
+  def reserve (self, data):  # store data under a new sequence number and return it
+    self.lock.acquire ()
+    try:
+      result = self.sequence
+      self.sequence = result + 1
+      self.pending[result] = data
+      return result
+    finally:
+      self.lock.release ()  # released even if the body raises
+
+  def release (self, seq):  # remove and return the data for seq, or None if unknown
+    self.lock.acquire ()
+    try:
+      return self.pending.pop (seq, None)
+    finally:
+      self.lock.release ()  # released even if seq is unhashable
 
-#===================================================================
-# ManagementMetadata
-#
-#    One instance of this class is created for each ManagedBroker.  It
-#    is used to store metadata from the broker which is needed for the
-#    proper interpretation of recevied management content.
-#
-#===================================================================
 class ManagementMetadata:
+  """One instance of this class is created for each ManagedBroker.  It
+     is used to store metadata from the broker which is needed for the
+     proper interpretation of received management content."""
+
+  def encodeValue (self, codec, value, typecode):
+    """ Write value to codec in the wire encoding selected by typecode;
+        an unrecognised numeric typecode raises ValueError """
+    if typecode in (1, 5):  # codes 1 and 5 share the single-octet encoding
+      codec.encode_octet (int (value))
+    elif typecode == 2:
+      codec.encode_short (int (value))
+    elif typecode == 3:
+      codec.encode_long (long (value))
+    elif typecode == 4:
+      codec.encode_longlong (long (value))
+    elif typecode == 6:
+      codec.encode_shortstr (value)
+    elif typecode == 7:
+      codec.encode_longstr (value)
+    else:
+      raise ValueError ("Invalid type code: %d" % typecode)
 
+  def decodeValue (self, codec, typecode):
+    """ Read and return one value from codec, using the wire decoding
+        selected by typecode; an unrecognised numeric typecode raises ValueError """
+    # Each branch returns directly; nothing is read from codec for a
+    # typecode that is not recognised.
+    if typecode in (1, 5):  # codes 1 and 5 share the single-octet encoding
+      return codec.decode_octet ()
+    if typecode == 2:
+      return codec.decode_short ()
+    if typecode == 3:
+      return codec.decode_long ()
+    if typecode == 4:
+      return codec.decode_longlong ()
+    if typecode == 6:
+      return codec.decode_shortstr ()
+    if typecode == 7:
+      return codec.decode_longstr ()
+    raise ValueError ("Invalid type code: %d" % typecode)
+    
   def parseSchema (self, cls, codec):
     className   = codec.decode_shortstr ()
     configCount = codec.decode_short ()
@@ -100,12 +157,56 @@
       inst = (name, type, unit, desc)
       insts.append (inst)
 
-    # TODO: Handle notification of schema change outbound
+    for idx in range (methodCount):
+      ft = codec.decode_table ()
+      mname    = ft["name"]
+      argCount = ft["argCount"]
+      if "desc" in ft:
+        mdesc = ft["desc"]
+      else:
+        mdesc = None
+
+      args = []
+      for aidx in range (argCount):
+        ft = codec.decode_table ()
+        name    = ft["name"]
+        type    = ft["type"]
+        dir     = ft["dir"].upper ()
+        unit    = None
+        min     = None
+        max     = None
+        maxlen  = None
+        desc    = None
+        default = None
+
+        for key, value in ft.items ():
+          if   key == "unit":
+            unit = value
+          elif key == "min":
+            min = value
+          elif key == "max":
+            max = value
+          elif key == "maxlen":
+            maxlen = value
+          elif key == "desc":
+            desc = value
+          elif key == "default":
+            default = value
+
+        arg = (name, type, dir, unit, desc, min, max, maxlen, default)
+        args.append (arg)
+      methods.append ((mname, mdesc, args))
+
+
     self.schema[(className,'C')] = configs
     self.schema[(className,'I')] = insts
     self.schema[(className,'M')] = methods
     self.schema[(className,'E')] = events
 
+    if self.broker.schema_cb != None:
+      self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
+                                configs, insts, methods, events)
+
   def parseContent (self, cls, codec):
     if cls == 'C' and self.broker.config_cb == None:
       return
@@ -127,22 +228,7 @@
     for element in self.schema[(className,cls)][:]:
       tc   = element[1]
       name = element[0]
-      if   tc == 1: # TODO: Define constants for these
-        data = codec.decode_octet ()
-      elif tc == 2:
-        data = codec.decode_short ()
-      elif tc == 3:
-        data = codec.decode_long ()
-      elif tc == 4:
-        data = codec.decode_longlong ()
-      elif tc == 5:
-        data = codec.decode_octet ()
-      elif tc == 6:
-        data = codec.decode_shortstr ()
-      elif tc == 7:
-        data = codec.decode_longstr ()
-      else:
-        raise ValueError ("Invalid type code: %d" % tc)
+      data = self.decodeValue (codec, tc)
       row.append ((name, data))
 
     if cls == 'C':
@@ -168,14 +254,9 @@
     self.schema = {}
 
 
-#===================================================================
-# ManagedBroker
-#
-#    An object of this class represents a connection (over AMQP) to a
-#    single managed broker.
-#
-#===================================================================
 class ManagedBroker:
+  """An object of this class represents a connection (over AMQP) to a
+     single managed broker."""
 
   mExchange = "qpid.management"
   dExchange = "amq.direct"
@@ -205,18 +286,35 @@
     msg.complete ()
 
   def reply_cb (self, msg):
-    codec = Codec (StringIO (msg.content.body), self.spec)
-    methodId = codec.decode_long ()
+    codec    = Codec (StringIO (msg.content.body), self.spec)
+    sequence = codec.decode_long ()
     status   = codec.decode_long ()
     sText    = codec.decode_shortstr ()
 
-    args = {}
+    data = self.sequenceManager.release (sequence)
+    if data == None:
+      msg.complete ()
+      return
+
+    (userSequence, className, methodName) = data
+
     if status == 0:
-      args["sequence"] = codec.decode_long ()
-      args["body"]     = codec.decode_longstr ()
+      ms = self.metadata.schema[(className,'M')]
+      arglist = None
+      for (mname, mdesc, margs) in ms:
+        if mname == methodName:
+          arglist = margs
+      if arglist == None:
+        msg.complete ()
+        return
+
+      args = {}
+      for arg in arglist:
+        if arg[2].find("O") != -1:
+          args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
 
     if self.method_cb != None:
-      self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
+      self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
 
     msg.complete ()
 
@@ -225,17 +323,18 @@
                 port     = 5672,
                 username = "guest",
                 password = "guest",
-                specfile = "../specs/amqp.0-10-preview.xml"):
+                specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
 
-    self.spec = qpid.spec.load (specfile)
-    self.client    = None
-    self.channel   = None
-    self.queue     = None
-    self.rqueue    = None
-    self.qname     = None
-    self.rqname    = None
-    self.metadata  = ManagementMetadata (self)
-    self.connected = 0
+    self.spec             = qpid.spec.load (specfile)
+    self.client           = None
+    self.channel          = None
+    self.queue            = None
+    self.rqueue           = None
+    self.qname            = None
+    self.rqname           = None
+    self.metadata         = ManagementMetadata (self)
+    self.sequenceManager  = SequenceManager ()
+    self.connected        = 0
     self.lastConnectError = None
 
     #  Initialize the callback records
@@ -265,17 +364,37 @@
   def instrumentationListener (self, context, callback):
     self.inst_cb = (context, callback)
 
-  def method (self, methodId, objId, className,
+  def method (self, userSequence, objId, className,
               methodName, args=None, packageName="qpid"):
     codec = Codec (StringIO (), self.spec);
-    codec.encode_long     (methodId)
-    codec.encode_longlong (objId)
-    codec.encode_shortstr (self.rqname)
-
-    # TODO: Encode args according to schema
-    if methodName == "echo":
-      codec.encode_long (args["sequence"])
-      codec.encode_longstr (args["body"])
+    sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+    codec.encode_long     (sequence)    # Method sequence id
+    codec.encode_longlong (objId)       # ID of object
+    codec.encode_shortstr (self.rqname) # name of reply queue
+
+    # Encode args according to schema
+    if (className,'M') not in self.metadata.schema:
+      self.sequenceManager.release (sequence)
+      raise ValueError ("Unknown class name: %s" % className)
+    
+    ms = self.metadata.schema[(className,'M')]
+    arglist = None
+    for (mname, mdesc, margs) in ms:
+      if mname == methodName:
+        arglist = margs
+    if arglist == None:
+      self.sequenceManager.release (sequence)
+      raise ValueError ("Unknown method name: %s" % methodName)
+
+    for arg in arglist:
+      if arg[2].find("I") != -1:
+        value = arg[8]  # default
+        if arg[0] in args:
+          value = args[arg[0]]
+          if value == None:
+            self.sequenceManager.release (sequence)
+            raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
+          self.metadata.encodeValue (codec, value, arg[1])
 
     msg = Content (codec.stream.getvalue ())
     msg["content_type"] = "application/octet-stream"
@@ -325,7 +444,7 @@
       self.connected = 1
 
     except socket.error, e:
-      print "Socket Error Detected:", e[1]
+      print "Socket Error:", e[1]
       self.lastConnectError = e
       raise
     except:

Added: incubator/qpid/trunk/qpid/python/qpid/management.py.rej
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py.rej?rev=603034&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py.rej (added)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py.rej Mon Dec 10 12:22:23 2007
@@ -0,0 +1,457 @@
+***************
+*** 18,24 ****
+  #
+  
+  """
+- Management classes for AMQP
+  """
+  
+  import qpid
+--- 18,24 ----
+  #
+  
+  """
++ Management API for Qpid
+  """
+  
+  import qpid
+***************
+*** 42,91 ****
+  #===================================================================
+  class ManagementMetadata:
+  
+-   def parseSchema (self, cls, oid, len, codec):
+-     #print "Schema Record: objId=", oid
+  
+-     config = []
+-     inst   = []
+-     while 1:
+-       flags = codec.decode_octet ()
+-       if flags == 0x80:
+-         break
+  
+-       tc   = codec.decode_octet ()
+-       name = codec.decode_shortstr ()
+-       desc = codec.decode_shortstr ()
+  
+-       if flags & 1: # TODO: Define constants for these
+-         config.append ((tc, name, desc))
+-       if (flags & 1) == 0 or (flags & 2) == 2:
+-         inst.append   ((tc, name, desc))
+  
+      # TODO: Handle notification of schema change outbound
+-     self.schema[(oid,'C')] = config
+-     self.schema[(oid,'I')] = inst
+  
+-   def parseContent (self, cls, oid, len, codec):
+-     #print "Content Record: Class=", cls, ", objId=", oid
+- 
+      if cls == 'C' and self.broker.config_cb == None:
+        return
+      if cls == 'I' and self.broker.inst_cb == None:
+        return
+  
+-     if (oid,cls) not in self.schema:
+        return
+  
+      row        = []
+      timestamps = []
+  
+-     timestamps.append (codec.decode_longlong ()); # Current Time
+-     timestamps.append (codec.decode_longlong ()); # Create Time
+-     timestamps.append (codec.decode_longlong ()); # Delete Time
+  
+-     for element in self.schema[(oid,cls)][:]:
+-       tc   = element[0]
+-       name = element[1]
+        if   tc == 1: # TODO: Define constants for these
+          data = codec.decode_octet ()
+        elif tc == 2:
+--- 42,132 ----
+  #===================================================================
+  class ManagementMetadata:
+  
++   def parseSchema (self, cls, codec):
++     className   = codec.decode_shortstr ()
++     configCount = codec.decode_short ()
++     instCount   = codec.decode_short ()
++     methodCount = codec.decode_short ()
++     eventCount  = codec.decode_short ()
+  
++     configs = []
++     insts   = []
++     methods = []
++     events  = []
+  
++     configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None))
++     insts.append   (("id", 4, None, None))
+  
++     for idx in range (configCount):
++       ft = codec.decode_table ()
++       name   = ft["name"]
++       type   = ft["type"]
++       access = ft["access"]
++       index  = ft["index"]
++       unit   = None
++       min    = None
++       max    = None
++       maxlen = None
++       desc   = None
+  
++       for key, value in ft.items ():
++         if   key == "unit":
++           unit = value
++         elif key == "min":
++           min = value
++         elif key == "max":
++           max = value
++         elif key == "maxlen":
++           maxlen = value
++         elif key == "desc":
++           desc = value
++ 
++       config = (name, type, unit, desc, access, index, min, max, maxlen)
++       configs.append (config)
++ 
++     for idx in range (instCount):
++       ft = codec.decode_table ()
++       name   = ft["name"]
++       type   = ft["type"]
++       unit   = None
++       desc   = None
++ 
++       for key, value in ft.items ():
++         if   key == "unit":
++           unit = value
++         elif key == "desc":
++           desc = value
++ 
++       inst = (name, type, unit, desc)
++       insts.append (inst)
++ 
+      # TODO: Handle notification of schema change outbound
++     self.schema[(className,'C')] = configs
++     self.schema[(className,'I')] = insts
++     self.schema[(className,'M')] = methods
++     self.schema[(className,'E')] = events
+  
++   def parseContent (self, cls, codec):
+      if cls == 'C' and self.broker.config_cb == None:
+        return
+      if cls == 'I' and self.broker.inst_cb == None:
+        return
+  
++     className = codec.decode_shortstr ()
++ 
++     if (className,cls) not in self.schema:
+        return
+  
+      row        = []
+      timestamps = []
+  
++     timestamps.append (codec.decode_longlong ())  # Current Time
++     timestamps.append (codec.decode_longlong ())  # Create Time
++     timestamps.append (codec.decode_longlong ())  # Delete Time
+  
++     for element in self.schema[(className,cls)][:]:
++       tc   = element[1]
++       name = element[0]
+        if   tc == 1: # TODO: Define constants for these
+          data = codec.decode_octet ()
+        elif tc == 2:
+***************
+*** 98,130 ****
+          data = codec.decode_octet ()
+        elif tc == 6:
+          data = codec.decode_shortstr ()
+        row.append ((name, data))
+  
+      if cls == 'C':
+-       self.broker.config_cb[1] (self.broker.config_cb[0], oid, row, timestamps)
+-     if cls == 'I':
+-       self.broker.inst_cb[1]   (self.broker.inst_cb[0], oid, row, timestamps)
+  
+    def parse (self, codec):
+-     try:
+-       opcode = chr (codec.decode_octet ())
+-     except EOF:
+-       return 0
+  
+-     cls = chr (codec.decode_octet ())
+-     oid = codec.decode_short ()
+-     len = codec.decode_long  ()
+- 
+-     if len < 8:
+-       raise ValueError ("parse error: value of length field too small")
+- 
+      if opcode == 'S':
+-       self.parseSchema (cls, oid, len, codec)
+  
+-     if opcode == 'C':
+-       self.parseContent (cls, oid, len, codec)
+  
+-     return 1
+  
+    def __init__ (self, broker):
+      self.broker = broker
+--- 139,167 ----
+          data = codec.decode_octet ()
+        elif tc == 6:
+          data = codec.decode_shortstr ()
++       elif tc == 7:
++         data = codec.decode_longstr ()
++       else:
++         raise ValueError ("Invalid type code: %d" % tc)
+        row.append ((name, data))
+  
+      if cls == 'C':
++       self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
++     elif cls == 'I':
++       self.broker.inst_cb[1]   (self.broker.inst_cb[0], className, row, timestamps)
+  
+    def parse (self, codec):
++     opcode = chr (codec.decode_octet ())
++     cls    = chr (codec.decode_octet ())
+  
+      if opcode == 'S':
++       self.parseSchema (cls, codec)
+  
++     elif opcode == 'C':
++       self.parseContent (cls, codec)
+  
++     else:
++       raise ValueError ("Unknown opcode: %c" % opcode);
+  
+    def __init__ (self, broker):
+      self.broker = broker
+***************
+*** 140,146 ****
+  #===================================================================
+  class ManagedBroker:
+  
+-   exchange = "qpid.management"
+  
+    def checkHeader (self, codec):
+      octet = chr (codec.decode_octet ())
+--- 177,184 ----
+  #===================================================================
+  class ManagedBroker:
+  
++   mExchange = "qpid.management"
++   dExchange = "amq.direct"
+  
+    def checkHeader (self, codec):
+      octet = chr (codec.decode_octet ())
+***************
+*** 157,225 ****
+        return 0
+      return 1
+  
+-   def receive_cb (self, msg):
+      codec = Codec (StringIO (msg.content.body), self.spec)
+  
+      if self.checkHeader (codec) == 0:
+        raise ValueError ("outer header invalid");
+  
+-     while self.metadata.parse (codec):
+-       pass
+  
+      msg.complete ()
+  
+-   def __init__ (self, host = "localhost", port = 5672,
+-                 username = "guest", password = "guest"):
+  
+-     self.spec = qpid.spec.load ("../specs/amqp.0-10-preview.xml")
+-     self.client   = None
+-     self.channel  = None
+-     self.queue    = None
+-     self.qname    = None
+-     self.metadata = ManagementMetadata (self)
+  
+      #  Initialize the callback records
+      self.schema_cb = None
+      self.config_cb = None
+      self.inst_cb   = None
+  
+      self.host     = host
+      self.port     = port
+      self.username = username
+      self.password = password
+  
+    def schemaListener (self, context, callback):
+      self.schema_cb = (context, callback)
+  
+    def configListener (self, context, callback):
+      self.config_cb = (context, callback)
+  
+    def instrumentationListener (self, context, callback):
+      self.inst_cb = (context, callback)
+  
+    def start (self):
+-     print "Connecting to broker", self.host
+  
+      try:
+        self.client = Client (self.host, self.port, self.spec)
+        self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
+        self.channel = self.client.channel (1)
+-       response = self.channel.session_open (detached_lifetime=300)
+-       self.qname = "mgmt-" + base64.urlsafe_b64encode(response.session_id)
+  
+-       self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
+-       self.channel.queue_bind (exchange=ManagedBroker.exchange, queue=self.qname,
+-                                routing_key="mgmt")
+-       self.channel.message_subscribe (queue=self.qname, destination="dest")
+-       self.queue = self.client.queue ("dest")
+-       self.queue.listen (self.receive_cb)
+  
+-       self.channel.message_flow_mode (destination="dest", mode=1)
+-       self.channel.message_flow (destination="dest", unit=0, value=0xFFFFFFFF)
+-       self.channel.message_flow (destination="dest", unit=1, value=0xFFFFFFFF)
+  
+      except socket.error, e:
+        print "Socket Error Detected:", e[1]
+        raise
+      except:
+        raise
+--- 195,335 ----
+        return 0
+      return 1
+  
++   def publish_cb (self, msg):
+      codec = Codec (StringIO (msg.content.body), self.spec)
+  
+      if self.checkHeader (codec) == 0:
+        raise ValueError ("outer header invalid");
+  
++     self.metadata.parse (codec)
++     msg.complete ()
+  
++   def reply_cb (self, msg):
++     codec = Codec (StringIO (msg.content.body), self.spec)
++     methodId = codec.decode_long ()
++     status   = codec.decode_long ()
++     sText    = codec.decode_shortstr ()
++ 
++     args = {}
++     if status == 0:
++       args["sequence"] = codec.decode_long ()
++       args["body"]     = codec.decode_longstr ()
++ 
++     if self.method_cb != None:
++       self.method_cb[1] (self.method_cb[0], methodId, status, sText, args)
++ 
+      msg.complete ()
+  
++   def __init__ (self,
++                 host     = "localhost",
++                 port     = 5672,
++                 username = "guest",
++                 password = "guest",
++                 specfile = "../specs/amqp.0-10-preview.xml"):
+  
++     self.spec = qpid.spec.load (specfile)
++     self.client    = None
++     self.channel   = None
++     self.queue     = None
++     self.rqueue    = None
++     self.qname     = None
++     self.rqname    = None
++     self.metadata  = ManagementMetadata (self)
++     self.connected = 0
++     self.lastConnectError = None
+  
+      #  Initialize the callback records
++     self.status_cb = None
+      self.schema_cb = None
+      self.config_cb = None
+      self.inst_cb   = None
++     self.method_cb = None
+  
+      self.host     = host
+      self.port     = port
+      self.username = username
+      self.password = password
+  
++   def statusListener (self, context, callback):
++     self.status_cb = (context, callback)
++ 
+    def schemaListener (self, context, callback):
+      self.schema_cb = (context, callback)
+  
+    def configListener (self, context, callback):
+      self.config_cb = (context, callback)
+  
++   def methodListener (self, context, callback):
++     self.method_cb = (context, callback)
++ 
+    def instrumentationListener (self, context, callback):
+      self.inst_cb = (context, callback)
+  
++   def method (self, methodId, objId, className,
++               methodName, args=None, packageName="qpid"):
++     codec = Codec (StringIO (), self.spec);
++     codec.encode_long     (methodId)
++     codec.encode_longlong (objId)
++     codec.encode_shortstr (self.rqname)
++ 
++     # TODO: Encode args according to schema
++     if methodName == "echo":
++       codec.encode_long (args["sequence"])
++       codec.encode_longstr (args["body"])
++ 
++     msg = Content (codec.stream.getvalue ())
++     msg["content_type"] = "application/octet-stream"
++     msg["routing_key"]  = "method." + packageName + "." + className + "." + methodName
++     msg["reply_to"]     = self.spec.struct ("reply_to")
++     self.channel.message_transfer (destination="qpid.management", content=msg)
++ 
++   def isConnected (self):
++     return connected
++ 
+    def start (self):
++     print "Connecting to broker %s:%d" % (self.host, self.port)
+  
+      try:
+        self.client = Client (self.host, self.port, self.spec)
+        self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
+        self.channel = self.client.channel (1)
++       response = self.channel.session_open (detached_lifetime=10)
++       self.qname  = "mgmt-"  + base64.urlsafe_b64encode (response.session_id)
++       self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
+  
++       self.channel.queue_declare (queue=self.qname,  exclusive=1, auto_delete=1)
++       self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
++       
++       self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
++                                routing_key="mgmt.#")
++       self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
++                                routing_key=self.rqname)
+  
++       self.channel.message_subscribe (queue=self.qname,  destination="mdest")
++       self.channel.message_subscribe (queue=self.rqname, destination="rdest")
+  
++       self.queue = self.client.queue ("mdest")
++       self.queue.listen (self.publish_cb)
++ 
++       self.channel.message_flow_mode (destination="mdest", mode=1)
++       self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
++       self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
++ 
++       self.rqueue = self.client.queue ("rdest")
++       self.rqueue.listen (self.reply_cb)
++ 
++       self.channel.message_flow_mode (destination="rdest", mode=1)
++       self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
++       self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
++ 
++       self.connected = 1
++ 
+      except socket.error, e:
+        print "Socket Error Detected:", e[1]
++       self.lastConnectError = e
+        raise
+      except:
+        raise
++ 
++   def stop (self):
++     pass

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=603034&r1=603033&r2=603034&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Mon Dec 10 12:22:23 2007
@@ -109,27 +109,6 @@
     <configElement name="durable"         type="bool"   access="RC"/>
     <configElement name="autoDelete"      type="bool"   access="RC"/>
     <configElement name="exclusive"       type="bool"   access="RC"/>
-    <configElement name="pageMemoryLimit" type="uint32" access="RO"/>
-
-    <!-- Persistent Journal Support -->
-    <instElement name="journalLocation"            type="sstr"                  desc="Logical directory on disk"/>
-    <instElement name="journalBaseFileName"        type="sstr"                  desc="Base filename prefix for journal"/>
-    <instElement name="journalInitialFileCount"    type="uint32"                desc="Number of files initially allocated to this journal"/>
-    <instElement name="journalCurrentFileCount"    type="uint32"                desc="Number of files currently allocated to this journal"/>
-    <instElement name="journalDataFileSize"        type="uint32"  unit="byte"   desc="Size of each journal data file"/>
-    <instElement name="journalFreeFileCount"       type="hilo32"                desc="Number of files free on this journal. Includes free files trapped in holes."/>
-    <instElement name="journalAvailableFileCount"  type="hilo32"                desc="Number of files available to be written.  Excluding holes"/>
-    <instElement name="journalRecordDepth"         type="hilo32"  unit="record" desc="Number of enqueued records (durable messages)"/>
-    <instElement name="journalRecordEnqueues"      type="count64" unit="record" desc="Total enqueued records on journal"/>
-    <instElement name="journalRecordDequeues"      type="count64" unit="record" desc="Total dequeued records on journal"/>
-    <instElement name="journalWriteWaitFailures"   type="count64" unit="record" desc="AIO Wait failures on write"/>
-    <instElement name="journalWriteBusyFailures"   type="count64" unit="record" desc="AIO Busy failures on write"/>
-    <instElement name="journalReadRecordCount"     type="count64" unit="record" desc="Records read from the journal"/>
-    <instElement name="journalReadBusyFailures"    type="count64" unit="record" desc="AIO Busy failures on read"/>
-    <instElement name="journalWritePageCacheDepth" type="hilo32"  unit="page"   desc="Current depth of write-page-cache"/>
-    <instElement name="journalWritePageSize"       type="uint32"  unit="byte"   desc="Page size in write-page-cache"/>
-    <instElement name="journalReadPageCacheDepth"  type="hilo32"  unit="page"   desc="Current depth of read-page-cache"/>
-    <instElement name="journalReadPageSize"        type="uint32"  unit="byte"   desc="Page size in read-page-cache"/>
 
     <instElement name="msgTotalEnqueues"    type="count64" unit="message"     desc="Total messages enqueued"/>
     <instElement name="msgTotalDequeues"    type="count64" unit="message"     desc="Total messages dequeued"/>