You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2008/02/28 19:55:25 UTC

svn commit: r632087 [2/2] - in /incubator/qpid/trunk/qpid: cpp/managementgen/ cpp/managementgen/templates/ cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/management/ python/mgmt-cli/ python/qpid/ specs/

Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=632087&r1=632086&r2=632087&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu Feb 28 10:55:21 2008
@@ -35,12 +35,14 @@
 
 
 class SequenceManager:
+  """ Manage sequence numbers for asynchronous method calls """
   def __init__ (self):
     self.lock     = Lock ()
     self.sequence = 0
     self.pending  = {}
 
   def reserve (self, data):
+    """ Reserve a unique sequence number """
     self.lock.acquire ()
     result = self.sequence
     self.sequence = self.sequence + 1
@@ -49,6 +51,7 @@
     return result
 
   def release (self, seq):
+    """ Release a reserved sequence number """
     data = None
     self.lock.acquire ()
     if seq in self.pending:
@@ -57,12 +60,172 @@
     self.lock.release ()
     return data
 
-class ManagementMetadata:
-  """One instance of this class is created for each ManagedBroker.  It
-     is used to store metadata from the broker which is needed for the
-     proper interpretation of received management content."""
+
+class managementChannel:
+  """ This class represents a connection to an AMQP broker. """
+
+  def __init__ (self, ch, topicCb, replyCb, cbContext=None):
+    """ Given a channel on an established AMQP broker connection, this method
+    opens a session and performs all of the declarations and bindings needed
+    to participate in the management protocol. """
+    response         = ch.session_open (detached_lifetime=300)
+    self.topicName   = "mgmt-"  + base64.urlsafe_b64encode (response.session_id)
+    self.replyName   = "reply-" + base64.urlsafe_b64encode (response.session_id)
+    self.qpidChannel = ch
+    self.tcb         = topicCb
+    self.rcb         = replyCb
+    self.context     = cbContext
+
+    ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
+    ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
+
+    ch.queue_bind (exchange="qpid.management",
+                   queue=self.topicName, routing_key="mgmt.#")
+    ch.queue_bind (exchange="amq.direct",
+                   queue=self.replyName, routing_key=self.replyName)
+    ch.message_subscribe (queue=self.topicName, destination="tdest")
+    ch.message_subscribe (queue=self.replyName, destination="rdest")
+
+    ch.client.queue ("tdest").listen (self.topicCb)
+    ch.client.queue ("rdest").listen (self.replyCb)
+
+    ch.message_flow_mode (destination="tdest", mode=1)
+    ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
+    ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
+
+    ch.message_flow_mode (destination="rdest", mode=1)
+    ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+    ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+
+  def topicCb (self, msg):
+    """ Receive messages via the topic queue on this channel. """
+    self.tcb (self, msg)
+
+  def replyCb (self, msg):
+    """ Receive messages via the reply queue on this channel. """
+    self.rcb (self, msg)
+
+  def send (self, exchange, msg):
+    self.qpidChannel.message_transfer (destination=exchange, content=msg)
+
+
+class managementClient:
+  """ This class provides an API for access to management data on the AMQP
+  network.  It implements the management protocol and manages the management
+  schemas as advertised by the various management agents in the network. """
+
+  #========================================================
+  # User API - interacts with the class's user
+  #========================================================
+  def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None):
+    self.spec     = amqpSpec
+    self.ctrlCb   = ctrlCb
+    self.configCb = configCb
+    self.instCb   = instCb
+    self.methodCb = methodCb
+    self.schemaCb = None
+    self.eventCb  = None
+    self.channels = []
+    self.seqMgr   = SequenceManager ()
+    self.schema   = {}
+    self.packages = {}
+
+  def schemaListener (self, schemaCb):
+    """ Optionally register a callback to receive details of the schema of
+    managed objects in the network. """
+    self.schemaCb = schemaCb
+
+  def eventListener (self, eventCb):
+    """ Optionally register a callback to receive events from managed objects
+    in the network. """
+    self.eventCb = eventCb
+
+  def addChannel (self, channel):
+    """ Register a new channel. """
+    self.channels.append (channel)
+    codec = Codec (StringIO (), self.spec)
+    self.setHeader (codec, ord ('H'))
+    msg = Content  (codec.stream.getvalue ())
+    msg["content_type"] = "application/octet-stream"
+    msg["routing_key"]  = "agent"
+    msg["reply_to"]     = self.spec.struct ("reply_to")
+    msg["reply_to"]["exchange_name"] = "amq.direct"
+    msg["reply_to"]["routing_key"]   = channel.replyName
+    channel.send ("qpid.management", msg)
+
+  def removeChannel (self, channel):
+    """ Remove a previously added channel from management. """
+    self.channels.remove (channel)
+
+  def callMethod (self, channel, userSequence, objId, className, methodName, args=None):
+    """ Invoke a method on a managed object. """
+    self.method (channel, userSequence, objId, className, methodName, args)
+
+  #========================================================
+  # Channel API - interacts with registered channel objects
+  #========================================================
+  def topicCb (self, ch, msg):
+    """ Receive messages via the topic queue of a particular channel. """
+    codec = Codec (StringIO (msg.content.body), self.spec)
+    hdr   = self.checkHeader (codec)
+    if hdr == None:
+      raise ValueError ("outer header invalid");
+    self.parse (ch, codec, hdr[0], hdr[1])
+    msg.complete ()
+
+  def replyCb (self, ch, msg):
+    """ Receive messages via the reply queue of a particular channel. """
+    codec = Codec (StringIO (msg.content.body), self.spec)
+    hdr   = self.checkHeader (codec)
+    if hdr == None:
+      msg.complete ()
+      return
+
+    if   hdr[0] == 'm':
+      self.handleMethodReply (ch, codec)
+    elif hdr[0] == 'I':
+      self.handleInit (ch, codec)
+    elif hdr[0] == 'p':
+      self.handlePackageInd (ch, codec)
+    elif hdr[0] == 'q':
+      self.handleClassInd (ch, codec)
+    else:
+      self.parse (ch, codec, hdr[0], hdr[1])
+    msg.complete ()
+
+  #========================================================
+  # Internal Functions
+  #========================================================
+  def setHeader (self, codec, opcode, cls = 0):
+    """ Compose the header of a management message. """
+    codec.encode_octet (ord ('A'))
+    codec.encode_octet (ord ('M'))
+    codec.encode_octet (ord ('0'))
+    codec.encode_octet (ord ('1'))
+    codec.encode_octet (opcode)
+    codec.encode_octet (cls)
+
+  def checkHeader (self, codec):
+    """ Check the header of a management message and extract the opcode and
+    class. """
+    octet = chr (codec.decode_octet ())
+    if octet != 'A':
+      return None
+    octet = chr (codec.decode_octet ())
+    if octet != 'M':
+      return None
+    octet = chr (codec.decode_octet ())
+    if octet != '0':
+      return None
+    octet = chr (codec.decode_octet ())
+    if octet != '1':
+      return None
+    opcode = chr (codec.decode_octet ())
+    cls    = chr (codec.decode_octet ())
+    return (opcode, cls)
 
   def encodeValue (self, codec, value, typecode):
