You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/06/11 17:54:39 UTC

svn commit: r783818 [3/3] - in /qpid/trunk/qpid: cpp/bindings/qmf/ruby/ cpp/examples/qmf-agent/ cpp/managementgen/qmfgen/ cpp/managementgen/qmfgen/templates/ cpp/src/qmf/ cpp/src/qpid/agent/ cpp/src/qpid/console/ cpp/src/qpid/framing/ cpp/src/qpid/mana...

Modified: qpid/trunk/qpid/python/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf/console.py?rev=783818&r1=783817&r2=783818&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf/console.py (original)
+++ qpid/trunk/qpid/python/qmf/console.py Thu Jun 11 15:54:37 2009
@@ -25,10 +25,13 @@
 import struct
 import socket
 import re
+from qpid.datatypes  import UUID
+from qpid.datatypes  import timestamp
+from qpid.datatypes  import datetime
 from qpid.peer       import Closed
 from qpid.session    import SessionDetached
-from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes  import Message, RangedSet
+from qpid.connection import Connection, ConnectionFailed, Timeout
+from qpid.datatypes  import Message, RangedSet, UUID
 from qpid.util       import connect, ssl, URL
 from qpid.codec010   import StringCodec as Codec
 from threading       import Lock, Condition, Thread
@@ -107,6 +110,289 @@
   def match(self, host, port):
     return socket.gethostbyname(self.host) == socket.gethostbyname(host) and self.port == port
 
