You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2008/02/28 19:55:25 UTC
svn commit: r632087 [2/2] - in /incubator/qpid/trunk/qpid:
cpp/managementgen/ cpp/managementgen/templates/ cpp/src/
cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/management/
python/mgmt-cli/ python/qpid/ specs/
Modified: incubator/qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/management.py?rev=632087&r1=632086&r2=632087&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/management.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/management.py Thu Feb 28 10:55:21 2008
@@ -35,12 +35,14 @@
class SequenceManager:
+ """ Manage sequence numbers for asynchronous method calls """
def __init__ (self):
self.lock = Lock ()
self.sequence = 0
self.pending = {}
def reserve (self, data):
+ """ Reserve a unique sequence number """
self.lock.acquire ()
result = self.sequence
self.sequence = self.sequence + 1
@@ -49,6 +51,7 @@
return result
def release (self, seq):
+ """ Release a reserved sequence number """
data = None
self.lock.acquire ()
if seq in self.pending:
@@ -57,12 +60,172 @@
self.lock.release ()
return data
-class ManagementMetadata:
- """One instance of this class is created for each ManagedBroker. It
- is used to store metadata from the broker which is needed for the
- proper interpretation of received management content."""
+
+class managementChannel:
+ """ This class represents a connection to an AMQP broker. """
+
+ def __init__ (self, ch, topicCb, replyCb, cbContext=None):
+ """ Given a channel on an established AMQP broker connection, this method
+ opens a session and performs all of the declarations and bindings needed
+ to participate in the management protocol. """
+ response = ch.session_open (detached_lifetime=300)
+ self.topicName = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
+ self.replyName = "reply-" + base64.urlsafe_b64encode (response.session_id)
+ self.qpidChannel = ch
+ self.tcb = topicCb
+ self.rcb = replyCb
+ self.context = cbContext
+
+ ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
+ ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
+
+ ch.queue_bind (exchange="qpid.management",
+ queue=self.topicName, routing_key="mgmt.#")
+ ch.queue_bind (exchange="amq.direct",
+ queue=self.replyName, routing_key=self.replyName)
+ ch.message_subscribe (queue=self.topicName, destination="tdest")
+ ch.message_subscribe (queue=self.replyName, destination="rdest")
+
+ ch.client.queue ("tdest").listen (self.topicCb)
+ ch.client.queue ("rdest").listen (self.replyCb)
+
+ ch.message_flow_mode (destination="tdest", mode=1)
+ ch.message_flow (destination="tdest", unit=0, value=0xFFFFFFFF)
+ ch.message_flow (destination="tdest", unit=1, value=0xFFFFFFFF)
+
+ ch.message_flow_mode (destination="rdest", mode=1)
+ ch.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
+ ch.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
+
+ def topicCb (self, msg):
+ """ Receive messages via the topic queue on this channel. """
+ self.tcb (self, msg)
+
+ def replyCb (self, msg):
+ """ Receive messages via the reply queue on this channel. """
+ self.rcb (self, msg)
+
+ def send (self, exchange, msg):
+ self.qpidChannel.message_transfer (destination=exchange, content=msg)
+
+
+class managementClient:
+ """ This class provides an API for access to management data on the AMQP
+ network. It implements the management protocol and manages the management
+ schemas as advertised by the various management agents in the network. """
+
+ #========================================================
+ # User API - interacts with the class's user
+ #========================================================
+ def __init__ (self, amqpSpec, ctrlCb, configCb, instCb, methodCb=None):
+ self.spec = amqpSpec
+ self.ctrlCb = ctrlCb
+ self.configCb = configCb
+ self.instCb = instCb
+ self.methodCb = methodCb
+ self.schemaCb = None
+ self.eventCb = None
+ self.channels = []
+ self.seqMgr = SequenceManager ()
+ self.schema = {}
+ self.packages = {}
+
+ def schemaListener (self, schemaCb):
+ """ Optionally register a callback to receive details of the schema of
+ managed objects in the network. """
+ self.schemaCb = schemaCb
+
+ def eventListener (self, eventCb):
+ """ Optionally register a callback to receive events from managed objects
+ in the network. """
+ self.eventCb = eventCb
+
+ def addChannel (self, channel):
+ """ Register a new channel. """
+ self.channels.append (channel)
+ codec = Codec (StringIO (), self.spec)
+ self.setHeader (codec, ord ('H'))
+ msg = Content (codec.stream.getvalue ())
+ msg["content_type"] = "application/octet-stream"
+ msg["routing_key"] = "agent"
+ msg["reply_to"] = self.spec.struct ("reply_to")
+ msg["reply_to"]["exchange_name"] = "amq.direct"
+ msg["reply_to"]["routing_key"] = channel.replyName
+ channel.send ("qpid.management", msg)
+
+ def removeChannel (self, channel):
+ """ Remove a previously added channel from management. """
+ self.channels.remove (channel)
+
+ def callMethod (self, channel, userSequence, objId, className, methodName, args=None):
+ """ Invoke a method on a managed object. """
+ self.method (channel, userSequence, objId, className, methodName, args)
+
+ #========================================================
+ # Channel API - interacts with registered channel objects
+ #========================================================
+ def topicCb (self, ch, msg):
+ """ Receive messages via the topic queue of a particular channel. """
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ raise ValueError ("outer header invalid");
+ self.parse (ch, codec, hdr[0], hdr[1])
+ msg.complete ()
+
+ def replyCb (self, ch, msg):
+ """ Receive messages via the reply queue of a particular channel. """
+ codec = Codec (StringIO (msg.content.body), self.spec)
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ msg.complete ()
+ return
+
+ if hdr[0] == 'm':
+ self.handleMethodReply (ch, codec)
+ elif hdr[0] == 'I':
+ self.handleInit (ch, codec)
+ elif hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
+ msg.complete ()
+
+ #========================================================
+ # Internal Functions
+ #========================================================
+ def setHeader (self, codec, opcode, cls = 0):
+ """ Compose the header of a management message. """
+ codec.encode_octet (ord ('A'))
+ codec.encode_octet (ord ('M'))
+ codec.encode_octet (ord ('0'))
+ codec.encode_octet (ord ('1'))
+ codec.encode_octet (opcode)
+ codec.encode_octet (cls)
+
+ def checkHeader (self, codec):
+ """ Check the header of a management message and extract the opcode and
+ class. """
+ octet = chr (codec.decode_octet ())
+ if octet != 'A':
+ return None
+ octet = chr (codec.decode_octet ())
+ if octet != 'M':
+ return None
+ octet = chr (codec.decode_octet ())
+ if octet != '0':
+ return None
+ octet = chr (codec.decode_octet ())
+ if octet != '1':
+ return None
+ opcode = chr (codec.decode_octet ())
+ cls = chr (codec.decode_octet ())
+ return (opcode, cls)
def encodeValue (self, codec, value, typecode):
+ """ Encode, into the codec, a value based on its typecode. """
if typecode == 1:
codec.encode_octet (int (value))
elif typecode == 2:
@@ -85,10 +248,15 @@
codec.encode_longlong (long (value))
elif typecode == 11: # BOOL
codec.encode_octet (int (value))
+ elif typecode == 12: # FLOAT
+ codec.encode_float (float (value))
+ elif typecode == 13: # DOUBLE
+ codec.encode_double (double (value))
else:
raise ValueError ("Invalid type code: %d" % typecode)
def decodeValue (self, codec, typecode):
+ """ Decode, from the codec, a value based on its typecode. """
if typecode == 1:
data = codec.decode_octet ()
elif typecode == 2:
@@ -111,17 +279,119 @@
data = codec.decode_longlong ()
elif typecode == 11: # BOOL
data = codec.decode_octet ()
+ elif typecode == 12: # FLOAT
+ data = codec.decode_float ()
+ elif typecode == 13: # DOUBLE
+ data = codec.decode_double ()
else:
raise ValueError ("Invalid type code: %d" % typecode)
return data
+
+ def handleMethodReply (self, ch, codec):
+ sequence = codec.decode_long ()
+ status = codec.decode_long ()
+ sText = codec.decode_shortstr ()
+
+ data = self.seqMgr.release (sequence)
+ if data == None:
+ return
+
+ (userSequence, classId, methodName) = data
+ args = {}
+
+ if status == 0:
+ schemaClass = self.schema[classId]
+ ms = schemaClass['M']
+ arglist = None
+ for mname in ms:
+ (mdesc, margs) = ms[mname]
+ if mname == methodName:
+ arglist = margs
+ if arglist == None:
+ return
+
+ for arg in arglist:
+ if arg[2].find("O") != -1:
+ args[arg[0]] = self.decodeValue (codec, arg[1])
+
+ if self.methodCb != None:
+ self.methodCb (ch.context, userSequence, status, sText, args)
+
+ def handleInit (self, ch, codec):
+ len = codec.decode_short ()
+ data = codec.decode_raw (len)
+ if self.ctrlCb != None:
+ self.ctrlCb (ch.context, len, data)
+
+ # Send a package request
+ sendCodec = Codec (StringIO (), self.spec)
+ self.setHeader (sendCodec, ord ('P'))
+ smsg = Content (sendCodec.stream.getvalue ())
+ smsg["content_type"] = "application/octet-stream"
+ smsg["routing_key"] = "agent"
+ smsg["reply_to"] = self.spec.struct ("reply_to")
+ smsg["reply_to"]["exchange_name"] = "amq.direct"
+ smsg["reply_to"]["routing_key"] = ch.replyName
+ ch.send ("qpid.management", smsg)
- def parseSchema (self, cls, codec):
+ def handlePackageInd (self, ch, codec):
+ pname = codec.decode_shortstr ()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+
+ # Send a class request
+ sendCodec = Codec (StringIO (), self.spec)
+ self.setHeader (sendCodec, ord ('Q'))
+ sendCodec.encode_shortstr (pname)
+ smsg = Content (sendCodec.stream.getvalue ())
+ smsg["content_type"] = "application/octet-stream"
+ smsg["routing_key"] = "agent"
+ smsg["reply_to"] = self.spec.struct ("reply_to")
+ smsg["reply_to"]["exchange_name"] = "amq.direct"
+ smsg["reply_to"]["routing_key"] = ch.replyName
+ ch.send ("qpid.management", smsg)
+
+ def handleClassInd (self, ch, codec):
+ pname = codec.decode_shortstr ()
+ cname = codec.decode_shortstr ()
+ hash = codec.decode_bin128 ()
+ if pname not in self.packages:
+ return
+
+ if (cname, hash) not in self.packages[pname]:
+ # Send a schema request
+ sendCodec = Codec (StringIO (), self.spec)
+ self.setHeader (sendCodec, ord ('S'))
+ sendCodec.encode_shortstr (pname)
+ sendCodec.encode_shortstr (cname)
+ sendCodec.encode_bin128 (hash)
+ smsg = Content (sendCodec.stream.getvalue ())
+ smsg["content_type"] = "application/octet-stream"
+ smsg["routing_key"] = "agent"
+ smsg["reply_to"] = self.spec.struct ("reply_to")
+ smsg["reply_to"]["exchange_name"] = "amq.direct"
+ smsg["reply_to"]["routing_key"] = ch.replyName
+ ch.send ("qpid.management", smsg)
+
+ def parseSchema (self, ch, cls, codec):
+ """ Parse a received schema-description message. """
+ packageName = codec.decode_shortstr ()
className = codec.decode_shortstr ()
+ hash = codec.decode_bin128 ()
configCount = codec.decode_short ()
instCount = codec.decode_short ()
methodCount = codec.decode_short ()
eventCount = codec.decode_short ()
+ if packageName not in self.packages:
+ return
+ if (className, hash) in self.packages[packageName]:
+ return
+
+ classKey = (packageName, className, hash)
+ if classKey in self.schema:
+ return
+
configs = []
insts = []
methods = {}
@@ -213,25 +483,29 @@
args.append (arg)
methods[mname] = (mdesc, args)
-
- self.schema[(className,'C')] = configs
- self.schema[(className,'I')] = insts
- self.schema[(className,'M')] = methods
- self.schema[(className,'E')] = events
-
- if self.broker.schema_cb != None:
- self.broker.schema_cb[1] (self.broker.schema_cb[0], className,
- configs, insts, methods, events)
-
- def parseContent (self, cls, codec):
- if cls == 'C' and self.broker.config_cb == None:
+ schemaClass = {}
+ schemaClass['C'] = configs
+ schemaClass['I'] = insts
+ schemaClass['M'] = methods
+ schemaClass['E'] = events
+ self.schema[classKey] = schemaClass
+
+ if self.schemaCb != None:
+ self.schemaCb (ch.context, classKey, configs, insts, methods, events)
+
+ def parseContent (self, ch, cls, codec):
+ """ Parse a received content message. """
+ if cls == 'C' and self.configCb == None:
return
- if cls == 'I' and self.broker.inst_cb == None:
+ if cls == 'I' and self.instCb == None:
return
- className = codec.decode_shortstr ()
+ packageName = codec.decode_shortstr ()
+ className = codec.decode_shortstr ()
+ hash = codec.decode_bin128 ()
+ classKey = (packageName, className, hash)
- if (className,cls) not in self.schema:
+ if classKey not in self.schema:
return
row = []
@@ -241,184 +515,49 @@
timestamps.append (codec.decode_longlong ()) # Create Time
timestamps.append (codec.decode_longlong ()) # Delete Time
- for element in self.schema[(className,cls)][:]:
+ schemaClass = self.schema[classKey]
+ for element in schemaClass[cls][:]:
tc = element[1]
name = element[0]
data = self.decodeValue (codec, tc)
row.append ((name, data))
- if cls == 'C':
- self.broker.config_cb[1] (self.broker.config_cb[0], className, row, timestamps)
+ if cls == 'C':
+ self.configCb (ch.context, classKey, row, timestamps)
elif cls == 'I':
- self.broker.inst_cb[1] (self.broker.inst_cb[0], className, row, timestamps)
-
- def parse (self, codec, opcode, cls):
- if opcode == 'S':
- self.parseSchema (cls, codec)
+ self.instCb (ch.context, classKey, row, timestamps)
+ def parse (self, ch, codec, opcode, cls):
+ """ Parse a message received from the topic queue. """
+ if opcode == 's':
+ self.parseSchema (ch, cls, codec)
elif opcode == 'C':
- self.parseContent (cls, codec)
-
+ self.parseContent (ch, cls, codec)
else:
raise ValueError ("Unknown opcode: %c" % opcode);
- def __init__ (self, broker):
- self.broker = broker
- self.schema = {}
-
-
-class ManagedBroker:
- """An object of this class represents a connection (over AMQP) to a
- single managed broker."""
-
- mExchange = "qpid.management"
- dExchange = "amq.direct"
-
- def setHeader (self, codec, opcode, cls = 0):
- codec.encode_octet (ord ('A'))
- codec.encode_octet (ord ('M'))
- codec.encode_octet (ord ('0'))
- codec.encode_octet (ord ('1'))
- codec.encode_octet (opcode)
- codec.encode_octet (cls)
-
- def checkHeader (self, codec):
- octet = chr (codec.decode_octet ())
- if octet != 'A':
- return None
- octet = chr (codec.decode_octet ())
- if octet != 'M':
- return None
- octet = chr (codec.decode_octet ())
- if octet != '0':
- return None
- octet = chr (codec.decode_octet ())
- if octet != '1':
- return None
- opcode = chr (codec.decode_octet ())
- cls = chr (codec.decode_octet ())
- return (opcode, cls)
-
- def publish_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
-
- hdr = self.checkHeader (codec)
- if hdr == None:
- raise ValueError ("outer header invalid");
-
- self.metadata.parse (codec, hdr[0], hdr[1])
- msg.complete ()
-
- def reply_cb (self, msg):
- codec = Codec (StringIO (msg.content.body), self.spec)
- hdr = self.checkHeader (codec)
- if hdr == None:
- msg.complete ()
- return
- if hdr[0] != 'R':
- msg.complete ()
- return
-
- sequence = codec.decode_long ()
- status = codec.decode_long ()
- sText = codec.decode_shortstr ()
-
- data = self.sequenceManager.release (sequence)
- if data == None:
- msg.complete ()
- return
-
- (userSequence, className, methodName) = data
- args = {}
-
- if status == 0:
- ms = self.metadata.schema[(className,'M')]
- arglist = None
- for mname in ms:
- (mdesc, margs) = ms[mname]
- if mname == methodName:
- arglist = margs
- if arglist == None:
- msg.complete ()
- return
-
- for arg in arglist:
- if arg[2].find("O") != -1:
- args[arg[0]] = self.metadata.decodeValue (codec, arg[1])
-
- if self.method_cb != None:
- self.method_cb[1] (self.method_cb[0], userSequence, status, sText, args)
-
- msg.complete ()
-
- def __init__ (self,
- host = "localhost",
- port = 5672,
- username = "guest",
- password = "guest",
- specfile = "/usr/share/amqp/amqp.0-10-preview.xml"):
-
- self.spec = qpid.spec.load (specfile)
- self.client = None
- self.channel = None
- self.queue = None
- self.rqueue = None
- self.qname = None
- self.rqname = None
- self.metadata = ManagementMetadata (self)
- self.sequenceManager = SequenceManager ()
- self.connected = 0
- self.lastConnectError = None
-
- # Initialize the callback records
- self.status_cb = None
- self.schema_cb = None
- self.config_cb = None
- self.inst_cb = None
- self.method_cb = None
-
- self.host = host
- self.port = port
- self.username = username
- self.password = password
-
- def statusListener (self, context, callback):
- self.status_cb = (context, callback)
-
- def schemaListener (self, context, callback):
- self.schema_cb = (context, callback)
-
- def configListener (self, context, callback):
- self.config_cb = (context, callback)
-
- def methodListener (self, context, callback):
- self.method_cb = (context, callback)
-
- def instrumentationListener (self, context, callback):
- self.inst_cb = (context, callback)
-
- def method (self, userSequence, objId, className,
- methodName, args=None, packageName="qpid"):
- codec = Codec (StringIO (), self.spec);
- sequence = self.sequenceManager.reserve ((userSequence, className, methodName))
+ def method (self, channel, userSequence, objId, classId, methodName, args):
+ """ Invoke a method on an object """
+ codec = Codec (StringIO (), self.spec)
+ sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'))
codec.encode_long (sequence) # Method sequence id
codec.encode_longlong (objId) # ID of object
- #codec.encode_shortstr (self.rqname) # name of reply queue
# Encode args according to schema
- if (className,'M') not in self.metadata.schema:
- self.sequenceManager.release (sequence)
- raise ValueError ("Unknown class name: %s" % className)
+ if classId not in self.schema:
+ self.seqMgr.release (sequence)
+ raise ValueError ("Unknown class name: %s" % classId)
- ms = self.metadata.schema[(className,'M')]
- arglist = None
+ schemaClass = self.schema[classId]
+ ms = schemaClass['M']
+ arglist = None
for mname in ms:
(mdesc, margs) = ms[mname]
if mname == methodName:
arglist = margs
if arglist == None:
- self.sequenceManager.release (sequence)
+ self.seqMgr.release (sequence)
raise ValueError ("Unknown method name: %s" % methodName)
for arg in arglist:
@@ -427,65 +566,17 @@
if arg[0] in args:
value = args[arg[0]]
if value == None:
- self.sequenceManager.release (sequence)
+ self.seqMgr.release (sequence)
raise ValueError ("Missing non-defaulted argument: %s" % arg[0])
- self.metadata.encodeValue (codec, value, arg[1])
+ self.encodeValue (codec, value, arg[1])
+ packageName = classId[0]
+ className = classId[1]
msg = Content (codec.stream.getvalue ())
msg["content_type"] = "application/octet-stream"
- msg["routing_key"] = "method." + packageName + "." + className + "." + methodName
+ msg["routing_key"] = "agent.method." + packageName + "." + \
+ className + "." + methodName
msg["reply_to"] = self.spec.struct ("reply_to")
msg["reply_to"]["exchange_name"] = "amq.direct"
- msg["reply_to"]["routing_key"] = self.rqname
- self.channel.message_transfer (destination="qpid.management", content=msg)
-
- def isConnected (self):
- return connected
-
- def start (self):
- print "Connecting to broker %s:%d" % (self.host, self.port)
-
- try:
- self.client = Client (self.host, self.port, self.spec)
- self.client.start ({"LOGIN": self.username, "PASSWORD": self.password})
- self.channel = self.client.channel (1)
- response = self.channel.session_open (detached_lifetime=300)
- self.qname = "mgmt-" + base64.urlsafe_b64encode (response.session_id)
- self.rqname = "reply-" + base64.urlsafe_b64encode (response.session_id)
-
- self.channel.queue_declare (queue=self.qname, exclusive=1, auto_delete=1)
- self.channel.queue_declare (queue=self.rqname, exclusive=1, auto_delete=1)
-
- self.channel.queue_bind (exchange=ManagedBroker.mExchange, queue=self.qname,
- routing_key="mgmt.#")
- self.channel.queue_bind (exchange=ManagedBroker.dExchange, queue=self.rqname,
- routing_key=self.rqname)
-
- self.channel.message_subscribe (queue=self.qname, destination="mdest")
- self.channel.message_subscribe (queue=self.rqname, destination="rdest")
-
- self.queue = self.client.queue ("mdest")
- self.queue.listen (self.publish_cb)
-
- self.channel.message_flow_mode (destination="mdest", mode=1)
- self.channel.message_flow (destination="mdest", unit=0, value=0xFFFFFFFF)
- self.channel.message_flow (destination="mdest", unit=1, value=0xFFFFFFFF)
-
- self.rqueue = self.client.queue ("rdest")
- self.rqueue.listen (self.reply_cb)
-
- self.channel.message_flow_mode (destination="rdest", mode=1)
- self.channel.message_flow (destination="rdest", unit=0, value=0xFFFFFFFF)
- self.channel.message_flow (destination="rdest", unit=1, value=0xFFFFFFFF)
-
- self.connected = 1
-
- except socket.error, e:
- print "Socket Error:", e[1]
- self.lastConnectError = e
- raise
- except:
- raise
-
- def stop (self):
- pass
+ msg["reply_to"]["routing_key"] = channel.replyName
+ channel.send ("qpid.management", msg)
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=632087&r1=632086&r2=632087&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Feb 28 10:55:21 2008
@@ -47,7 +47,11 @@
<class name="system">
<configElement name="sysId" index="y" type="sstr" access="RC"/>
- <!-- RT config/instrumentation TBD -->
+ <instElement name="osName" type="sstr" desc="Operating System Name"/>
+ <instElement name="nodeName" type="sstr" desc="Node Name"/>
+ <instElement name="release" type="sstr"/>
+ <instElement name="version" type="sstr"/>
+ <instElement name="machine" type="sstr"/>
</class>
@@ -57,20 +61,18 @@
===============================================================
-->
<class name="broker">
- <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID"/>
+ <configElement name="systemRef" type="objId" access="RC" index="y" desc="System ID" parentRef="y"/>
<configElement name="port" type="uint16" access="RC" index="y" desc="TCP Port for AMQP Service"/>
<configElement name="workerThreads" type="uint16" access="RO" desc="Thread pool size"/>
<configElement name="maxConns" type="uint16" access="RO" desc="Maximum allowed connections"/>
<configElement name="connBacklog" type="uint16" access="RO" desc="Connection backlog limit for listening socket"/>
<configElement name="stagingThreshold" type="uint32" access="RO" desc="Broker stages messages over this size to disk"/>
- <configElement name="storeLib" type="sstr" access="RO" desc="Name of persistent storage library"/>
- <configElement name="asyncStore" type="bool" access="RO" desc="Use async persistent store"/>
<configElement name="mgmtPubInterval" type="uint16" access="RW" unit="second" min="1" desc="Interval for management broadcasts"/>
- <configElement name="initialDiskPageSize" type="uint32" access="RO" desc="Number of disk pages allocated for storage"/>
- <configElement name="initialPagesPerQueue" type="uint32" access="RO" desc="Number of disk pages allocated per queue"/>
<configElement name="clusterName" type="sstr" access="RO"
- desc="Name of cluster this server is a member of, zero-length for standalone server"/>
+ desc="Name of cluster this server is a member of"/>
<configElement name="version" type="sstr" access="RO" desc="Running software version"/>
+ <configElement name="dataDirEnabled" type="bool" access="RO" desc="Persistent configuration storage enabled"/>
+ <configElement name="dataDir" type="sstr" access="RO" desc="Persistent configuration storage location"/>
<method name="joinCluster">
<arg name="clusterName" dir="I" type="sstr"/>
@@ -137,9 +139,7 @@
<instElement name="consumers" type="hilo32" unit="consumer" desc="Current consumers on queue"/>
<instElement name="bindings" type="hilo32" unit="binding" desc="Current bindings"/>
<instElement name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
- <instElement name="messageLatencyMin" type="uint64" unit="nanosecond" desc="Minimum broker latency through this queue"/>
- <instElement name="messageLatencyMax" type="uint64" unit="nanosecond" desc="Maximum broker latency through this queue"/>
- <instElement name="messageLatencyAvg" type="uint64" unit="nanosecond" desc="Average broker latency through this queue"/>
+ <instElement name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/>
<method name="purge" desc="Discard all messages on queue"/>
</class>
@@ -203,6 +203,9 @@
===============================================================
-->
<class name="link">
+
+ This class represents an inter-broker connection.
+
<configElement name="vhostRef" type="objId" access="RC" index="y" parentRef="y"/>
<configElement name="address" type="sstr" access="RC" index="y"/>
Modified: incubator/qpid/trunk/qpid/specs/management-types.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-types.xml?rev=632087&r1=632086&r2=632087&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-types.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-types.xml Thu Feb 28 10:55:21 2008
@@ -29,6 +29,8 @@
<type name="lstr" base="LSTR" cpp="std::string" encode="@.putLongString (#)" decode="@.getLongString (#)" accessor="direct" init='""'/>
<type name="absTime" base="ABSTIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
<type name="deltaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="direct" init="0"/>
+<type name="float" base="FLOAT" cpp="float" encode="@.putFloat (#)" decode="# = @.getFloat ()" accessor="direct" init="0."/>
+<type name="double" base="DOUBLE" cpp="double" encode="@.putDouble (#)" decode="# = @.getDouble ()" accessor="direct" init="0."/>
<type name="hilo8" base="U8" cpp="uint8_t" encode="@.putOctet (#)" decode="# = @.getOctet ()" style="wm" accessor="counter" init="0"/>
<type name="hilo16" base="U16" cpp="uint16_t" encode="@.putShort (#)" decode="# = @.getShort ()" style="wm" accessor="counter" init="0"/>
@@ -41,8 +43,9 @@
<type name="count64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" accessor="counter" init="0"/>
<!-- Min/Max/Average statistics -->
-<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/>
-<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mma32" base="U32" cpp="uint32_t" encode="@.putLong (#)" decode="# = @.getLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mma64" base="U64" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
+<type name="mmaTime" base="DELTATIME" cpp="uint64_t" encode="@.putLongLong (#)" decode="# = @.getLongLong ()" style="mma" accessor="direct" init="0"/>
<!-- Some Proposed Syntax for User-Defined Types:
<enum name="enumeratedType" base="U8">