+    """ Encode, into the codec, a value based on its typecode. """
     if   typecode == 1:
       codec.encode_octet    (int  (value))
     elif typecode == 2:
@@ -85,10 +248,15 @@
       codec.encode_longlong (long (value))
     elif typecode == 11: # BOOL
       codec.encode_octet    (int  (value))
+    elif typecode == 12: # FLOAT
+      codec.encode_float    (float (value))
+    elif typecode == 13: # DOUBLE
+      codec.encode_double   (double (value))
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
   def decodeValue (self, codec, typecode):
+    """ Decode, from the codec, a value based on its typecode. """
     if   typecode == 1:
       data = codec.decode_octet ()
     elif typecode == 2:
@@ -111,17 +279,119 @@
       data = codec.decode_longlong ()
     elif typecode == 11: # BOOL
       data = codec.decode_octet ()
+    elif typecode == 12: # FLOAT
+      data = codec.decode_float ()
+    elif typecode == 13: # DOUBLE
+      data = codec.decode_double ()
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
     return data
+
+  def handleMethodReply (self, ch, codec):
+    sequence = codec.decode_long ()
+    status   = codec.decode_long ()
+    sText    = codec.decode_shortstr ()
+
+    data = self.seqMgr.release (sequence)
+    if data == None:
+      return
+
+    (userSequence, classId, methodName) = data
+    args = {}
+
+    if status == 0:
+      schemaClass = self.schema[classId]
+      ms = schemaClass['M']
+      arglist = None
+      for mname in ms:
+        (mdesc, margs) = ms[mname]
+        if mname == methodName:
+          arglist = margs
+      if arglist == None:
+        return
+
+      for arg in arglist:
+        if arg[2].find("O") != -1:
+          args[arg[0]] = self.decodeValue (codec, arg[1])
+
+    if self.methodCb != None:
+      self.methodCb (ch.context, userSequence, status, sText, args)
+
+  def handleInit (self, ch, codec):
+    len = codec.decode_short ()
+    data = codec.decode_raw (len)
+    if self.ctrlCb != None:
+      self.ctrlCb (ch.context, len, data)
+
+    # Send a package request
+    sendCodec = Codec (StringIO (), self.spec)
+    self.setHeader (sendCodec, ord ('P'))
+    smsg = Content  (sendCodec.stream.getvalue ())
+    smsg["content_type"] = "application/octet-stream"
+    smsg["routing_key"]  = "agent"
+    smsg["reply_to"]     = self.spec.struct ("reply_to")
+    smsg["reply_to"]["exchange_name"] = "amq.direct"
+    smsg["reply_to"]["routing_key"]   = ch.replyName
+    ch.send ("qpid.management", smsg)
     