+class Object(object):
+  """ This class defines a 'proxy' object representing a real managed object on an agent.
+      Actions taken on this proxy are remotely affected on the real managed object.
+  """
+  def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
+    self._session = session
+    self._broker  = broker
+    self._schema  = schema
+    self._managed = managed
+    if self._managed:
+      self._currentTime = codec.read_uint64()
+      self._createTime  = codec.read_uint64()
+      self._deleteTime  = codec.read_uint64()
+      self._objectId    = ObjectId(codec)
+    else:
+      self._currentTime = None
+      self._createTime  = None
+      self._deleteTime  = None
+      self._objectId    = None
+    self._properties  = []
+    self._statistics  = []
+    if codec:
+      if prop:
+        notPresent = self._parsePresenceMasks(codec, schema)
+        for property in schema.getProperties():
+          if property.name in notPresent:
+            self._properties.append((property, None))
+          else:
+            self._properties.append((property, self._session._decodeValue(codec, property.type, broker)))
+      if stat:
+        for statistic in schema.getStatistics():
+          self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker)))
+    else:
+      for property in schema.getProperties():
+        if property.optional:
+          self._properties.append((property, None))
+        else:
+          self._properties.append((property, self._session._defaultValue(property, broker, kwargs)))
+      for statistic in schema.getStatistics():
+          self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+
+  def getBroker(self):
+    """ Return the broker from which this object was sent """
+    return self._broker
+
+  def getObjectId(self):
+    """ Return the object identifier for this object """
+    return self._objectId
+
+  def getClassKey(self):
+    """ Return the class-key that references the schema describing this object. """
+    return self._schema.getKey()
+
+  def getSchema(self):
+    """ Return the schema that describes this object. """
+    return self._schema
+
+  def getMethods(self):
+    """ Return a list of methods available for this object. """
+    return self._schema.getMethods()
+
+  def getTimestamps(self):
+    """ Return the current, creation, and deletion times for this object. """
+    return self._currentTime, self._createTime, self._deleteTime
+
+  def isDeleted(self):
+    """ Return True iff this object has been deleted (never True for an unmanaged object, whose delete time is None). """
+    return self._deleteTime is not None and self._deleteTime != 0
+
+  def isManaged(self):
+    """ Return True iff this object is a proxy for a managed object on an agent. """
+    return self._managed
+
+  def getIndex(self):
+    """ Return a string describing this object's primary key. """
+    result = u""
+    for property, value in self._properties:
+      if property.index:
+        if result != u"":
+          result += u":"
+        try:
+          valstr = unicode(self._session._displayValue(value, property.type))
+        except:
+          valstr = u"<undecodable>"
+        result += valstr
+    return result
+
+  def getProperties(self):
+    """ Return a list of object properties """
+    return self._properties
+
+  def getStatistics(self):
+    """ Return a list of object statistics """
+    return self._statistics
+
+  def mergeUpdate(self, newer):
+    """ Replace properties and/or statistics with a newly received update """
+    if not self.isManaged():
+      raise Exception("Object is not managed")
+    if self._objectId != newer._objectId:
+      raise Exception("Objects with different object-ids")
+    if len(newer.getProperties()) > 0:
+      self._properties = newer.getProperties()
+    if len(newer.getStatistics()) > 0:
+      self._statistics = newer.getStatistics()
+
+  def update(self):
+    """ Contact the agent and retrieve the latest property and statistic values for this object. """
+    if not self.isManaged():
+      raise Exception("Object is not managed")
+    obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+    if obj:
+      self.mergeUpdate(obj[0])
+    else:
+      raise Exception("Underlying object no longer exists")
+
+  def __repr__(self):
+    if self.isManaged():
+      id = self.getObjectId().__repr__()
+    else:
+      id = "unmanaged"
+    key = self.getClassKey()
+    return key.getPackageName() + ":" + key.getClassName() +\
+        "[" + id + "] " + self.getIndex().encode("utf8")
+
+  def __getattr__(self, name):
+    for method in self._schema.getMethods():
+      if name == method.name:
+        return lambda *args, **kwargs : self._invoke(name, args, kwargs)
+    for property, value in self._properties:
+      if name == property.name:
+        return value
+      if name == "_" + property.name + "_" and property.type == 10:  # Dereference references
+        deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+        if len(deref) != 1:
+          return None
+        else:
+          return deref[0]
+    for statistic, value in self._statistics:
+      if name == statistic.name:
+        return value
+    raise Exception("Type Object has no attribute '%s'" % name)
+
+  def __setattr__(self, name, value):
+    if name[0] == '_':
+      super.__setattr__(self, name, value)
+      return
+
+    for prop, unusedValue in self._properties:
+      if name == prop.name:
+        newprop = (prop, value)
+        newlist = []
+        for old, val in self._properties:
+          if name == old.name:
+            newlist.append(newprop)
+          else:
+            newlist.append((old, val))
+        self._properties = newlist
+        return
+    super.__setattr__(self, name, value)
+
+  def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
+    for method in self._schema.getMethods():
+      if name == method.name:
+        aIdx = 0
+        sendCodec = Codec(self._broker.conn.spec)
+        seq = self._session.seqMgr._reserve((method, synchronous))
+        self._broker._setHeader(sendCodec, 'M', seq)
+        self._objectId.encode(sendCodec)
+        self._schema.getKey().encode(sendCodec)
+        sendCodec.write_str8(name)
+
+        count = 0
+        for arg in method.arguments:
+          if arg.dir.find("I") != -1:
+            count += 1
+        if count != len(args):
+          raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
+
+        for arg in method.arguments:
+          if arg.dir.find("I") != -1:
+            self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+            aIdx += 1
+        if timeWait:
+          ttl = timeWait * 1000
+        else:
+          ttl = None
+        smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
+                                     (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
+                                     ttl=ttl)
+        if synchronous:
+          try:
+            self._broker.cv.acquire()
+            self._broker.syncInFlight = True
+          finally:
+            self._broker.cv.release()
+        self._broker._send(smsg)
+        return seq
+    return None
+
+  def _invoke(self, name, args, kwargs):
+    if not self.isManaged():
+      raise Exception("Object is not managed")
+    if "_timeout" in kwargs:
+      timeout = kwargs["_timeout"]
+    else:
+      timeout = self._broker.SYNC_TIME
+
+    if "_async" in kwargs and kwargs["_async"]:
+      sync = False
+      if "_timeout" not in kwargs:
+        timeout = None
+    else:
+      sync = True
+
+    seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
+    if seq:
+      if not sync:
+        return seq
+      try:
+        self._broker.cv.acquire()
+        starttime = time()
+        while self._broker.syncInFlight and self._broker.error == None:
+          self._broker.cv.wait(timeout)
+          if time() - starttime > timeout:
+            self._session.seqMgr._release(seq)
+            raise RuntimeError("Timed out waiting for method to respond")
+      finally:
+        self._broker.cv.release()
+      if self._broker.error != None:
+        errorText = self._broker.error
+        self._broker.error = None
+        raise Exception(errorText)
+      return self._broker.syncResult
+    raise Exception("Invalid Method (software defect) [%s]" % name)
+
+  def _encodeUnmanaged(self, codec):
+    """ Encode this object into codec: presence masks, class key, properties, then statistics. """
+    mask = 0
+    bit  = 0
+    for prop, value in self._properties:
+      if prop.optional:
+        if bit == 0:
+          bit = 1
+        if value is not None:  # same test as the property loop below, so 0/""/False count as present
+          mask |= bit
+        bit = bit << 1
+        if bit == 256:
+          bit = 0
+          codec.write_uint8(mask)
+          mask = 0
+    if bit != 0:
+      codec.write_uint8(mask)
+    # NOTE(review): masks are written before the class key, yet Session._decodeValue reads the key first -- confirm order.
+    codec.write_uint8(20)
+    codec.write_str8(self._schema.getKey().getPackageName())
+    codec.write_str8(self._schema.getKey().getClassName())
+    codec.write_bin128(self._schema.getKey().getHash())
+
+    # encode properties (a value of None is skipped, matching a cleared presence bit)
+    for prop, value in self._properties:
+      if value is not None:
+        self._session._encodeValue(codec, value, prop.type)
+
+    # encode statistics
+    for stat, value in self._statistics:
+      self._session._encodeValue(codec, value, stat.type)
+
+  def _parsePresenceMasks(self, codec, schema):
+    excludeList = []
+    bit = 0
+    for property in schema.getProperties():
+      if property.optional:
+        if bit == 0:
+          mask = codec.read_uint8()
+          bit = 1
+        if (mask & bit) == 0:
+          excludeList.append(property.name)
+        bit *= 2
+        if bit == 256:
+          bit = 0
+    return excludeList    
+
 class Session:
   """
   An instance of the Session class represents a console session running
@@ -119,6 +405,18 @@
 
   DEFAULT_GET_WAIT_TIME = 60
 
+  ENCODINGS = {
+    str: 7,
+    timestamp: 8,
+    datetime: 8,  
+    int: 9,
+    long: 9,
+    float: 13,
+    UUID: 14,      
+    Object: 20,    
+    list: 21
+    }  
+
   def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
                manageConnections=False, userBindings=False):
     """
