You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/31 23:13:14 UTC

svn commit: r929716 [4/4] - in /qpid/trunk/qpid: cpp/bindings/qmf/tests/ cpp/examples/qmf-agent/ cpp/include/qpid/agent/ cpp/include/qpid/framing/ cpp/include/qpid/management/ cpp/managementgen/ cpp/managementgen/qmfgen/ cpp/managementgen/qmfgen/templa...

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Wed Mar 31 21:13:12 2010
@@ -41,6 +41,9 @@ from cStringIO       import StringIO
 #import qpid.log
 #qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG)
 
+#===================================================================================================
+# CONSOLE
+#===================================================================================================
 class Console:
   """ To access the asynchronous operations, a class must be derived from
   Console with overrides of any combination of the available methods. """
@@ -94,6 +97,10 @@ class Console:
     """ Invoked when a method response from an asynchronous method call is received. """
     pass
 
+
+#===================================================================================================
+# BrokerURL
+#===================================================================================================
 class BrokerURL(URL):
   def __init__(self, text):
     URL.__init__(self, text)
@@ -115,16 +122,30 @@ class BrokerURL(URL):
   def match(self, host, port):
     return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4]
 
+
+#===================================================================================================
+# Object
+#===================================================================================================
 class Object(object):
-  """ This class defines a 'proxy' object representing a real managed object on an agent.
-      Actions taken on this proxy are remotely affected on the real managed object.
   """
-  def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
-    self._session = session
-    self._broker  = broker
+  This class defines a 'proxy' object representing a real managed object on an agent.
+  Actions taken on this proxy are remotely affected on the real managed object.
+  """
+  def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}):
+    self._agent   = agent
+    self._session = None
+    self._broker  = None
+    if agent:
+      self._session = agent.session
+      self._broker  = agent.broker
     self._schema  = schema
-    self._managed = managed
-    if self._managed:
+    self._properties  = []
+    self._statistics  = []
+    if v2Map:
+      self.v2Init(v2Map, agentName)
+      return
+
+    if self._agent:
       self._currentTime = codec.read_uint64()
       self._createTime  = codec.read_uint64()
       self._deleteTime  = codec.read_uint64()
@@ -134,8 +155,6 @@ class Object(object):
       self._createTime  = None
       self._deleteTime  = None
       self._objectId    = None
-    self._properties  = []
-    self._statistics  = []
     if codec:
       if prop:
         notPresent = self._parsePresenceMasks(codec, schema)
@@ -143,18 +162,38 @@ class Object(object):
           if property.name in notPresent:
             self._properties.append((property, None))
           else:
-            self._properties.append((property, self._session._decodeValue(codec, property.type, broker)))
+            self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker)))
       if stat:
         for statistic in schema.getStatistics():
-          self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker)))
+          self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker)))
     else:
       for property in schema.getProperties():
         if property.optional:
           self._properties.append((property, None))
         else:
-          self._properties.append((property, self._session._defaultValue(property, broker, kwargs)))
+          self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs)))
       for statistic in schema.getStatistics():
-          self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+          self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs)))
+
+  def v2Init(self, omap, agentName):
+    if omap.__class__ != dict:
+      raise Exception("QMFv2 object data must be a map/dict")
+    if '_values' not in omap:
+      raise Exception("QMFv2 object must have '_values' element")
+
+    values = omap['_values']
+    for prop in self._schema.getProperties():
+      if prop.name in values:
+        self._properties.append((prop, values[prop.name]))
+    for stat in self._schema.getStatistics():
+      if stat.name in values:
+        self._statistics.append((stat, values[stat.name]))
+    if '_subtypes' in omap:
+      self._subtypes = omap['_subtypes']
+    if '_object_id' in omap:
+      self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
+    else:
+      self._objectId = None
 
   def getBroker(self):
     """ Return the broker from which this object was sent """
@@ -186,7 +225,7 @@ class Object(object):
 
   def isManaged(self):
     """ Return True iff this object is a proxy for a managed object on an agent. """
-    return self._managed
+    return self._objectId and self._agent
 
   def getIndex(self):
     """ Return a string describing this object's primary key. """
@@ -225,7 +264,7 @@ class Object(object):
     """ Contact the agent and retrieve the lastest property and statistic values for this object. """
     if not self.isManaged():
       raise Exception("Object is not managed")
-    obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+    obj = self._agent.getObjects(_objectId=self._objectId)
     if obj:
       self.mergeUpdate(obj[0])
     else:
@@ -244,17 +283,17 @@ class Object(object):
     for method in self._schema.getMethods():
       if name == method.name:
         return lambda *args, **kwargs : self._invoke(name, args, kwargs)
-    for property, value in self._properties:
-      if name == property.name:
+    for prop, value in self._properties:
+      if name == prop.name:
         return value
-      if name == "_" + property.name + "_" and property.type == 10:  # Dereference references
-        deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+      if name == "_" + prop.name + "_" and prop.type == 10:  # Dereference references
+        deref = self._agent.getObjects(_objectId=value)
         if len(deref) != 1:
           return None
         else:
           return deref[0]
-    for statistic, value in self._statistics:
-      if name == statistic.name:
+    for stat, value in self._statistics:
+      if name == stat.name:
         return value
     raise Exception("Type Object has no attribute '%s'" % name)
 
@@ -282,10 +321,6 @@ class Object(object):
         aIdx = 0
         sendCodec = Codec()
         seq = self._session.seqMgr._reserve((method, synchronous))
-        self._broker._setHeader(sendCodec, 'M', seq)
-        self._objectId.encode(sendCodec)
-        self._schema.getKey().encode(sendCodec)
-        sendCodec.write_str8(name)
 
         count = 0
         for arg in method.arguments:
@@ -294,24 +329,64 @@ class Object(object):
         if count != len(args):
           raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
 
-        for arg in method.arguments:
-          if arg.dir.find("I") != -1:
-            self._session._encodeValue(sendCodec, args[aIdx], arg.type)
-            aIdx += 1
-        if timeWait:
-          ttl = timeWait * 1000
+        if self._agent.isV2:
+          #
+          # Compose and send a QMFv2 method request
+          #
+          call = {}
+          call['_object_id'] = self._objectId.asMap()
+          call['_method_name'] = name
+          argMap = {}
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              argMap[arg.name] = args[aIdx]
+              aIdx += 1
+          call['_arguments'] = argMap
+
+          dp = self._broker.amqpSession.delivery_properties()
+          dp.routing_key = self._objectId.getAgentBank()
+          mp = self._broker.amqpSession.message_properties()
+          mp.content_type = "amqp/map"
+          mp.user_id = self._broker.authUser
+          mp.correlation_id = str(seq)
+          mp.app_id = "qmf2"
+          mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+          mp.application_headers = {'qmf.opcode':'_method_request'}
+          sendCodec.write_map(call)
+          smsg = Message(dp, mp, sendCodec.encoded)
+          exchange = "qmf.default.direct"
+
         else:
-          ttl = None
-        smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
-                                     (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
-                                     ttl=ttl)
+          #
+          # Associate this sequence with the agent hosting the object so we can correctly
+          # route the method-response
+          #
+          agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank())
+          self._broker._setSequence(seq, agent)
+
+          #
+          # Compose and send a QMFv1 method request
+          #
+          self._broker._setHeader(sendCodec, 'M', seq)
+          self._objectId.encode(sendCodec)
+          self._schema.getKey().encode(sendCodec)
+          sendCodec.write_str8(name)
+
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+              aIdx += 1
+          smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
+                                       (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+          exchange = "qpid.management"
+
         if synchronous:
           try:
             self._broker.cv.acquire()
             self._broker.syncInFlight = True
           finally:
             self._broker.cv.release()
-        self._broker._send(smsg)
+        self._broker._send(smsg, exchange)
         return seq
     return None
 
@@ -352,7 +427,6 @@ class Object(object):
     raise Exception("Invalid Method (software defect) [%s]" % name)
 
   def _encodeUnmanaged(self, codec):
-
     codec.write_uint8(20) 
     codec.write_str8(self._schema.getKey().getPackageName())
     codec.write_str8(self._schema.getKey().getClassName())
@@ -399,6 +473,10 @@ class Object(object):
           bit = 0
     return excludeList    
 
+
+#===================================================================================================
+# Session
+#===================================================================================================
 class Session:
   """
   An instance of the Session class represents a console session running
@@ -423,6 +501,7 @@ class Session:
     list: 21
     }  
 
+
   def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
                manageConnections=False, userBindings=False):
     """
@@ -450,7 +529,7 @@ class Session:
     """
     self.console           = console
     self.brokers           = []
-    self.packages          = {}
+    self.schemaCache       = SchemaCache()
     self.seqMgr            = SequenceManager()
     self.cv                = Condition()
     self.syncSequenceList  = []
@@ -471,9 +550,35 @@ class Session:
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
 
+
+  def _getBrokerForAgentAddr(self, agent_addr):
+    try:
+      self.cv.acquire()
+      key = (1, agent_addr)
+      for b in self.brokers:
+        if key in b.agents:
+          return b
+    finally:
+      self.cv.release()
+    return None
+
+
+  def _getAgentForAgentAddr(self, agent_addr):
+    try:
+      self.cv.acquire()
+      key = agent_addr
+      for b in self.brokers:
+        if key in b.agents:
+          return b.agents[key]
+    finally:
+      self.cv.release()
+    return None
+
+
   def __repr__(self):
     return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
 
+
   def addBroker(self, target="localhost", timeout=None, mechanisms=None):
     """ Connect to a Qpid broker.  Returns an object of type Broker. """
     url = BrokerURL(target)
@@ -482,9 +587,12 @@ class Session:
 
     self.brokers.append(broker)
     if not self.manageConnections:
-      self.getObjects(broker=broker, _class="agent")
+      agent = broker.getAgent(1,0)
+      if agent:
+        agent.getObjects(_class="agent")
     return broker
 
+
   def delBroker(self, broker):
     """ Disconnect from a broker.  The 'broker' argument is the object
     returned from the addBroker call """
@@ -495,34 +603,27 @@ class Session:
     self.brokers.remove(broker)
     del broker
 
+
   def getPackages(self):
     """ Get the list of known QMF packages """
     for broker in self.brokers:
       broker._waitForStable()
-    list = []
-    for package in self.packages:
-      list.append(package)
-    return list
+    return self.schemaCache.getPackages()
+
 
   def getClasses(self, packageName):
     """ Get the list of known classes within a QMF package """
     for broker in self.brokers:
       broker._waitForStable()
-    list = []
-    if packageName in self.packages:
-      for pkey in self.packages[packageName]:
-        list.append(self.packages[packageName][pkey].getKey())
-    return list
+    return self.schemaCache.getClasses(packageName)
+
 
   def getSchema(self, classKey):
     """ Get the schema for a QMF class """
     for broker in self.brokers:
       broker._waitForStable()
-    pname = classKey.getPackageName()
-    pkey = classKey.getPackageKey()
-    if pname in self.packages:
-      if pkey in self.packages[pname]:
-        return self.packages[pname][pkey]
+    return self.schemaCache.getSchema(classKey)
+
 
   def bindPackage(self, packageName):
     """ Request object updates for all table classes within a package. """
@@ -535,6 +636,7 @@ class Session:
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                          binding_key=key)
 
+
   def bindClass(self, pname, cname):
     """ Request object updates for a particular table class by package and class name. """
     if not self.userBindings or not self.rcvObjects:
@@ -545,6 +647,7 @@ class Session:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                          binding_key=key)
+
       
   def bindClassKey(self, classKey):
     """ Request object updates for a particular table class by class key. """
@@ -552,6 +655,7 @@ class Session:
     cname = classKey.getClassName()
     self.bindClass(pname, cname)
 
+
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
     brokerList = []
@@ -569,12 +673,14 @@ class Session:
         agentList.append(a)
     return agentList
 
-  def makeObject(self, classKey, broker=None, **kwargs):
+
+  def makeObject(self, classKey, **kwargs):
     """ Create a new, unmanaged object of the schema indicated by classKey """
     schema = self.getSchema(classKey)
     if schema == None:
       raise Exception("Schema not found for classKey")
-    return Object(self, broker, schema, None, True, True, False, kwargs)
+    return Object(None, schema, None, True, True, kwargs)
+
 
   def getObjects(self, **kwargs):
     """ Get a list of objects from QMF agents.
@@ -644,81 +750,24 @@ class Session:
     if len(agentList) == 0:
       return []
 
-    pname = None
-    cname = None
-    hash = None
-    classKey = None
-    if   "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
-    elif "_key"    in kwargs: classKey = kwargs["_key"]
-    elif "_class"  in kwargs:
-      cname = kwargs["_class"]
-      if "_package" in kwargs:
-        pname = kwargs["_package"]
-    if cname == None and classKey == None and "_objectId" not in kwargs:
-      raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
-
-    map = {}
-    self.getSelect = []
-    if "_objectId" in kwargs:
-      map["_objectid"] = kwargs["_objectId"].__repr__()
-    else:
-      if cname == None:
-        cname = classKey.getClassName()
-        pname = classKey.getPackageName()
-        hash = classKey.getHash()
-      map["_class"] = cname
-      if pname != None: map["_package"] = pname
-      if hash  != None: map["_hash"]    = hash
-      for item in kwargs:
-        if item[0] != '_':
-          self.getSelect.append((item, kwargs[item]))
-
-    self.getResult = []
+    #
+    # We now have a list of agents to query, start the queries and gather the results.
+    #
+    request = SessionGetRequest(len(agentList))
     for agent in agentList:
-      broker = agent.broker
-      sendCodec = Codec()
-      try:
-        self.cv.acquire()
-        seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
-        self.syncSequenceList.append(seq)
-      finally:
-        self.cv.release()
-      broker._setHeader(sendCodec, 'G', seq)
-      sendCodec.write_map(map)
-      smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
-      broker._send(smsg)
-
-    starttime = time()
-    timeout = False
-    if "_timeout" in kwargs:
-      waitTime = kwargs["_timeout"]
-    else:
-      waitTime = self.DEFAULT_GET_WAIT_TIME
-    try:
-      self.cv.acquire()
-      while len(self.syncSequenceList) > 0 and self.error == None:
-        self.cv.wait(waitTime)
-        if time() - starttime > waitTime:
-          for pendingSeq in self.syncSequenceList:
-            self.seqMgr._release(pendingSeq)
-          self.syncSequenceList = []
-          timeout = True
-    finally:
-      self.cv.release()
-
-    if self.error:
-      errorText = self.error
-      self.error = None
-      raise Exception(errorText)
+      agent.getObjects(request, **kwargs)
+    timeout = 60
+    if '_timeout' in kwargs:
+        timeout = kwargs['_timeout']
+    request.wait(timeout)
+    return request.result
 
-    if len(self.getResult) == 0 and timeout:
-      raise RuntimeError("No agent responded within timeout period")
-    return self.getResult
 
   def setEventFilter(self, **kwargs):
     """ """
     pass
 
+
   def _bindingKeys(self):
     keyList = []
     keyList.append("schema.#")
@@ -735,18 +784,21 @@ class Session:
         keyList.append("console.heartbeat.#")
     return keyList
 
+
   def _handleBrokerConnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
         self.console.newAgent(agent)
       self.console.brokerConnected(broker)
 
+
   def _handleBrokerDisconnect(self, broker):
     if self.console:
       for agent in broker.getAgents():
         self.console.delAgent(agent)
       self.console.brokerDisconnected(broker)
 
+
   def _handleBrokerResp(self, broker, codec, seq):
     broker.brokerId = codec.read_uuid()
     if self.console != None:
@@ -760,16 +812,10 @@ class Session:
     smsg = broker._message(sendCodec.encoded)
     broker._send(smsg)
 
+
   def _handlePackageInd(self, broker, codec, seq):
     pname = str(codec.read_str8())
-    notify = False
-    try:
-      self.cv.acquire()
-      if pname not in self.packages:
-        self.packages[pname] = {}
-        notify = True
-    finally:
-      self.cv.release()
+    notify = self.schemaCache.declarePackage(pname)
     if notify and self.console != None:
       self.console.newPackage(pname)
 
@@ -782,7 +828,8 @@ class Session:
     smsg = broker._message(sendCodec.encoded)
     broker._send(smsg)
 
-  def _handleCommandComplete(self, broker, codec, seq):
+
+  def _handleCommandComplete(self, broker, codec, seq, agent):
     code = codec.read_uint32()
     text = codec.read_str8()
     context = self.seqMgr._release(seq)
@@ -804,20 +851,16 @@ class Session:
       finally:
         self.cv.release()
 
+    if agent:
+      agent._handleV1Completion(seq, code, text)
+
+
   def _handleClassInd(self, broker, codec, seq):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
-    unknown = False
-
-    try:
-      self.cv.acquire()
-      if classKey.getPackageName() in self.packages:
-        if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
-          unknown = True
-    finally:
-      self.cv.release()
+    schema = self.schemaCache.getSchema(classKey)
 
-    if unknown:
+    if not schema:
       # Send a schema request for the unknown class
       broker._incOutstanding()
       sendCodec = Codec()
@@ -827,30 +870,6 @@ class Session:
       smsg = broker._message(sendCodec.encoded)
       broker._send(smsg)
 
-  def _handleMethodResp(self, broker, codec, seq):
-    code = codec.read_uint32()
-    text = codec.read_str16()
-    outArgs = {}
-    pair = self.seqMgr._release(seq)
-    if pair == None:
-      return
-    method, synchronous = pair
-    if code == 0:
-      for arg in method.arguments:
-        if arg.dir.find("O") != -1:
-          outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
-    result = MethodResult(code, text, outArgs)
-    if synchronous:
-      try:
-        broker.cv.acquire()
-        broker.syncResult = result
-        broker.syncInFlight = False
-        broker.cv.notify()
-      finally:
-        broker.cv.release()
-    else:
-      if self.console:
-        self.console.methodResponse(broker, seq, result)
 
   def _handleHeartbeatInd(self, broker, codec, seq, msg):
     brokerBank = 1
@@ -873,59 +892,49 @@ class Session:
     timestamp = codec.read_uint64()
     if self.console != None and agent != None:
       self.console.heartbeat(agent, timestamp)
+    broker._ageAgents()
 
-  def _handleEventInd(self, broker, codec, seq):
-    if self.console != None:
-      event = Event(self, broker, codec)
-      self.console.event(broker, event)
 
-  def _handleSchemaResp(self, broker, codec, seq):
+  def _handleSchemaResp(self, broker, codec, seq, agent_addr):
     kind  = codec.read_uint8()
     classKey = ClassKey(codec)
     _class = SchemaClass(kind, classKey, codec, self)
-    try:
-      self.cv.acquire()
-      self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
-    finally:
-      self.cv.release()
-      
-    self.seqMgr._release(seq)
-    broker._decOutstanding()
+    self.schemaCache.declareClass(classKey, _class)
+    ctx = self.seqMgr._release(seq)
+    if ctx:
+      broker._decOutstanding()
     if self.console != None:
       self.console.newClass(kind, classKey)
 
-  def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
-    classKey = ClassKey(codec)
-    try:
-      self.cv.acquire()
-      pname = classKey.getPackageName()
-      if pname not in self.packages:
-        return
-      pkey = classKey.getPackageKey()
-      if pkey not in self.packages[pname]:
-        return
-      schema = self.packages[pname][pkey]
-    finally:
-      self.cv.release()
+    if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
+      agent = self._getAgentForAgentAddr(agent_addr)
+      if agent:
+        agent._schemaInfoFromV2Agent()
 
-    object = Object(self, broker, schema, codec, prop, stat)
-    if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
-      broker._updateAgent(object)
 
+  def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
     try:
-      self.cv.acquire()
-      if seq in self.syncSequenceList:
-        if object.getTimestamps()[2] == 0 and self._selectMatch(object):
-          self.getResult.append(object)
-        return
-    finally:
-      self.cv.release()
+      agentName = ah["qmf.agent"]
+      values = content["_values"]
+      timestamp = values["timestamp"]
+      interval = values["heartbeat_interval"]
+    except:
+      return
+
+    agent = broker.getAgent(1, agentName)
+    if agent == None:
+      agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
+      broker._addAgent(agentName, agent)
+    else:
+      agent.touch()
+    if self.console and agent:
+      self.console.heartbeat(agent, timestamp)
+    broker._ageAgents()
+
+
+  def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
+    self._v2HandleHeartbeatInd(broker, mp, ah, content)
 
-    if self.console and self.rcvObjects:
-      if prop:
-        self.console.objectProps(broker, object)
-      if stat:
-        self.console.objectStats(broker, object)
 
   def _handleError(self, error):
     try:
@@ -937,6 +946,7 @@ class Session:
     finally:
       self.cv.release()
 
+
   def _selectMatch(self, object):
     """ Check the object against self.getSelect to check for a match """
     for key, value in self.getSelect:
@@ -945,6 +955,7 @@ class Session:
           return False
     return True
   
+
   def _decodeValue(self, codec, typecode, broker=None):
     """ Decode, from the codec, a value based on its typecode. """
     if   typecode == 1:  data = codec.read_uint8()      # U8
@@ -971,17 +982,9 @@ class Session:
       inner_type_code = codec.read_uint8()
       if inner_type_code == 20:
           classKey = ClassKey(codec)
-          try:
-            self.cv.acquire()
-            pname = classKey.getPackageName()
-            if pname not in self.packages:
-              return None
-            pkey = classKey.getPackageKey()
-            if pkey not in self.packages[pname]:
-              return None
-            schema = self.packages[pname][pkey]
-          finally:
-            self.cv.release()
+          schema = self.schemaCache.getSchema(classKey)
+          if not schema:
+            return None
           data = Object(self, broker, schema, codec, True, True, False)
       else:
           data = self._decodeValue(codec, inner_type_code, broker)
@@ -999,6 +1002,7 @@ class Session:
       raise ValueError("Invalid type code: %d" % typecode)
     return data
 
+
   def _encodeValue(self, codec, value, typecode):
     """ Encode, into the codec, a value based on its typecode. """
     if   typecode == 1:  codec.write_uint8  (int(value))    # U8
@@ -1033,9 +1037,11 @@ class Session:
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
+
   def encoding(self, value):
       return self._encoding(value.__class__)
       
+
   def _encoding(self, klass):
     if Session.ENCODINGS.has_key(klass):
       return self.ENCODINGS[klass]
@@ -1044,6 +1050,7 @@ class Session:
       if result != None:
         return result
             
+
   def _displayValue(self, value, typecode):
     """ """
     if   typecode == 1:  return unicode(value)
@@ -1072,6 +1079,7 @@ class Session:
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
     
+
   def _defaultValue(self, stype, broker=None, kwargs={}):
     """ """
     typecode = stype.type
@@ -1110,6 +1118,7 @@ class Session:
     else:
       raise ValueError ("Invalid type code: %d" % typecode)
 
+
   def _bestClassKey(self, pname, cname, preferredList):
     """ """
     if pname == None or cname == None:
@@ -1125,6 +1134,7 @@ class Session:
         return c
     return None
     
+
   def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
     """ This function can be used to send a method request to an object given only the
     broker, schemaKey, and objectId.  This is an uncommon usage pattern as methods are
@@ -1133,14 +1143,10 @@ class Session:
     schema = self.getSchema(schemaKey)
     for method in schema.getMethods():
       if name == method.name:
-        aIdx = 0
-        sendCodec = Codec()
-        seq = self.seqMgr._reserve((method, False))
-        broker._setHeader(sendCodec, 'M', seq)
-        objectId.encode(sendCodec)
-        schemaKey.encode(sendCodec)
-        sendCodec.write_str8(name)
-
+        #
+        # Count the arguments supplied and validate that the number is what is expected
+        # based on the schema.
+        #
         count = 0
         for arg in method.arguments:
           if arg.dir.find("I") != -1:
@@ -1148,25 +1154,192 @@ class Session:
         if count != len(argList):
           raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
 
-        for arg in method.arguments:
-          if arg.dir.find("I") != -1:
-            self._encodeValue(sendCodec, argList[aIdx], arg.type)
-            aIdx += 1
-        smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
-                               (objectId.getBrokerBank(), objectId.getAgentBank()))
-        broker._send(smsg)
+        aIdx = 0
+        sendCodec = Codec()
+        seq = self.seqMgr._reserve((method, False))
+
+        if objectId.isV2():
+          #
+          # Compose and send a QMFv2 method request
+          #
+          call = {}
+          call['_object_id'] = objectId.asMap()
+          call['_method_name'] = name
+          args = {}
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              args[arg.name] = argList[aIdx]
+              aIdx += 1
+          call['_arguments'] = args
+
+          dp = broker.amqpSession.delivery_properties()
+          dp.routing_key = objectId.getAgentBank()
+          mp = broker.amqpSession.message_properties()
+          mp.content_type = "amqp/map"
+          mp.user_id = broker.authUser
+          mp.correlation_id = str(seq)
+          mp.app_id = "qmf2"
+          mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+          mp.application_headers = {'qmf.opcode':'_method_request'}
+          sendCodec.write_map(call)
+          msg = Message(dp, mp, sendCodec.encoded)
+          broker._send(msg, "qmf.default.direct")
+
+        else:
+          #
+          # Associate this sequence with the agent hosting the object so we can correctly
+          # route the method-response
+          #
+          agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank())
+          broker._setSequence(seq, agent)
+
+          #
+          # Compose and send a QMFv1 method request
+          #
+          broker._setHeader(sendCodec, 'M', seq)
+          objectId.encode(sendCodec)
+          schemaKey.encode(sendCodec)
+          sendCodec.write_str8(name)
+
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              self._encodeValue(sendCodec, argList[aIdx], arg.type)
+              aIdx += 1
+              smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+                                     (objectId.getBrokerBank(), objectId.getAgentBank()))
+          broker._send(smsg)
         return seq
     return None
 
-class Package:
-  """ """
-  def __init__(self, name):
-    self.name = name
 
+#===================================================================================================
+# SessionGetRequest
+#===================================================================================================
+class SessionGetRequest(object):
+  """
+  This class is used to track get-object queries at the Session level.
+  """
+  def __init__(self, agentCount):
+    self.agentCount = agentCount
+    self.result = []
+    self.cv = Condition()
+    self.waiting = True
+
+  def __call__(self, **kwargs):
+    """
+    Callable entry point for gathering collected objects.
+    """
+    try:
+      self.cv.acquire()
+      if 'qmf_object' in kwargs:
+        self.result.append(kwargs['qmf_object'])
+      elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs:
+        self.agentCount -= 1
+        if self.agentCount == 0:
+          self.waiting = None
+          self.cv.notify()
+    finally:
+      self.cv.release()
+
+  def wait(self, timeout):
+    starttime = time()
+    try:
+      self.cv.acquire()
+      while self.waiting:
+        if (time() - starttime) > timeout:
+          raise Exception("Timed out after %d seconds" % timeout)
+        self.cv.wait(1)
+    finally:
+      self.cv.release()
+
+
+#===================================================================================================
+# SchemaCache
+#===================================================================================================
+class SchemaCache(object):
+  """
+  The SchemaCache is a data structure that stores learned schema information.
+  """
+  def __init__(self):
+    """
+    Create a map of schema packages and a lock to protect this data structure.
+    Note that this lock is at the bottom of any lock hierarchy.  If it is held, no other
+    lock in the system should attempt to be acquired.
+    """
+    self.packages = {}
+    self.lock = Lock()
+
+  def getPackages(self):
+    """ Get the list of known QMF packages """
+    list = []
+    try:
+      self.lock.acquire()
+      for package in self.packages:
+        list.append(package)
+    finally:
+      self.lock.release()
+    return list
+
+  def getClasses(self, packageName):
+    """ Get the list of known classes within a QMF package """
+    list = []
+    try:
+      self.lock.acquire()
+      if packageName in self.packages:
+        for pkey in self.packages[packageName]:
+          list.append(self.packages[packageName][pkey].getKey())
+    finally:
+      self.lock.release()
+    return list
+
+  def getSchema(self, classKey):
+    """ Get the schema for a QMF class """
+    pname = classKey.getPackageName()
+    pkey = classKey.getPackageKey()
+    try:
+      self.lock.acquire()
+      if pname in self.packages:
+        if pkey in self.packages[pname]:
+          return self.packages[pname][pkey]
+    finally:
+      self.lock.release()
+    return None
+
+  def declarePackage(self, pname):
+    """ Maybe add a package to the cache.  Return True if package was added, None if it pre-existed. """
+    try:
+      self.lock.acquire()
+      if pname in self.packages:
+        return None
+      self.packages[pname] = {}
+    finally:
+      self.lock.release()
+    return True
+
+  def declareClass(self, classKey, classDef):
+    """ Maybe add a class definition to the cache.  Return True if added, None if pre-existed. """
+    pname = classKey.getPackageName()
+    pkey = classKey.getPackageKey()
+    try:
+      self.lock.acquire()
+      if pname not in self.packages:
+        self.packages[pname] = {}
+      packageMap = self.packages[pname]
+      if pkey in packageMap:
+        return None
+      packageMap[pkey] = classDef
+    finally:
+      self.lock.release()
+    return True
+
+
+#===================================================================================================
+# ClassKey
+#===================================================================================================
 class ClassKey:
   """ A ClassKey uniquely identifies a class from the schema. """
   def __init__(self, constructor):
-    if type(constructor) == str:
+    if constructor.__class__ == str:
       # construct from __repr__ string
       try:
         self.pname, cls = constructor.split(":")
@@ -1177,20 +1350,33 @@ class ClassKey:
         h1 = int(hexValues[1], 16)
         h2 = int(hexValues[2], 16)
         h3 = int(hexValues[3], 16)
-        self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
+        h4 = int(hexValues[4][0:4], 16)
+        h5 = int(hexValues[4][4:12], 16)
+        self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5))
       except:
         raise Exception("Invalid ClassKey format")
+    elif constructor.__class__ == dict:
+      # construct from QMFv2 map
+      try:
+        self.pname = constructor['_package_name']
+        self.cname = constructor['_class_name']
+        self.hash  = constructor['_hash']
+      except:
+        raise Exception("Invalid ClassKey map format")
     else:
       # construct from codec
       codec = constructor
       self.pname = str(codec.read_str8())
       self.cname = str(codec.read_str8())
-      self.hash  = codec.read_bin128()
+      self.hash  = UUID(codec.read_bin128())
 
   def encode(self, codec):
     codec.write_str8(self.pname)
     codec.write_str8(self.cname)
-    codec.write_bin128(self.hash)
+    codec.write_bin128(self.hash.bytes)
+
+  def asMap(self):
+    return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash}
 
   def getPackageName(self):
     return self.pname
@@ -1202,7 +1388,7 @@ class ClassKey:
     return self.hash
 
   def getHashString(self):
-    return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)
+    return str(self.hash)
 
   def getPackageKey(self):
     return (self.cname, self.hash)
@@ -1210,6 +1396,10 @@ class ClassKey:
   def __repr__(self):
     return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
 
+
+#===================================================================================================
+# SchemaClass
+#===================================================================================================
 class SchemaClass:
   """ """
   CLASS_KIND_TABLE = 1
@@ -1292,6 +1482,10 @@ class SchemaClass:
     else:
         return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()  
 
+
+#===================================================================================================
+# SchemaProperty
+#===================================================================================================
 class SchemaProperty:
   """ """
   def __init__(self, codec):
@@ -1321,6 +1515,10 @@ class SchemaProperty:
   def __repr__(self):
     return self.name
 
+
+#===================================================================================================
+# SchemaStatistic
+#===================================================================================================
 class SchemaStatistic:
   """ """
   def __init__(self, codec):
@@ -1337,6 +1535,10 @@ class SchemaStatistic:
   def __repr__(self):
     return self.name
 
+
+#===================================================================================================
+# SchemaMethod
+#===================================================================================================
 class SchemaMethod:
   """ """
   def __init__(self, codec):
@@ -1365,6 +1567,10 @@ class SchemaMethod:
     result += ")"
     return result
 
+
+#===================================================================================================
+# SchemaArgument
+#===================================================================================================
 class SchemaArgument:
   """ """
   def __init__(self, codec, methodArg):
@@ -1392,64 +1598,113 @@ class SchemaArgument:
       elif key == "refPackage" : self.refPackage = value
       elif key == "refClass"   : self.refClass = value
 
+
+#===================================================================================================
+# ObjectId
+#===================================================================================================
 class ObjectId:
   """ Object that represents QMF object identifiers """
-  def __init__(self, codec, first=0, second=0):
-    if codec:
-      self.first  = codec.read_uint64()
-      self.second = codec.read_uint64()
-    else:
-      self.first = first
-      self.second = second
+  def __init__(self, constructor, first=0, second=0, agentName=None):
+    if  constructor.__class__ == dict:
+      self.agentName = agentName
+      self.agentEpoch = 0
+      if '_agent_name' in constructor:  self.agentName = constructor['_agent_name']
+      if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch']
+      if '_object_name' not in constructor:
+        raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.")
+      self.objectName = constructor['_object_name']
+    else:
+      if not constructor:
+        first = first
+        second = second
+      else:
+        first  = constructor.read_uint64()
+        second = constructor.read_uint64()
+      self.agentName = str(first & 0x000000000FFFFFFF)
+      self.agentEpoch = (first & 0x0FFF000000000000) >> 48
+      self.objectName = str(second)
 
   def __cmp__(self, other):    
     if other == None or not isinstance(other, ObjectId) :
       return 1
-    if self.first < other.first:
+
+    if self.objectName < other.objectName:
+      return -1
+    if self.objectName > other.objectName:
+      return 1
+
+    if self.agentName < other.agentName:
       return -1
-    if self.first > other.first:
+    if self.agentName > other.agentName:
       return 1
-    if self.second < other.second:
+
+    if self.agentEpoch < other.agentEpoch:
       return -1
-    if self.second > other.second:
+    if self.agentEpoch > other.agentEpoch:
       return 1
     return 0
 
   def __repr__(self):
-    return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
+    return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
                                self.getBrokerBank(), self.getAgentBank(), self.getObject())
 
+  def isV2(self):
+    return not self.agentName.isdigit()
+
   def index(self):
-    return (self.first, self.second)
+    return self.__repr__()
 
   def getFlags(self):
-    return (self.first & 0xF000000000000000) >> 60
+    return 0
 
   def getSequence(self):
-    return (self.first & 0x0FFF000000000000) >> 48
+    return self.agentEpoch
 
   def getBrokerBank(self):
-    return (self.first & 0x0000FFFFF0000000) >> 28
+    return 1
 
   def getAgentBank(self):
-    return self.first & 0x000000000FFFFFFF
+    return self.agentName
 
   def getObject(self):
-    return self.second
+    return self.objectName
 
   def isDurable(self):
     return self.getSequence() == 0
 
   def encode(self, codec):
-    codec.write_uint64(self.first)
-    codec.write_uint64(self.second)
-
-  def __hash__(self):
-    return (self.first, self.second).__hash__()
+    first = (self.agentEpoch << 48) + (1 << 28)
+    second = 0
+
+    try:
+      first += int(self.agentName)
+    except:
+      pass
+
+    try:
+      second = int(self.objectName)
+    except:
+      pass
+
+    codec.write_uint64(first)
+    codec.write_uint64(second)
+
+  def asMap(self):
+    omap = {'_agent_name': self.agentName, '_object_name': self.objectName}
+    if self.agentEpoch != 0:
+      omap['_agent_epoch'] = self.agentEpoch
+    return omap
+
+  def __hash__(self):
+    return self.__repr__().__hash__()
 
   def __eq__(self, other):
-    return (self.first, self.second).__eq__(other)
+    return self.__repr__().__eq__(other)
+
 
+#===================================================================================================
+# MethodResult
+#===================================================================================================
 class MethodResult(object):
   """ """
   def __init__(self, status, text, outArgs):
@@ -1465,6 +1720,10 @@ class MethodResult(object):
   def __repr__(self):
     return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
 
+
+#===================================================================================================
+# ManagedConnection
+#===================================================================================================
 class ManagedConnection(Thread):
   """ Thread class for managing a connection. """
   DELAY_MIN = 1
@@ -1527,6 +1786,10 @@ class ManagedConnection(Thread):
       finally:
         self.cv.release()
 
+
+#===================================================================================================
+# Broker
+#===================================================================================================
 class Broker:
   """ This object represents a connection (or potential connection) to a QMF broker. """
   SYNC_TIME = 60
@@ -1542,6 +1805,7 @@ class Broker:
     self.authUser = authUser
     self.authPass = authPass
     self.cv = Condition()
+    self.seqToAgentMap = {}
     self.error = None
     self.brokerId = None
     self.connected = False
@@ -1574,9 +1838,13 @@ class Broker:
 
   def getAgent(self, brokerBank, agentBank):
     """ Return the agent object associated with a particular broker and agent bank value."""
-    bankKey = (brokerBank, agentBank)
-    if bankKey in self.agents:
-      return self.agents[bankKey]
+    bankKey = str(agentBank)
+    try:
+      self.cv.acquire()
+      if bankKey in self.agents:
+        return self.agents[bankKey]
+    finally:
+      self.cv.release()
     return None
 
   def getSessionId(self):
@@ -1585,7 +1853,11 @@ class Broker:
 
   def getAgents(self):
     """ Get the list of agents reachable via this broker """
-    return self.agents.values()
+    try:
+      self.cv.acquire()
+      return self.agents.values()
+    finally:
+      self.cv.release()
 
   def getAmqpSession(self):
     """ Get the AMQP session object for this connected broker. """
@@ -1612,10 +1884,29 @@ class Broker:
     else:
       return "Disconnected Broker"
 
+  def _setSequence(self, sequence, agent):
+    try:
+      self.cv.acquire()
+      self.seqToAgentMap[sequence] = agent
+    finally:
+      self.cv.release()
+
+  def _clearSequence(self, sequence):
+    try:
+      self.cv.acquire()
+      self.seqToAgentMap.pop(sequence)
+    finally:
+      self.cv.release()
+
   def _tryToConnect(self):
     try:
-      self.agents = {}
-      self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+      try:
+        self.cv.acquire()
+        self.agents = {}
+        self.agents['0'] = Agent(self, 0, "BrokerAgent")
+      finally:
+        self.cv.release()
+
       self.topicBound = False
       self.syncInFlight = False
       self.syncRequest = 0
@@ -1649,7 +1940,7 @@ class Broker:
       self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
-      self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
+      self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
       self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
       self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
       self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
@@ -1659,11 +1950,29 @@ class Broker:
       self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
-      self.amqpSession.incoming("tdest").listen(self._replyCb)
+      self.amqpSession.incoming("tdest").listen(self._v1Cb)
       self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
       self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
       self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
 
+      ##
+      ## Set up connectivity for QMFv2
+      ##
+      self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
+      self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+      self.amqpSession.exchange_bind(exchange="qmf.default.direct",
+                                     queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+      self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+                                     queue=self.v2_queue_name, binding_key="agent.#")
+      ## Other bindings here...
+      self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+                                         accept_mode=self.amqpSession.accept_mode.none,
+                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+      self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
+      self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
+      self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
+      self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+
       self.connected = True
       self.session._handleBrokerConnect(self)
 
@@ -1671,6 +1980,7 @@ class Broker:
       self._setHeader(codec, 'B')
       msg = self._message(codec.encoded)
       self._send(msg)
+      self._v2SendAgentLocate()
 
     except socket.error, e:
       self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
@@ -1683,18 +1993,73 @@ class Broker:
       raise
 
   def _updateAgent(self, obj):
-    bankKey = (obj.brokerBank, obj.agentBank)
+    bankKey = str(obj.agentBank)
+    agent = None
     if obj._deleteTime == 0:
-      if bankKey not in self.agents:
-        agent = Agent(self, obj.agentBank, obj.label)
-        self.agents[bankKey] = agent
-        if self.session.console != None:
-          self.session.console.newAgent(agent)
+      try:
+        self.cv.acquire()
+        if bankKey not in self.agents:
+          agent = Agent(self, obj.agentBank, obj.label)
+          self.agents[bankKey] = agent
+      finally:
+        self.cv.release()
+      if agent and self.session.console:
+        self.session.console.newAgent(agent)
     else:
-      agent = self.agents.pop(bankKey, None)
-      if agent != None and self.session.console != None:
+      try:
+        self.cv.acquire()
+        agent = self.agents.pop(bankKey, None)
+        if agent:
+          agent.close()
+      finally:
+        self.cv.release()
+      if agent and self.session.console:
         self.session.console.delAgent(agent)
 
+  def _addAgent(self, name, agent):
+    try:
+      self.cv.acquire()
+      self.agents[name] = agent
+    finally:
+      self.cv.release()
+    if self.session.console:
+      self.session.console.newAgent(agent)
+
+  def _ageAgents(self):
+    try:
+      self.cv.acquire()
+      to_delete = []
+      to_notify = []
+      for key in self.agents:
+        if self.agents[key].isOld():
+          to_delete.append(key)
+      for key in to_delete:
+        agent = self.agents.pop(key)
+        agent.close()
+        to_notify.append(agent)
+    finally:
+      self.cv.release()
+    if self.session.console:
+      for agent in to_notify:
+        self.session.console.delAgent(agent)
+
+  def _v2SendAgentLocate(self, predicate={}):
+    """
+    Broadcast an agent-locate request to cause all agents in the domain to tell us who they are.
+    """
+    dp = self.amqpSession.delivery_properties()
+    dp.routing_key = "console.request.agent_locate"
+    mp = self.amqpSession.message_properties()
+    mp.content_type = "amqp/map"
+    mp.user_id = self.authUser
+    mp.app_id = "qmf2"
+    mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+    mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
+    sendCodec = Codec()
+    sendCodec.write_map(predicate)
+    msg = Message(dp, mp, sendCodec.encoded)
+    self._send(msg, "qmf.default.topic")
+
   def _setHeader(self, codec, opcode, seq=0):
     """ Compose the header of a management message. """
     codec.write_uint8(ord('A'))
@@ -1785,24 +2150,105 @@ class Broker:
     finally:
       self.cv.release()
 
-  def _replyCb(self, msg):
+  def _v1Cb(self, msg):
+    try:
+      self._v1CbProtected(msg)
+    except Exception, e:
+      print "EXCEPTION in Broker._v1Cb:", e
+
+  def _v1CbProtected(self, msg):
+    """
+    This is the general message handler for messages received via the QMFv1 exchanges.
+    """
+    agent = None
+    agent_addr = None
+    mp = msg.get("message_properties")
+    ah = mp.application_headers
+    if ah and 'qmf.agent' in ah:
+      agent_addr = ah['qmf.agent']
+
+    if not agent_addr:
+      #
+      # See if we can determine the agent identity from the routing key
+      #
+      dp = msg.get("delivery_properties")
+      rkey = None
+      if dp.routing_key:
+        rkey = dp.routing_key
+        items = rkey.split('.')
+        if len(items) >= 4:
+          if items[0] == 'console' and items[3].isdigit():
+            agent_addr = str(items[3])  # The QMFv1 Agent Bank
+    if agent_addr != None and agent_addr in self.agents:
+      agent = self.agents[agent_addr]
+
     codec = Codec(msg.body)
+    alreadyTried = None
     while True:
       opcode, seq = self._checkHeader(codec)
+
+      if not agent and not alreadyTried:
+        alreadyTried = True
+        try:
+          self.cv.acquire()
+          if seq in self.seqToAgentMap:
+            agent = self.seqToAgentMap[seq]
+        finally:
+          self.cv.release()
+
       if   opcode == None: return
       if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
       elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
-      elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
       elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
-      elif opcode == 'm': self.session._handleMethodResp      (self, codec, seq)
+      elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq, agent_addr)
       elif opcode == 'h': self.session._handleHeartbeatInd    (self, codec, seq, msg)
-      elif opcode == 'e': self.session._handleEventInd        (self, codec, seq)
-      elif opcode == 's': self.session._handleSchemaResp      (self, codec, seq)
-      elif opcode == 'c': self.session._handleContentInd      (self, codec, seq, prop=True)
-      elif opcode == 'i': self.session._handleContentInd      (self, codec, seq, stat=True)
-      elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
-    self.session.receiver._completed.add(msg.id)
-    self.session.channel.session_completed(self.session.receiver._completed)
+      elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
+      elif agent:
+        agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
+
+    self.amqpSession.receiver._completed.add(msg.id)
+    self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
+
+  def _v2Cb(self, msg):
+    """
+    This is the general message handler for messages received via QMFv2 exchanges.
+    """
+    mp = msg.get("message_properties")
+    ah = mp["application_headers"]
+    codec = Codec(msg.body)
+
+    if 'qmf.opcode' in ah:
+      opcode = ah['qmf.opcode']
+      if mp.content_type == "amqp/list":
+        content = codec.read_list()
+        if not content:
+          content = []
+      elif mp.content_type == "amqp/map":
+        content = codec.read_map()
+        if not content:
+          content = {}
+      else:
+        content = None
+
+      if content != None:
+        ##
+        ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
+        ## used to maintain the broker's list of agent proxies.
+        ##
+        if   opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+        elif opcode == '_agent_locate_response':      self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+        else:
+          ##
+          ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
+          ## of the message.
+          ##
+          agent_addr = ah['qmf.agent']
+          if agent_addr in self.agents:
+            agent = self.agents[agent_addr]
+            agent._handleQmfV2Message(opcode, mp, ah, content)
+
+    self.amqpSession.receiver._completed.add(msg.id)
+    self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
 
   def _exceptionCb(self, data):
     self.connected = False
@@ -1818,43 +2264,697 @@ class Broker:
     if self.thread:
       self.thread.disconnected()
 
+
+#===================================================================================================
+# Agent
+#===================================================================================================
 class Agent:
-  """ """
-  def __init__(self, broker, agentBank, label):
+  """
+  This class represents a proxy for a remote agent being managed
+  """
+  def __init__(self, broker, agentBank, label, isV2=False, interval=0):
     self.broker = broker
+    self.session = broker.session
+    self.schemaCache = self.session.schemaCache
     self.brokerBank = broker.getBrokerBank()
-    self.agentBank = agentBank
+    self.agentBank = str(agentBank)
     self.label = label
+    self.isV2 = isV2
+    self.heartbeatInterval = interval
+    self.lock = Lock()
+    self.seqMgr = self.session.seqMgr
+    self.contextMap = {}
+    self.unsolicitedContext = RequestContext(self, self)
+    self.lastSeenTime = time()
+    self.closed = None
+
+
+  def _checkClosed(self):
+    if self.closed:
+      raise Exception("Agent is disconnected")
+
+
+  def __call__(self, **kwargs):
+    """
+    This is the handler for unsolicited stuff received from the agent
+    """
+    if 'qmf_object' in kwargs:
+      if self.session.console:
+        self.session.console.objectProps(self.broker, kwargs['qmf_object'])
+    elif 'qmf_object_stats' in kwargs:
+      if self.session.console:
+        self.session.console.objectStats(self.broker, kwargs['qmf_object_stats'])
+    elif 'qmf_event' in kwargs:
+      if self.session.console:
+        self.session.console.event(self.broker, kwargs['qmf_event'])
+
+
+  def touch(self):
+    self.lastSeenTime = time()
+
+
+  def isOld(self):
+    if self.heartbeatInterval == 0:
+      return None
+    if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+      return True
+    return None
+
+
+  def close(self):
+    self.closed = True
+    copy = {}
+    try:
+      self.lock.acquire()
+      for seq in self.contextMap:
+        copy[seq] = self.contextMap[seq]
+    finally:
+      self.lock.release()
+
+    for seq in copy:
+      context = copy[seq]
+      context.cancel("Agent disconnected")
+
 
   def __repr__(self):
-    return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
+    if self.isV2:
+      ver = "v2"
+    else:
+      ver = "v1"
+    return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
+
 
   def getBroker(self):
+    self._checkClosed()
     return self.broker
 
+
   def getBrokerBank(self):
+    self._checkClosed()
     return self.brokerBank
 
+
   def getAgentBank(self):
+    self._checkClosed()
     return self.agentBank
 
+
+  def getObjects(self, notifiable=None, **kwargs):
+    """ Get a list of objects from QMF agents.
+    All arguments are passed by name(keyword).
+
+    If 'notifiable' is None (default), this call will block until completion or timeout.
+    If supplied, notifiable is assumed to be a callable object that will be called when the
+    list of queried objects arrives.  The single argument to the call shall be a list of
+    the returned objects.
+
+    The class for queried objects may be specified in one of the following ways:
+
+    _schema = <schema> - supply a schema object returned from getSchema.
+    _key = <key>       - supply a classKey from the list returned by getClasses.
+    _class = <name>    - supply a class name as a string.  If the class name exists
+                         in multiple packages, a _package argument may also be supplied.
+    _objectId = <id>   - get the object referenced by the object-id
+
+    The default timeout for this synchronous operation is 60 seconds.  To change the timeout,
+    use the following argument:
+
+    _timeout = <time in seconds>
+
+    If additional arguments are supplied, they are used as property selectors.  For example,
+    if the argument name="test" is supplied, only objects whose "name" property is "test"
+    will be returned in the result.
+    """
+    self._checkClosed()
+    if notifiable:
+      if not callable(notifiable):
+        raise Exception("notifiable object must be callable")
+
+    #
+    # Isolate the selectors from the kwargs
+    #
+    selectors = {}
+    for key in kwargs:
+      value = kwargs[key]
+      if key[0] != '_':
+        selectors[key] = value
+
+    #
+    # Allocate a context to track this asynchronous request.
+    #
+    context = RequestContext(self, notifiable, selectors)
+    sequence = self.seqMgr._reserve(context)
+    try:
+      self.lock.acquire()
+      self.contextMap[sequence] = context
+      context.setSequence(sequence)
+    finally:
+      self.lock.release()
+
+    #
+    # Compose and send the query message to the agent using the appropriate protocol for the
+    # agent's QMF version.
+    #
+    if self.isV2:
+      self._v2SendGetQuery(sequence, kwargs)
+    else:
+      self.broker._setSequence(sequence, self)
+      self._v1SendGetQuery(sequence, kwargs)
+
+    #
+    # If this is a synchronous call, block and wait for completion.
+    #
+    if not notifiable:
+      timeout = 60
+      if '_timeout' in kwargs:
+        timeout = kwargs['_timeout']
+      context.waitForSignal(timeout)
+      if context.exception:
+        raise Exception(context.exception)
+      result = context.queryResults
+      return result
+
+
+  def _clearContext(self, sequence):
+    try:
+      self.lock.acquire()
+      self.contextMap.pop(sequence)
+    finally:
+      self.lock.release()
+
+
+  def _schemaInfoFromV2Agent(self):
+    """
+    We have just received new schema information from this agent.  Check to see if there's
+    more work that can now be done.
+    """
+    try:
+      self.lock.acquire()
+      copy_of_map = {}
+      for item in self.contextMap:
+        copy_of_map[item] = self.contextMap[item]
+    finally:
+      self.lock.release()
+
+    self.unsolicitedContext.reprocess()
+    for context in copy_of_map:
+      copy_of_map[context].reprocess()
+
+
+  def _handleV1Completion(self, sequence, code, text):
+    """
+    Called if one of this agent's V1 commands completed
+    """
+    context = None
+    try:
+      self.lock.acquire()
+      if sequence in self.contextMap:
+        context = self.contextMap[sequence]
+    finally:
+      self.lock.release()
+
+    if context:
+      if code != 0:
+        ex = "Error %d: %s" % (code, text)
+        context.setException(ex)
+      context.signal()
+    self.broker._clearSequence(sequence)
+
+
+  def _v1HandleMethodResp(self, codec, seq):
+    """
+    Handle a QMFv1 method response
+    """
+    code = codec.read_uint32()
+    text = codec.read_str16()
+    outArgs = {}
+    self.broker._clearSequence(seq)
+    pair = self.seqMgr._release(seq)
+    if pair == None:
+      return
+    method, synchronous = pair
+    if code == 0:
+      for arg in method.arguments:
+        if arg.dir.find("O") != -1:
+          outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
+    result = MethodResult(code, text, outArgs)
+    if synchronous:
+      try:
+        self.broker.cv.acquire()
+        self.broker.syncResult = result
+        self.broker.syncInFlight = False
+        self.broker.cv.notify()
+      finally:
+        self.broker.cv.release()
+    else:
+      if self.session.console:
+        self.session.console.methodResponse(self.broker, seq, result)
+
+
+  def _v1HandleEventInd(self, codec, seq):
+    """
+    Handle a QMFv1 event indication
+    """
+    event = Event(self, codec)
+    self.unsolicitedContext.doEvent(event)
+
+
+  def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False):
+    """
+    Handle a QMFv1 content indication
+    """
+    classKey = ClassKey(codec)
+    schema = self.schemaCache.getSchema(classKey)
+    if not schema:
+      return
+
+    obj = Object(self, schema, codec, prop, stat)
+    if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+      self.broker._updateAgent(obj)
+
+    context = self.unsolicitedContext
+    try:
+      self.lock.acquire()
+      if sequence in self.contextMap:
+        context = self.contextMap[sequence]
+    finally:
+      self.lock.release()
+
+    context.addV1QueryResult(obj)
+
+
+  def _v2HandleDataInd(self, mp, ah, content):
+    """
+    Handle a QMFv2 data indication from the agent
+    """
+    if mp.correlation_id:
+      try:
+        self.lock.acquire()
+        sequence = int(mp.correlation_id)
+        if sequence not in self.contextMap:
+          return
+        context = self.contextMap[sequence]
+      finally:
+        self.lock.release()
+    else:
+      context = self.unsolicitedContext
+
+    kind = "_data"
+    if "qmf.content" in ah:
+      kind = ah["qmf.content"]
+    if kind == "_data":
+      if content.__class__ != list:
+        return
+      for omap in content:
+        context.addV2QueryResult(omap)
+      context.processV2Data()
+
+    if 'partial' not in ah:
+      context.signal()
+
+
+  def _v2HandleMethodResp(self, mp, ah, content):
+    """
+    Handle a QMFv2 method response from the agent
+    """
+    context = None
+    sequence = None
+    if mp.correlation_id:
+      try:
+        self.lock.acquire()
+        seq = int(mp.correlation_id)
+      finally:
+        self.lock.release()
+    else:
+      return
+
+    pair = self.seqMgr._release(seq)
+    if pair == None:
+      return
+    method, synchronous = pair
+
+    result = MethodResult(0, 'OK', content['_arguments'])
+    if synchronous:
+      try:
+        self.broker.cv.acquire()
+        self.broker.syncResult = result
+        self.broker.syncInFlight = False
+        self.broker.cv.notify()
+      finally:
+        self.broker.cv.release()
+    else:
+      if self.session.console:
+        self.session.console.methodResponse(self.broker, seq, result)
+
+  def _v2HandleException(self, mp, ah, content):
+    """
+    Handle a QMFv2 exception
+    """
+    context = None
+    if mp.correlation_id:
+      try:
+        self.lock.acquire()
+        seq = int(mp.correlation_id)
+      finally:
+        self.lock.release()
+    else:
+      return
+
+    pair = self.seqMgr._release(seq)
+    if pair == None:
+      return
+    method, synchronous = pair
+
+    code = 7
+    text = ""
+    if '_status_code' in content:
+      code = content['_status_code']
+    if '_status_text' in content:
+      text = content['_status_text']
+    else:
+      text = content
+
+    result = MethodResult(code, text, {})
+    if synchronous:
+      try:
+        self.broker.cv.acquire()
+        self.broker.syncResult = result
+        self.broker.syncInFlight = False
+        self.broker.cv.notify()
+      finally:
+        self.broker.cv.release()
+    else:
+      if self.session.console:
+        self.session.console.methodResponse(self.broker, seq, result)
+
+
+  def _v1SendGetQuery(self, sequence, kwargs):
+    """
+    Send a get query to a QMFv1 agent.
+    """
+    #
+    # Build the query map
+    #
+    query = {}
+    if '_class' in kwargs:
+      query['_class'] = kwargs['_class']
+      if '_package' in kwargs:
+        query['_package'] = kwargs['_package']
+    elif '_key' in kwargs:
+      key = kwargs['_key']
+      query['_class'] = key.getClassName()
+      query['_package'] = key.getPackageName()
+    elif '_objectId' in kwargs:
+      query['_objectid'] = kwargs['_objectId'].__repr__()
+
+    #
+    # Construct and transmit the message
+    #
+    sendCodec = Codec()
+    self.broker._setHeader(sendCodec, 'G', sequence)
+    sendCodec.write_map(query)
+    smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank))
+    self.broker._send(smsg)
+
+
+  def _v2SendGetQuery(self, sequence, kwargs):
+    """
+    Send a get query to a QMFv2 agent.
+    """
+    #
+    # Build the query map
+    #
+    query = {'_what': 'OBJECT'}
+    if '_class' in kwargs:
+      schemaMap = {'_class_name': kwargs['_class']}
+      if '_package' in kwargs:
+        schemaMap['_package_name'] = kwargs['_package']
+      query['_schema_id'] = schemaMap
+    elif '_key' in kwargs:
+      query['_schema_id'] = kwargs['_key'].asMap()
+    elif '_objectId' in kwargs:
+      query['_object_id'] = kwargs['_objectId'].asMap()
+
+    #
+    # Construct and transmit the message
+    #
+    dp = self.broker.amqpSession.delivery_properties()
+    dp.routing_key = self.agentBank
+    mp = self.broker.amqpSession.message_properties()
+    mp.content_type = "amqp/map"
+    mp.user_id = self.broker.authUser
+    mp.correlation_id = str(sequence)
+    mp.app_id = "qmf2"
+    mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+    mp.application_headers = {'qmf.opcode':'_query_request'}
+    sendCodec = Codec()
+    sendCodec.write_map(query)
+    msg = Message(dp, mp, sendCodec.encoded)
+    self.broker._send(msg, "qmf.default.direct")
+
+
+  def _v2SendSchemaRequest(self, schemaId):
+    """
+    Send a query to an agent to request details on a particular schema class.
+    IMPORTANT:  This function currently sends a QMFv1 schema-request to the address of
+                the agent.  The agent will send its response to amq.direct/<our-key>.
+                Eventually, this will be converted to a proper QMFv2 schema query.
+    """
+    sendCodec = Codec()
+    seq = self.seqMgr._reserve(None)
+    self.broker._setHeader(sendCodec, 'S', seq)
+    schemaId.encode(sendCodec)
+    smsg = self.broker._message(sendCodec.encoded, self.agentBank)
+    self.broker._send(smsg, "qmf.default.direct")
+
+
+  def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
+    """
+    Process QMFv1 messages arriving from an agent.
+    """
+    if   opcode == 'm': self._v1HandleMethodResp(codec, seq)
+    elif opcode == 'e': self._v1HandleEventInd(codec, seq)
+    elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True)
+    elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True)
+    elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True)
+
+
+  def _handleQmfV2Message(self, opcode, mp, ah, content):
+    """
+    Process QMFv2 messages arriving from an agent.
+    """
+    if   opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
+    elif opcode == '_query_response':  self._v2HandleDataInd(mp, ah, content)
+    elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content)
+    elif opcode == '_exception':       self._v2HandleException(mp, ah, content)
+
+
+#===================================================================================================
+# RequestContext
+#===================================================================================================
+class RequestContext(object):
+  """
+  This class tracks an asynchronous request sent to an agent.
+  TODO: Add logic for client-side selection and filtering deleted objects from get-queries
+  """
+  def __init__(self, agent, notifiable, selectors={}):
+    self.sequence = None
+    self.agent = agent
+    self.schemaCache = self.agent.schemaCache
+    self.notifiable = notifiable
+    self.selectors = selectors
+    self.startTime = time()
+    self.rawQueryResults = []
+    self.queryResults = []
+    self.exception = None
+    self.waitingForSchema = None
+    self.pendingSignal = None
+    self.cv = Condition()
+    self.blocked = notifiable == None
+
+
+  def setSequence(self, sequence):
+    self.sequence = sequence
+
+
+  def addV1QueryResult(self, data):
+    values = {}
+    for prop, val in data.getProperties():
+      values[prop.name] = val
+    for stat, val in data.getStatistics():
+      values[stat.name] = val
+    for key in values:
+      val = values[key]
+      if key in self.selectors and val != self.selectors[key]:
+        return
+
+    if self.notifiable:
+      self.notifiable(qmf_object=data)
+    else:
+      self.queryResults.append(data)
+
+
+  def addV2QueryResult(self, data):
+    values = data['_values']
+    for key in values:
+      val = values[key]
+      if key in self.selectors and val != self.selectors[key]:
+        return
+    self.rawQueryResults.append(data)
+
+
+  def doEvent(self, data):
+    if self.notifiable:
+      self.notifiable(qmf_event=data)
+
+
+  def setException(self, ex):
+    self.exception = ex
+
+
+  def getAge(self):
+    return time() - self.startTime
+
+
+  def cancel(self, exception):
+    self.setException(exception)
+    try:
+      self.cv.acquire()
+      self.blocked = None
+      self.waitingForSchema = None
+      self.cv.notify()
+    finally:
+      self.cv.release()
+    self._complete()
+
+
+  def waitForSignal(self, timeout):
+    try:
+      self.cv.acquire()
+      while self.blocked:
+        if (time() - self.startTime) > timeout:
+          self.exception = "Request timed out after %d seconds" % timeout
+          return
+        self.cv.wait(1)
+    finally:
+      self.cv.release()
+
+
+  def signal(self):
+    try:
+      self.cv.acquire()
+      if self.waitingForSchema:
+        self.pendingSignal = True
+        return
+      else:
+        self.blocked = None
+        self.cv.notify()
+    finally:
+      self.cv.release()
+    self._complete()
+
+
+  def _complete(self):
+    if self.notifiable:
+      if self.exception:
+        self.notifiable(qmf_exception=self.exception)
+      else:
+        self.notifiable(qmf_complete=True)
+
+    if self.sequence:
+      self.agent._clearContext(self.sequence)
+
+
+  def processV2Data(self):
+    """
+    Attempt to make progress on the entries in the raw_query_results queue.  If an entry has a schema
+    that is in our schema cache, process it.  Otherwise, send a request for the schema information
+    to the agent that manages the object.
+    """
+    schemaId = None
+    queryResults = []
+    try:
+      self.cv.acquire()
+      if self.waitingForSchema:
+        return
+      while (not self.waitingForSchema) and len(self.rawQueryResults) > 0:
+        head = self.rawQueryResults[0]
+        schemaId = self._getSchemaIdforV2ObjectLH(head)
+        schema = self.schemaCache.getSchema(schemaId)
+        if schema:
+          obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank)
+          queryResults.append(obj)
+          self.rawQueryResults.pop(0)
+        else:
+          self.waitingForSchema = True
+    finally:
+      self.cv.release()
+
+    if self.waitingForSchema:
+      self.agent._v2SendSchemaRequest(schemaId)
+
+    for result in queryResults:
+      if self.notifiable:
+        self.notifiable(qmf_object=result)
+      else:
+        self.queryResults.append(result)
+
+    complete = None
+    try:
+      self.cv.acquire()
+      if not self.waitingForSchema and self.pendingSignal:
+        self.blocked = None
+        self.cv.notify()
+        complete = True
+    finally:
+      self.cv.release()
+
+    if complete:
+      self._complete()
+
+
+  def reprocess(self):
+    """
+    New schema information has been added to the schema-cache.  Clear our 'waiting' status
+    and see if we can make more progress on the raw query list.
+    """
+    try:
+      self.cv.acquire()
+      self.waitingForSchema = None
+    finally:
+      self.cv.release()
+    self.processV2Data()
+
+
+  def _getSchemaIdforV2ObjectLH(self, data):
+    """
+    Given a data map, extract the schema-identifier.
+    """
+    if data.__class__ != dict:
+      return None
+    if '_schema_id' in data:
+      return ClassKey(data['_schema_id'])
+    return None
+
+
+#===================================================================================================
+# Event
+#===================================================================================================
 class Event:
   """ """
-  def __init__(self, session, broker, codec):
-    self.session = session
-    self.broker  = broker
+  def __init__(self, agent, codec):
+    self.agent = agent
+    self.session = agent.session
+    self.broker  = agent.broker
     self.classKey = ClassKey(codec)
     self.timestamp = codec.read_int64()
     self.severity = codec.read_uint8()
-    self.schema = None
-    pname = self.classKey.getPackageName()
-    pkey = self.classKey.getPackageKey()
-    if pname in session.packages:
-      if pkey in session.packages[pname]:
-        self.schema = session.packages[pname][pkey]
-        self.arguments = {}
-        for arg in self.schema.arguments:
-          self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
+    self.arguments = {}
+    self.schema = self.session.schemaCache.getSchema(self.classKey)
+    if not self.schema:
+      return
+    for arg in self.schema.arguments:
+      self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
 
   def __repr__(self):
     if self.schema == None:
@@ -1895,6 +2995,10 @@ class Event:
   def getSchema(self):
     return self.schema
 
+
+#===================================================================================================
+# SequenceManager
+#===================================================================================================
 class SequenceManager:
   """ Manage sequence numbers for asynchronous method calls """
   def __init__(self):
@@ -1926,6 +3030,9 @@ class SequenceManager:
     return data
 
 
+#===================================================================================================
+# DebugConsole
+#===================================================================================================
 class DebugConsole(Console):
   """ """
   def brokerConnected(self, broker):

Modified: qpid/trunk/qpid/tools/src/py/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-cluster?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-cluster (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-cluster Wed Mar 31 21:13:12 2010
@@ -94,7 +94,7 @@ class BrokerManager:
         self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
         agents = self.qmf.getAgents()
         for a in agents:
-            if a.getAgentBank() == 0:
+            if a.getAgentBank() == '0':
                 self.brokerAgent = a
 
     def Disconnect(self):

Modified: qpid/trunk/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-config (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-config Wed Mar 31 21:13:12 2010
@@ -189,7 +189,7 @@ class BrokerManager:
         self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
         agents = self.qmf.getAgents()
         for a in agents:
-            if a.getAgentBank() == 0:
+            if a.getAgentBank() == '0':
                 self.brokerAgent = a
 
     def Disconnect(self):

Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Wed Mar 31 21:13:12 2010
@@ -91,7 +91,7 @@ class Broker(object):
 
         agents = qmf.getAgents()
         for a in agents:
-            if a.getAgentBank() == 0:
+            if a.getAgentBank() == '0':
                 self.brokerAgent = a
 
         bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org