-  def parseSchema (self, cls, codec):
+  def handlePackageInd (self, ch, codec):
+    pname = codec.decode_shortstr ()
+    if pname not in self.packages:
+      self.packages[pname] = {}
+
+      # Send a class request
+      sendCodec = Codec (StringIO (), self.spec)
+      self.setHeader (sendCodec, ord ('Q'))
+      sendCodec.encode_shortstr (pname)
+      smsg = Content  (sendCodec.stream.getvalue ())
+      smsg["content_type"] = "application/octet-stream"
+      smsg["routing_key"]  = "agent"
+      smsg["reply_to"]     = self.spec.struct ("reply_to")
+      smsg["reply_to"]["exchange_name"] = "amq.direct"
+      smsg["reply_to"]["routing_key"]   = ch.replyName
+      ch.send ("qpid.management", smsg)
+
+  def handleClassInd (self, ch, codec):
+    pname = codec.decode_shortstr ()
+    cname = codec.decode_shortstr ()
+    hash  = codec.decode_bin128   ()
+    if pname not in self.packages:
+      return
+
+    if (cname, hash) not in self.packages[pname]:
+      # Send a schema request
+      sendCodec = Codec (StringIO (), self.spec)
+      self.setHeader (sendCodec, ord ('S'))
+      sendCodec.encode_shortstr (pname)
+      sendCodec.encode_shortstr (cname)
+      sendCodec.encode_bin128   (hash)
+      smsg = Content  (sendCodec.stream.getvalue ())
+      smsg["content_type"] = "application/octet-stream"
+      smsg["routing_key"]  = "agent"
+      smsg["reply_to"]     = self.spec.struct ("reply_to")
+      smsg["reply_to"]["exchange_name"] = "amq.direct"
+      smsg["reply_to"]["routing_key"]   = ch.replyName
+      ch.send ("qpid.management", smsg)
+
+  def parseSchema (self, ch, cls, codec):
+    """ Parse a received schema-description message. """
+    packageName = codec.decode_shortstr ()
     className   = codec.decode_shortstr ()
+    hash        = codec.decode_bin128 ()
     configCount = codec.decode_short ()
     instCount   = codec.decode_short ()
     methodCount = codec.decode_short ()
     eventCount  = codec.decode_short ()
 