@@ -265,6 +563,13 @@
         agentList.append(a)
     return agentList
 
+  def makeObject(self, classKey, broker=None, **kwargs):
+    """ Create a new, unmanaged object of the schema indicated by classKey """
+    schema = self.getSchema(classKey)
+    if schema == None:
+      raise Exception("Schema not found for classKey")
+    return Object(self, broker, schema, None, True, True, False, kwargs)
+
   def getObjects(self, **kwargs):
     """ Get a list of objects from QMF agents.
     All arguments are passed by name(keyword).
@@ -521,7 +826,7 @@
     if code == 0:
       for arg in method.arguments:
         if arg.dir.find("O") != -1:
-          outArgs[arg.name] = self._decodeValue(codec, arg.type)
+          outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
     result = MethodResult(code, text, outArgs)
     if synchronous:
       try:
@@ -559,7 +864,7 @@
   def _handleSchemaResp(self, broker, codec, seq):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
-    _class = SchemaClass(kind, classKey, codec)
+    _class = SchemaClass(kind, classKey, codec, self)
     try:
       self.cv.acquire()
       self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
@@ -605,9 +910,10 @@
         self.console.objectStats(broker, object)
 
   def _handleError(self, error):
-    self.error = error
     try:
       self.cv.acquire()
+      if len(self.syncSequenceList) > 0:
+        self.error = error
       self.syncSequenceList = []
       self.cv.notify()
     finally:
@@ -621,7 +927,7 @@
           return False
     return True
   
-  def _decodeValue(self, codec, typecode):
+  def _decodeValue(self, codec, typecode, broker=None):
     """ Decode, from the codec, a value based on its typecode. """
     if   typecode == 1:  data = codec.read_uint8()      # U8
     elif typecode == 2:  data = codec.read_uint16()     # U16
@@ -636,11 +942,59 @@
     elif typecode == 12: data = codec.read_float()      # FLOAT
     elif typecode == 13: data = codec.read_double()     # DOUBLE
     elif typecode == 14: data = codec.read_uuid()       # UUID
-    elif typecode == 15: data = codec.read_map()        # FTABLE
     elif typecode == 16: data = codec.read_int8()       # S8
     elif typecode == 17: data = codec.read_int16()      # S16
     elif typecode == 18: data = codec.read_int32()      # S32
     elif typecode == 19: data = codec.read_int64()      # S63
+    elif typecode == 15:                                # FTABLE
+      data = {}
+      sc = Codec(codec.spec, codec.read_vbin32())
+      if sc.encoded:
+        count = sc.read_uint32()
+        while count > 0:
+          k = sc.read_str8()
+          code = sc.read_uint8()
+          v = self._decodeValue(sc, code, broker)
+          data[k] = v
+          count -= 1
+    elif typecode == 20:                                # OBJECT
+      # Peek at the inner type code: if it is still 20, decode the embedded
+      # object here; if not, call back into self to decode that type.
+      inner_type_code = codec.read_uint8()
+      if inner_type_code == 20:
+          classKey = ClassKey(codec)
+          try:
+            self.cv.acquire()
+            pname = classKey.getPackageName()
+            if pname not in self.packages:
+              return None
+            pkey = classKey.getPackageKey()
+            if pkey not in self.packages[pname]:
+              return None
+            schema = self.packages[pname][pkey]
+          finally:
+            self.cv.release()
+          data = Object(self, broker, schema, codec, True, True, False)
+      else:
+          data = self._decodeValue(codec, inner_type_code, broker)
+    elif typecode == 21:                                # List
+        #taken from codec10.read_list
+        sc = Codec(codec.spec, codec.read_vbin32())
+        count = sc.read_uint32()
+        data = []
+        while count > 0:
+          type = sc.read_uint8()
+          data.append(self._decodeValue(sc,type,broker))
+          count -= 1
+    elif typecode == 22:                                #Array
+        #taken from codec10.read_array
+        sc = Codec(codec.spec, codec.read_vbin32()) 
+        count = sc.read_uint32()
+        type = sc.read_uint8()
+        data = []
+        while count > 0:
+          data.append(self._decodeValue(sc,type,broker))
+          count -= 1
     else:
       raise ValueError("Invalid type code: %d" % typecode)
     return data
@@ -660,14 +1014,54 @@
     elif typecode == 12: codec.write_float  (float(value))  # FLOAT
     elif typecode == 13: codec.write_double (float(value))  # DOUBLE
     elif typecode == 14: codec.write_uuid   (value.bytes)   # UUID
-    elif typecode == 15: codec.write_map    (value)         # FTABLE
     elif typecode == 16: codec.write_int8   (int(value))    # S8
     elif typecode == 17: codec.write_int16  (int(value))    # S16
     elif typecode == 18: codec.write_int32  (int(value))    # S32
     elif typecode == 19: codec.write_int64  (int(value))    # S64
+    elif typecode == 20: value._encodeUnmanaged(codec)      # OBJECT
+    elif typecode == 15:                                    # FTABLE
+        sc = Codec(codec.spec)    
+        if value is not None:
+          sc.write_uint32(len(value))
+          for k, v in value.items():
+            mtype = self.encoding(v)
+            sc.write_str8(k)
+            sc.write_uint8(mtype)
+            self._encodeValue(sc, v, mtype)
+        else:
+          sc.write_uint32(0)
+        codec.write_vbin32(sc.encoded)
+    elif typecode == 21:                                    # List
+        sc = Codec(codec.spec)
+        self._encodeValue(sc, len(value), 3)
+        for o in value:
+          ltype=self.encoding(o)
+          self._encodeValue(sc,ltype,1)
+          self._encodeValue(sc, o, ltype)
+        codec.write_vbin32(sc.encoded)
+    elif typecode == 22:                                    # Array
+        sc = Codec(codec.spec)    
+        self._encodeValue(sc, len(value), 3)
+        if len(value) > 0:
+            ltype = self.encoding(value[0])
+            self._encodeValue(sc,ltype,1)
+            for o in value:
+              self._encodeValue(sc, o, ltype)
+        codec.write_vbin32(sc.encoded)
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
+  def encoding(self, value):
+      return self._encoding(value.__class__)
+      
+  def _encoding(self, klass):  # typecode for klass or its nearest registered base; None if there is none
+    if klass in Session.ENCODINGS:
+      return self.ENCODINGS[klass]
+    for base in klass.__bases__:
+      result = self._encoding(base)
+      if result != None:
+        return result
+            
   def _displayValue(self, value, typecode):
     """ """
     if   typecode == 1:  return unicode(value)
@@ -690,8 +1084,64 @@
     elif typecode == 17: return unicode(value)
     elif typecode == 18: return unicode(value)
     elif typecode == 19: return unicode(value)
+    elif typecode == 20: return unicode(value.__repr__())
+    elif typecode == 21: return unicode(value.__repr__())
+    elif typecode == 22: return unicode(value.__repr__())
+    else:
+      raise ValueError ("Invalid type code: %d" % typecode)
+    
+  def _defaultValue(self, stype, broker=None, kwargs={}):
+    """ Return the default value for stype.type; kwargs may carry "classKeys" to pick the class of a type-20 object. """
+    typecode = stype.type
+    if   typecode == 1:  return 0
+    elif typecode == 2:  return 0
+    elif typecode == 3:  return 0
+    elif typecode == 4:  return 0
+    elif typecode == 6:  return ""
+    elif typecode == 7:  return ""
+    elif typecode == 8:  return 0
+    elif typecode == 9:  return 0
+    elif typecode == 10: return ObjectId(None)
+    elif typecode == 11: return False
+    elif typecode == 12: return 0.0
+    elif typecode == 13: return 0.0
+    elif typecode == 14: return UUID([0 for i in range(16)])
+    elif typecode == 15: return {}
+    elif typecode == 16: return 0
+    elif typecode == 17: return 0
+    elif typecode == 18: return 0
+    elif typecode == 19: return 0
+    elif typecode == 21: return []
+    elif typecode == 22: return []
+    elif typecode == 20:
+      try:
+        if "classKeys" in kwargs:
+          keyList = kwargs["classKeys"]
+        else:
+          keyList = None
+        classKey = self._bestClassKey(stype.refPackage, stype.refClass, keyList)
+        if classKey:
+          return self.makeObject(classKey, broker, **kwargs)  # makeObject takes its options as keywords
+      except Exception:  # best effort: an object that cannot be built defaults to None
+        pass
+      return None
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
+
+  def _bestClassKey(self, pname, cname, preferredList):
+    """ Choose a class key: first of preferredList if no name is given, else a match from it or from getClasses. """
+    if pname == None or cname == None:
+      if not preferredList:
+        return None
+      return preferredList[0]
+    for p in preferredList or []:
+      if p.getPackageName() == pname and p.getClassName() == cname:
+        return p
+    clist = self.getClasses(pname)
+    for c in clist:
+      if c.getClassName() == cname:
+        return c
+    return None
     
   def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
     """ This function can be used to send a method request to an object given only the
@@ -783,18 +1233,24 @@
   CLASS_KIND_TABLE = 1
   CLASS_KIND_EVENT = 2
 
-  def __init__(self, kind, key, codec):
+  def __init__(self, kind, key, codec, session):
     self.kind = kind
     self.classKey = key
     self.properties = []
     self.statistics = []
     self.methods = []
     self.arguments = []
+    self.session = session
 
+    hasSupertype = codec.read_uint8()
     if self.kind == self.CLASS_KIND_TABLE:
       propCount   = codec.read_uint16()
       statCount   = codec.read_uint16()
       methodCount = codec.read_uint16()
+      if hasSupertype == 1:
+        self.superTypeKey = ClassKey(codec)
+      else:
+        self.superTypeKey = None ;
       for idx in range(propCount):
         self.properties.append(SchemaProperty(codec))
       for idx in range(statCount):
@@ -804,6 +1260,10 @@
 
     elif self.kind == self.CLASS_KIND_EVENT:
       argCount = codec.read_uint16()
+      if (hasSupertype):
+        self.superTypeKey = ClassKey(codec)
+      else:
+        self.superTypeKey = None ;      
       for idx in range(argCount):
         self.arguments.append(SchemaArgument(codec, methodArg=False))
 
@@ -823,19 +1283,32 @@
 
   def getProperties(self):
     """ Return the list of properties for the class. """