+    if packageName not in self.packages:
+      return
+    if (className, hash) in self.packages[packageName]:
+      return
+
+    classKey = (packageName, className, hash)
+    if classKey in self.schema:
+      return
+
     configs = []
     insts   = []
     methods = {}
@@ -213,25 +483,29 @@
         args.append (arg)
       methods[mname] = (mdesc, args)
 
-
-    self.schema[(className,'C')] = configs
-    self.schema[(className,'I')] = insts
-    self.schema[(className,'M')] = methods
-    self.schema[(className,'E')] = events
-
-    if self.broker.schema_cb != None:
-      self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
-                                configs, insts, methods, events)
-
-  def parseContent (self, cls, codec):
-    if cls == 'C' and self.broker.config_cb == None:
+    schemaClass = {}
+    schemaClass['C'] = configs
+    schemaClass['I'] = insts
+    schemaClass['M'] = methods
+    schemaClass['E'] = events
+    self.schema[classKey] = schemaClass
+
+    if self.schemaCb != None:
+      self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+
+  def parseContent (self, ch, cls, codec):
+    """ Parse a received content message. """
+    if cls == 'C' and self.configCb == None:
       return
-    if cls == 'I' and self.broker.inst_cb == None:
+    if cls == 'I' and self.instCb == None:
       return
 
-    className = codec.decode_shortstr ()
+    packageName = codec.decode_shortstr ()
+    className   = codec.decode_shortstr ()
+    hash        = codec.decode_bin128 ()
+    classKey    = (packageName, className, hash)
 
-    if (className,cls) not in self.schema:
+    if classKey not in self.schema:
       return
 
     row        = []
@@ -241,184 +515,49 @@
     timestamps.append (codec.decode_longlong ())  # Create Time
     timestamps.append (codec.decode_longlong ())  # Delete Time
 
-    for element in self.schema[(className,cls)][:]:
+    schemaClass = self.schema[classKey]
+    for element in schemaClass[cls][:]:
       tc   = element[1]
       name = element[0]
       data = self.decodeValue (codec, tc)
       row.append ((name, data))
 
-    if cls == 'C':
-      self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
+    if   cls == 'C':
+      self.configCb (ch.context, classKey, row, timestamps)
     elif cls == 'I':
-      self.broker.inst_cb[1]   (self.broker.inst_cb[0], className, row, timestamps)
-
-  def parse (self, codec, opcode, cls):
-    if opcode == 'S':
-      self.parseSchema (cls, codec)
+      self.instCb   (ch.context, classKey, row, timestamps)
 
+  def parse (self, ch, codec, opcode, cls):
+    """ Parse a message received from the topic queue. """
+    if opcode   == 's':
+      self.parseSchema  (ch, cls, codec)
     elif opcode == 'C':
-      self.parseContent (cls, codec)
-
+      self.parseContent (ch, cls, codec)
     else:
       raise ValueError ("Unknown opcode: %c" % opcode);
 
-  def __init__ (self, broker):
-    self.broker = broker
-    self.schema = {}
-
-
-class ManagedBroker:
-  """An object of this class represents a connection (over AMQP) to a
-     single managed broker."""
-
-  mExchange = "qpid.management"
-  dExchange = "amq.direct"
-
-  def setHeader (self, codec, opcode, cls = 0):
-    codec.encode_octet (ord ('A'))
-    codec.encode_octet (ord ('M'))
-    codec.encode_octet (ord ('0'))
-    codec.encode_octet (ord ('1'))
-    codec.encode_octet (opcode)
-    codec.encode_octet (cls)
-
-  def checkHeader (self, codec):
-    octet = chr (codec.decode_octet ())
-    if octet != 'A':
-      return None
-    octet = chr (codec.decode_octet ())
-    if octet != 'M':
-      return None
-    octet = chr (codec.decode_octet ())
-    if octet != '0':
-      return None
-    octet = chr (codec.decode_octet ())
-    if octet != '1':
-      return None
-    opcode = chr (codec.decode_octet ())
-    cls    = chr (codec.decode_octet ())
-    return (opcode, cls)
-
-  def publish_cb (self, msg):
-    codec = Codec (StringIO (msg.content.body), self.spec)
-
-    hdr = self.checkHeader (codec)
-    if hdr == None:
-      raise ValueError ("outer header invalid");
-
-    self.metadata.parse (codec, hdr[0], hdr[1])
-    msg.complete ()
-
-  def reply_cb (self, msg):
-    codec    = Codec (StringIO (msg.content.body), self.spec)
-    hdr = self.checkHeader (codec)
-    if hdr == None:
-      msg.complete ()
-      return
-    if hdr[0] != 'R':
-      msg.complete ()
-      return
-
-    sequence = codec.decode_long  ()
-    status   = codec.decode_long  ()
-    sText    = codec.decode_shortstr ()
-
-    data = self.sequenceManager.release (sequence)
-    if data == None:
-      msg.complete ()
-      return
-
-    (userSequence, className, methodName) = data
-    args = {}
-
-    if status == 0:
-      ms = self.metadata.schema[(className,'M')]
-      arglist = None
-      for mname in ms:
-        (mdesc, margs) = ms[mname]
-        if mname == methodName:
-          arglist = margs
-      if arglist == None:
-        msg.complete ()
-        return
-
-      for arg in arglist:
-        if arg[2].find("O") != -1:
-          args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
-
-    if self.method_cb != None:
-      self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
-
-    msg.complete ()
-
-  def __init__ (self,
-                host     = "localhost",
-                port     = 5672,
-                username = "guest",
-                password = "guest",
-                specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
-
-    self.spec             = qpid.spec.load (specfile)
-    self.client           = None
-    self.channel          = None
-    self.queue            = None
-    self.rqueue           = None
-    self.qname            = None
-    self.rqname           = None
-    self.metadata         = ManagementMetadata (self)
-    self.sequenceManager  = SequenceManager ()
-    self.connected        = 0
-    self.lastConnectError = None
-
-    #  Initialize the callback records
-    self.status_cb = None
-    self.schema_cb = None
-    self.config_cb = None
-    self.inst_cb   = None
-    self.method_cb = None
-
-    self.host     = host
-    self.port     = port
-    self.username = username
-    self.password = password
-
-  def statusListener (self, context, callback):
-    self.status_cb = (context, callback)
-
-  def schemaListener (self, context, callback):
-    self.schema_cb = (context, callback)
-
-  def configListener (self, context, callback):
-    self.config_cb = (context, callback)
-
-  def methodListener (self, context, callback):
-    self.method_cb = (context, callback)
-
-  def instrumentationListener (self, context, callback):
-    self.inst_cb = (context, callback)
-
-  def method (self, userSequence, objId, className,
-              methodName, args=None, packageName="qpid"):
-    codec = Codec (StringIO (), self.spec);
-    sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+  def method (self, channel, userSequence, objId, classId, methodName, args):
+    """ Invoke a method on an object """
+    codec = Codec (StringIO (), self.spec)
+    sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
     self.setHeader (codec, ord ('M'))
     codec.encode_long     (sequence)    # Method sequence id
     codec.encode_longlong (objId)       # ID of object
-    #codec.encode_shortstr (self.rqname) # name of reply queue
 
     # Encode args according to schema
-    if (className,'M') not in self.metadata.schema:
-      self.sequenceManager.release (sequence)
-      raise ValueError ("Unknown class name: %s" % className)
+    if classId not in self.schema:
+      self.seqMgr.release (sequence)
+      raise ValueError ("Unknown class name: %s" % classId)
     
-    ms = self.metadata.schema[(className,'M')]
-    arglist = None
+    schemaClass = self.schema[classId]
+    ms          = schemaClass['M']
+    arglist     = None
     for mname in ms:
       (mdesc, margs) = ms[mname]
       if mname == methodName:
         arglist = margs
     if arglist == None:
-      self.sequenceManager.release (sequence)
+      self.seqMgr.release (sequence)
       raise ValueError ("Unknown method name: %s" % methodName)
 
     for arg in arglist:
@@ -427,65 +566,17 @@
         if arg[0] in args:
           value = args[arg[0]]
           if value == None:
-            self.sequenceManager.release (sequence)
+            self.seqMgr.release (sequence)
             raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
-          self.metadata.encodeValue (codec, value, arg[1])
+          self.encodeValue (codec, value, arg[1])
 
+    packageName = classId[0]
+    className   = classId[1]
     msg = Content (codec.stream.getvalue ())
     msg["content_type"] = "application/octet-stream"
-    msg["routing_key"]  = "method." + packageName + "." + className + "." + methodName
+    msg["routing_key"]  = "agent.method." + packageName + "." + \
+        className + "." + methodName
     msg["reply_to"]     = self.spec.struct ("reply_to")
     msg["reply_to"]["exchange_name"] = "amq.direct"
-    msg["reply_to"]["routing_key"]   = self.rqname
-    self.channel.message_transfer (destination="qpid.management", content=msg)
-
-  def isConnected (self):
-    return connected
-
-  def start (self):
-    print "Connecting to broker %s:%d" % (self.host, self.port)
-
-    try:
-      self.client = Client (self.host, self.port, self.spec)
-      self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
-      self.channel = self.client.channel (1)
-      response = self.channel.session_open (detached_lifetime=300)
-      self.qname  = "mgmt-"  + base64.urlsafe_b64encode (response.session_id)
-      self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
-
-      self.channel.queue_declare (queue=self.qname,  exclusive=1, auto_delete=1)
-      self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
-      
-      self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
-                               routing_key="mgmt.#")
-      self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
-                               routing_key=self.rqname)
-
-      self.channel.message_subscribe (queue=self.qname,  destination="mdest")
-      self.channel.message_subscribe (queue=self.rqname, destination="rdest")
-
-      self.queue = self.client.queue ("mdest")
-      self.queue.listen (self.publish_cb)
-
-      self.channel.message_flow_mode (destination="mdest", mode=1)
-      self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
-      self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
-
-      self.rqueue = self.client.queue ("rdest")
-      self.rqueue.listen (self.reply_cb)
-
-      self.channel.message_flow_mode (destination="rdest", mode=1)
-      self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
-      self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
-
-      self.connected = 1
-
-    except socket.error, e:
-      print "Socket Error:", e[1]
-      self.lastConnectError = e
-      raise
-    except:
-      raise
-
-  def stop (self):
-    pass
+    msg["reply_to"]["routing_key"]   = channel.replyName
+    channel.send ("qpid.management", msg)

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=632087&r1=632086&r2=632087&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Feb 28 10:55:21 2008
@@ -47,7 +47,11 @@
   <class name="system">
     <configElement name="sysId" index="y" type="sstr" access="RC"/>
 