-    return self.properties
+    if (self.superTypeKey == None):
+        return self.properties
+    else:
+        return self.properties + self.session.getSchema(self.superTypeKey).getProperties() 
 
   def getStatistics(self):
     """ Return the list of statistics for the class. """
-    return self.statistics
+    if (self.superTypeKey == None):
+        return self.statistics
+    else:
+        return self.statistics + self.session.getSchema(self.superTypeKey).getStatistics()     
 
   def getMethods(self):
     """ Return the list of methods for the class. """
-    return self.methods
+    if (self.superTypeKey == None):
+        return self.methods
+    else:
+        return self.methods + self.session.getSchema(self.superTypeKey).getMethods()    
 
   def getArguments(self):
     """ Return the list of events for the class. """
-    return self.arguments
+    # The result also includes the arguments of the super-type schema, when one is set.
+    if (self.superTypeKey == None):
+        return self.arguments
+    else:
+        return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()  
 
 class SchemaProperty:
   """ """
@@ -846,18 +1319,22 @@
     self.access   = str(map["access"])
     self.index    = map["index"] != 0
     self.optional = map["optional"] != 0
-    self.unit     = None
-    self.min      = None
-    self.max      = None
-    self.maxlen   = None
-    self.desc     = None
+    self.refPackage = None
+    self.refClass   = None
+    self.unit       = None
+    self.min        = None
+    self.max        = None
+    self.maxlen     = None
+    self.desc       = None
 
     for key, value in map.items():