-    <!-- RT config/instrumentation TBD -->
+    <instElement name="osName"   type="sstr" desc="Operating System Name"/>
+    <instElement name="nodeName" type="sstr" desc="Node Name"/>
+    <instElement name="release"  type="sstr"/>
+    <instElement name="version"  type="sstr"/>
+    <instElement name="machine"  type="sstr"/>
 
   </class>
 
@@ -57,20 +61,18 @@
   ===============================================================
   -->
   <class name="broker">
-    <configElement name="systemRef"            type="objId"  access="RC" index="y" desc="System ID"/>
+    <configElement name="systemRef"            type="objId"  access="RC" index="y" desc="System ID" parentRef="y"/>
     <configElement name="port"                 type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
     <configElement name="workerThreads"        type="uint16" access="RO" desc="Thread pool size"/>
     <configElement name="maxConns"             type="uint16" access="RO" desc="Maximum allowed connections"/>
     <configElement name="connBacklog"          type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
     <configElement name="stagingThreshold"     type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
-    <configElement name="storeLib"             type="sstr"   access="RO" desc="Name of persistent storage library"/>
-    <configElement name="asyncStore"           type="bool"   access="RO" desc="Use async persistent store"/>
     <configElement name="mgmtPubInterval"      type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
-    <configElement name="initialDiskPageSize"  type="uint32" access="RO" desc="Number of disk pages allocated for storage"/>
-    <configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/>
     <configElement name="clusterName"          type="sstr"   access="RO"
-                   desc="Name of cluster this server is a member of, zero-length for standalone server"/>
+                   desc="Name of cluster this server is a member of"/>
     <configElement name="version"              type="sstr"   access="RO" desc="Running software version"/>
+    <configElement name="dataDirEnabled"       type="bool"   access="RO" desc="Persistent configuration storage enabled"/>
+    <configElement name="dataDir"              type="sstr"   access="RO" desc="Persistent configuration storage location"/>
 
     <method name="joinCluster">
       <arg name="clusterName" dir="I" type="sstr"/>
@@ -137,9 +139,7 @@
     <instElement name="consumers"           type="hilo32"  unit="consumer"    desc="Current consumers on queue"/>
     <instElement name="bindings"            type="hilo32"  unit="binding"     desc="Current bindings"/>
     <instElement name="unackedMessages"     type="hilo32"  unit="message"     desc="Messages consumed but not yet acked"/>
-    <instElement name="messageLatencyMin"   type="uint64"  unit="nanosecond"  desc="Minimum broker latency through this queue"/>
-    <instElement name="messageLatencyMax"   type="uint64"  unit="nanosecond"  desc="Maximum broker latency through this queue"/>
-    <instElement name="messageLatencyAvg"   type="uint64"  unit="nanosecond"  desc="Average broker latency through this queue"/>
+    <instElement name="messageLatency"      type="mmaTime" unit="nanosecond"  desc="Broker latency through this queue"/>
 
     <method name="purge" desc="Discard all messages on queue"/>
   </class>
@@ -203,6 +203,9 @@
   ===============================================================
   -->
   <class name="link">
+
+    This class represents an inter-broker connection.
+
     <configElement name="vhostRef" type="objId"  access="RC" index="y" parentRef="y"/>
     <configElement name="address"  type="sstr"   access="RC" index="y"/>
 

Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=632087&r1=632086&r2=632087&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Thu Feb 28 10:55:21 2008
@@ -29,6 +29,8 @@
 <type name="lstr"      base="LSTR"      cpp="std::string" encode="@.putLongString (#)"  decode="@.getLongString (#)"   accessor="direct" init='""'/>
 <type name="absTime"   base="ABSTIME"   cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
 <type name="deltaTime" base="DELTATIME" cpp="uint64_t"    encode="@.putLongLong (#)"    decode="# = @.getLongLong ()"  accessor="direct" init="0"/>
+<type name="float"     base="FLOAT"     cpp="float"       encode="@.putFloat (#)"       decode="# = @.getFloat ()"     accessor="direct" init="0."/>
+<type name="double"    base="DOUBLE"    cpp="double"      encode="@.putDouble (#)"      decode="# = @.getDouble ()"    accessor="direct" init="0."/>
 
 <type name="hilo8"   base="U8"   cpp="uint8_t"  encode="@.putOctet (#)"    decode="# = @.getOctet ()"    style="wm" accessor="counter" init="0"/>
 <type name="hilo16"  base="U16"  cpp="uint16_t" encode="@.putShort (#)"    decode="# = @.getShort ()"    style="wm" accessor="counter" init="0"/>
@@ -41,8 +43,9 @@
 <type name="count64" base="U64"  cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/>
 
 <!-- Min/Max/Average statistics -->
-<type name="mma32" base="U32"  cpp="uint32_t" encode="@.putLong (#)"     decode="# = @.getLong ()"     style="mma" accessor="direct" init="0"/>
-<type name="mma64" base="U64"  cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mma32"   base="U32"       cpp="uint32_t" encode="@.putLong (#)"     decode="# = @.getLong ()"     style="mma" accessor="direct" init="0"/>
+<type name="mma64"   base="U64"       cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
 
 <!-- Some Proposed Syntax for User-Defined Types:
 <enum name="enumeratedType" base="U8">