-      if   key == "unit"   : self.unit   = value
-      elif key == "min"    : self.min    = value
-      elif key == "max"    : self.max    = value
-      elif key == "maxlen" : self.maxlen = value
-      elif key == "desc"   : self.desc   = value
+      if   key == "unit"       : self.unit = value
+      elif key == "min"        : self.min = value
+      elif key == "max"        : self.max = value
+      elif key == "maxlen"     : self.maxlen = value
+      elif key == "desc"       : self.desc = value
+      elif key == "refPackage" : self.refPackage = value
+      elif key == "refClass"   : self.refClass = value
 
   def __repr__(self):
     return self.name
@@ -920,6 +1397,8 @@
     self.maxlen  = None
     self.desc    = None
     self.default = None
+    self.refPackage = None
+    self.refClass   = None
 
     for key, value in map.items():
       if   key == "unit"    : self.unit    = value
@@ -928,6 +1407,8 @@
       elif key == "maxlen"  : self.maxlen  = value
       elif key == "desc"    : self.desc    = value
       elif key == "default" : self.default = value
+      elif key == "refPackage" : self.refPackage = value
+      elif key == "refClass"   : self.refClass = value
 
 class ObjectId:
   """ Object that represents QMF object identifiers """
@@ -987,209 +1468,6 @@
   def __eq__(self, other):
     return (self.first, self.second).__eq__(other)
 
-class Object(object):
-  """ This class defines a 'proxy' object representing a real managed object on an agent.
-      Actions taken on this proxy are remotely affected on the real managed object.
-  """
-  def __init__(self, session, broker, schema, codec, prop, stat):
-    self._session = session
-    self._broker  = broker
-    self._schema  = schema
-    self._currentTime = codec.read_uint64()
-    self._createTime  = codec.read_uint64()
-    self._deleteTime  = codec.read_uint64()
-    self._objectId    = ObjectId(codec)
-    self._properties  = []
-    self._statistics  = []
-    if prop:
-      notPresent = self._parsePresenceMasks(codec, schema)
-      for property in schema.getProperties():
-        if property.name in notPresent:
-          self._properties.append((property, None))
-        else:
-          self._properties.append((property, self._session._decodeValue(codec, property.type)))
-    if stat:
-      for statistic in schema.getStatistics():
-        self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type)))
-
-  def getBroker(self):
-    """ Return the broker from which this object was sent """
-    return self._broker
-
-  def getObjectId(self):
-    """ Return the object identifier for this object """
-    return self._objectId
-
-  def getClassKey(self):
-    """ Return the class-key that references the schema describing this object. """
-    return self._schema.getKey()
-
-  def getSchema(self):
-    """ Return the schema that describes this object. """
-    return self._schema
-
-  def getMethods(self):
-    """ Return a list of methods available for this object. """
-    return self._schema.getMethods()
-
-  def getTimestamps(self):
-    """ Return the current, creation, and deletion times for this object. """
-    return self._currentTime, self._createTime, self._deleteTime
-
-  def isDeleted(self):
-    """ Return True iff this object has been deleted. """
-    return self._deleteTime != 0
-
-  def getIndex(self):
-    """ Return a string describing this object's primary key. """
-    result = u""
-    for property, value in self._properties:
-      if property.index:
-        if result != u"":
-          result += u":"
-        try:
-          valstr = unicode(self._session._displayValue(value, property.type))
-        except:
-          valstr = u"<undecodable>"
-        result += valstr
-    return result
-
-  def getProperties(self):
-    """ Return a list of object properties """
-    return self._properties
-
-  def getStatistics(self):
-    """ Return a list of object statistics """
-    return self._statistics
-
-  def mergeUpdate(self, newer):
-    """ Replace properties and/or statistics with a newly received update """
-    if self._objectId != newer._objectId:
-      raise Exception("Objects with different object-ids")
-    if len(newer.getProperties()) > 0:
-      self._properties = newer.getProperties()
-    if len(newer.getStatistics()) > 0:
-      self._statistics = newer.getStatistics()
-
-  def update(self):
-    """ Contact the agent and retrieve the lastest property and statistic values for this object. """
-    obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
-    if obj:
-      self.mergeUpdate(obj[0])
-    else:
-      raise Exception("Underlying object no longer exists")
-
-  def __repr__(self):
-    key = self.getClassKey()
-    return key.getPackageName() + ":" + key.getClassName() +\
-        "[" + self.getObjectId().__repr__() + "] " + self.getIndex().encode("utf8")
-
-  def __getattr__(self, name):
-    for method in self._schema.getMethods():
-      if name == method.name:
-        return lambda *args, **kwargs : self._invoke(name, args, kwargs)
-    for property, value in self._properties:
-      if name == property.name:
-        return value
-      if name == "_" + property.name + "_" and property.type == 10:  # Dereference references
-        deref = self._session.getObjects(_objectId=value, _broker=self._broker)
-        if len(deref) != 1:
-          return None
-        else:
-          return deref[0]
-    for statistic, value in self._statistics:
-      if name == statistic.name:
-        return value
-    raise Exception("Type Object has no attribute '%s'" % name)
-
-  def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
-    for method in self._schema.getMethods():
-      if name == method.name:
-        aIdx = 0
-        sendCodec = Codec(self._broker.conn.spec)
-        seq = self._session.seqMgr._reserve((method, synchronous))
-        self._broker._setHeader(sendCodec, 'M', seq)
-        self._objectId.encode(sendCodec)
-        self._schema.getKey().encode(sendCodec)
-        sendCodec.write_str8(name)
-
-        count = 0
-        for arg in method.arguments:
-          if arg.dir.find("I") != -1:
-            count += 1
-        if count != len(args):
-          raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
-
-        for arg in method.arguments:
-          if arg.dir.find("I") != -1:
-            self._session._encodeValue(sendCodec, args[aIdx], arg.type)
-            aIdx += 1
-        if timeWait:
-          ttl = timeWait * 1000
-        else:
-          ttl = None
-        smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
-                                     (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
-                                     ttl=ttl)
-        if synchronous:
-          try:
-            self._broker.cv.acquire()
-            self._broker.syncInFlight = True
-          finally:
-            self._broker.cv.release()
-        self._broker._send(smsg)
-        return seq
-    return None
-
-  def _invoke(self, name, args, kwargs):
-    if "_timeout" in kwargs:
-      timeout = kwargs["_timeout"]
-    else:
-      timeout = self._broker.SYNC_TIME
-
-    if "_async" in kwargs and kwargs["_async"]:
-      sync = False
-      if "_timeout" not in kwargs:
-        timeout = None
-    else:
-      sync = True
-
-    seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
-    if seq:
-      if not sync:
-        return seq
-      try:
-        self._broker.cv.acquire()
-        starttime = time()
-        while self._broker.syncInFlight and self._broker.error == None:
-          self._broker.cv.wait(timeout)
-          if time() - starttime > timeout:
-            self._session.seqMgr._release(seq)
-            raise RuntimeError("Timed out waiting for method to respond")
-      finally:
-        self._broker.cv.release()
-      if self._broker.error != None:
-        errorText = self._broker.error
-        self._broker.error = None
-        raise Exception(errorText)
-      return self._broker.syncResult
-    raise Exception("Invalid Method (software defect) [%s]" % name)
-
-  def _parsePresenceMasks(self, codec, schema):
-    excludeList = []
-    bit = 0
-    for property in schema.getProperties():
-      if property.optional:
-        if bit == 0:
-          mask = codec.read_uint8()
-          bit = 1
-        if (mask & bit) == 0:
-          excludeList.append(property.name)
-        bit *= 2
-        if bit == 256:
-          bit = 0
-    return excludeList
-
 class MethodResult(object):
   """ """
   def __init__(self, status, text, outArgs):
@@ -1361,9 +1639,13 @@
       self.reqsOutstanding = 1
 
       sock = connect(self.host, self.port)
+      sock.settimeout(5)
       if self.ssl:
         sock = ssl(sock)
-      self.conn = Connection(sock, username=self.authUser, password=self.authPass)
+      self.conn = Connection(sock, username=self.authUser, password=self.authPass, heartbeat=2)
+      def aborted():
+        raise Timeout("read timed out")
+      self.conn.aborted = aborted
       self.conn.start()
       self.replyName = "reply-%s" % self.amqpSessionId
       self.amqpSession = self.conn.session(self.amqpSessionId)
@@ -1424,7 +1706,7 @@
     """ Compose the header of a management message. """
     codec.write_uint8(ord('A'))
     codec.write_uint8(ord('M'))
-    codec.write_uint8(ord('2'))
+    codec.write_uint8(ord('3'))
     codec.write_uint8(ord(opcode))
     codec.write_uint32(seq)
 
@@ -1438,7 +1720,7 @@
       if octet != 'M':
         return None, None
       octet = chr(codec.read_uint8())
-      if octet != '2':
+      if octet != '3':
         return None, None
       opcode = chr(codec.read_uint8())
       seq    = codec.read_uint32()
@@ -1453,6 +1735,7 @@
       dp.ttl = ttl
     mp = self.amqpSession.message_properties()
     mp.content_type = "x-application/qmf"
+    mp.user_id = self.authUser
     mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
     return Message(dp, mp, body)
 
@@ -1576,7 +1859,7 @@
         self.schema = session.packages[pname][pkey]
         self.arguments = {}
         for arg in self.schema.arguments:
-          self.arguments[arg.name] = session._decodeValue(codec, arg.type)
+          self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
 
   def __repr__(self):
     if self.schema == None:

Modified: qpid/trunk/qpid/python/qpid/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/management.py?rev=783818&r1=783817&r2=783818&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/management.py (original)
+++ qpid/trunk/qpid/python/qpid/management.py Thu Jun 11 15:54:37 2009
@@ -401,7 +401,7 @@
     """ Compose the header of a management message. """
     codec.write_uint8 (ord ('A'))
     codec.write_uint8 (ord ('M'))
-    codec.write_uint8 (ord ('2'))
+    codec.write_uint8 (ord ('3'))
     codec.write_uint8 (opcode)
     codec.write_uint32  (seq)
 
@@ -415,7 +415,7 @@
       if octet != 'M':
         return None
       octet = chr (codec.read_uint8 ())
-      if octet != '2':
+      if octet != '3':
         return None
       opcode = chr (codec.read_uint8 ())
       seq    = codec.read_uint32 ()
@@ -672,9 +672,14 @@
     packageName = codec.read_str8 ()
     className   = codec.read_str8 ()
     hash        = codec.read_bin128 ()
+    hasSupertype = codec.read_uint8()
     configCount = codec.read_uint16 ()
     instCount   = codec.read_uint16 ()
     methodCount = codec.read_uint16 ()
+    if hasSupertype != 0:
+      supertypePackage = codec.read_str8()
+      supertypeClass   = codec.read_str8()
+      supertypeHash    = codec.read_bin128()
 
     if packageName not in self.packages:
       return

Modified: qpid/trunk/qpid/python/qpid/managementdata.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/managementdata.py?rev=783818&r1=783817&r2=783818&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ qpid/trunk/qpid/python/qpid/managementdata.py Thu Jun 11 15:54:37 2009
@@ -360,6 +360,12 @@
       return "int32"
     elif typecode == 19:
       return "int64"
+    elif typecode == 20:
+      return "object"
+    elif typecode == 21:
+      return "list"
+    elif typecode == 22:
+      return "array"      
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 

Modified: qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=783818&r1=783817&r2=783818&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Thu Jun 11 15:54:37 2009
@@ -773,10 +773,16 @@
       @methods = []
       @arguments = []
 
+      has_supertype = codec.read_uint8
       if @kind == CLASS_KIND_TABLE
         prop_count   = codec.read_uint16
         stat_count   = codec.read_uint16
         method_count = codec.read_uint16
+        if has_supertype == 1
+          codec.read_str8
+          codec.read_str8
+          codec.read_bin128
+        end
         prop_count.times { |idx|
           @properties << SchemaProperty.new(codec) }
         stat_count.times { |idx|
@@ -1111,13 +1117,11 @@
     def invoke(method, name, args)
       kwargs = args[args.size - 1]
       sync = true
-      timeout = nil
+      timeout = DEFAULT_METHOD_WAIT_TIME
 
       if kwargs.class == Hash
         if kwargs.include?(:timeout)
           timeout = kwargs[:timeout]
-        else
-          timeout = DEFAULT_METHOD_WAIT_TIME
         end
 
         if kwargs.include?(:async)
@@ -1343,7 +1347,7 @@
     def set_header(codec, opcode, seq=0)
       codec.write_uint8(?A)
       codec.write_uint8(?M)
-      codec.write_uint8(?2)
+      codec.write_uint8(?3)
       codec.write_uint8(opcode)
       codec.write_uint32(seq)
     end
@@ -1508,7 +1512,7 @@
       begin
         return [nil, nil] unless codec.read_uint8 == ?A
         return [nil, nil] unless codec.read_uint8 == ?M
-        return [nil, nil] unless codec.read_uint8 == ?2
+        return [nil, nil] unless codec.read_uint8 == ?3
         opcode = codec.read_uint8
         seq    = codec.read_uint32
         return [opcode, seq]



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org