You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/30 14:18:21 UTC

svn commit: r929104 - in /qpid/branches/qmf-devel0.7a/qpid: cpp/managementgen/qmfgen/ cpp/managementgen/qmfgen/templates/ cpp/src/qpid/agent/ cpp/src/qpid/management/ extras/qmf/src/py/qmf/

Author: tross
Date: Tue Mar 30 12:18:21 2010
New Revision: 929104

URL: http://svn.apache.org/viewvc?rev=929104&view=rev
Log:
Methods (both styles) and Session-scope get queries now working for both V1 and V2.
Almost all of the tests pass.

Modified:
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py
    qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
    qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp
    qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py?rev=929104&r1=929103&r2=929104&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/schema.py Tue Mar 30 12:18:21 2010
@@ -1402,7 +1402,7 @@ class SchemaClass:
 
       stream.write ("        status = coreObject->ManagementMethod (METHOD_" +\
                     method.getName().upper() + ", ioArgs, text);\n")
-      stream.write ("        outMap[\"_status_code\"] = (status);\n")
+      stream.write ("        outMap[\"_status_code\"] = (uint32_t) status;\n")
       stream.write ("        outMap[\"_status_text\"] = ::qpid::management::Manageable::StatusText(status, text);\n")
       for arg in method.args:
         if arg.getDir () == "O" or arg.getDir () == "IO":

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp?rev=929104&r1=929103&r2=929104&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/managementgen/qmfgen/templates/Class.cpp Tue Mar 30 12:18:21 2010
@@ -333,6 +333,6 @@ void /*MGEN:Class.NameCap*/::doMethod (/
     std::string          text;
 
 /*MGEN:Class.MapMethodHandlers*/
-    outMap["_status_code"] = status;
+    outMap["_status_code"] = (uint32_t) status;
     outMap["_status_text"] = Manageable::StatusText(status, text);
 }

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=929104&r1=929103&r2=929104&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Tue Mar 30 12:18:21 2010
@@ -459,19 +459,21 @@ void ManagementAgentImpl::invokeMethodRe
 
     qpid::messaging::Message outMsg;
     qpid::messaging::MapContent outMap(outMsg);
+    outMap["_values"] = Variant::Map();
 
     if ((oid = inMap.find("_object_id")) == inMap.end() ||
         (mid = inMap.find("_method_name")) == inMap.end()) {
-        (outMap["_values"].asMap())["_status"] = Manageable::STATUS_PARAMETER_INVALID;
+        (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_PARAMETER_INVALID;
         (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID);
         failed = true;
     } else {
         string methodName;
         ObjectId objId;
         Variant::Map inArgs;
+        Variant::Map callMap;
 
         try {
-            // coversions will throw if input is invalid.
+            // conversions will throw if input is invalid.
             objId = ObjectId(oid->second.asMap());
             methodName = mid->second.getString();
 
@@ -482,17 +484,29 @@ void ManagementAgentImpl::invokeMethodRe
 
             ManagementObjectMap::iterator iter = managementObjects.find(objId);
             if (iter == managementObjects.end() || iter->second->isDeleted()) {
-                (outMap["_values"].asMap())["_status"] = Manageable::STATUS_UNKNOWN_OBJECT;
+                (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_UNKNOWN_OBJECT;
                 (outMap["_values"].asMap())["_status_text"] = Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT);
                 failed = true;
             } else {
+                iter->second->doMethod(methodName, inArgs, callMap);
+            }
 
-                iter->second->doMethod(methodName, inArgs, outMap.asMap());
+            if (callMap["_status_code"].asUint32() == 0) {
+                outMap["_arguments"] = Variant::Map();
+                for (Variant::Map::const_iterator iter = callMap.begin();
+                     iter != callMap.end(); iter++)
+                    if (iter->first != "_status_code" && iter->first != "_status_text")
+                        outMap["_arguments"].asMap()[iter->first] = iter->second;
+            } else {
+                (outMap["_values"].asMap())["_status_code"] = callMap["_status_code"];
+                (outMap["_values"].asMap())["_status_text"] = callMap["_status_text"];
+                failed = true;
             }
 
-        } catch(exception& e) {
+        } catch(messaging::InvalidConversion& e) {
             outMap.clear();
-            (outMap["_values"].asMap())["_status"] = Manageable::STATUS_EXCEPTION;
+            outMap["_values"] = Variant::Map();
+            (outMap["_values"].asMap())["_status_code"] = Manageable::STATUS_EXCEPTION;
             (outMap["_values"].asMap())["_status_text"] = e.what();
             failed = true;
         }
@@ -501,10 +515,13 @@ void ManagementAgentImpl::invokeMethodRe
     Variant::Map headers;
     headers["method"] = "response";
     headers["qmf.agent"] = name_address;
-    if (failed)
+    if (failed) {
         headers["qmf.opcode"] = "_exception";
-    else
+        QPID_LOG(trace, "SENT Exception map=" << outMap);
+    } else {
         headers["qmf.opcode"] = "_method_response";
+        QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
+    }
 
     outMap.encode();
     connThreadBody.sendBuffer(outMsg.getContent(), cid, headers, "qmf.default.direct", replyTo);

Modified: qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp?rev=929104&r1=929103&r2=929104&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp (original)
+++ qpid/branches/qmf-devel0.7a/qpid/cpp/src/qpid/management/ManagementObject.cpp Tue Mar 30 12:18:21 2010
@@ -188,11 +188,6 @@ void ObjectId::setV2Key(const Management
 // encode as V2-format map
 void ObjectId::mapEncode(messaging::VariantMap& map) const
 {
-    if (agent == 0)
-        map["_first"] = first;
-    else
-        map["_first"] = (first | agent->first);
-
     map["_object_name"] = v2Key;
     if (!agentName.empty())
         map["_agent_name"] = agentName;

Modified: qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py?rev=929104&r1=929103&r2=929104&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qmf-devel0.7a/qpid/extras/qmf/src/py/qmf/console.py Tue Mar 30 12:18:21 2010
@@ -287,7 +287,7 @@ class Object(object):
       if name == prop.name:
         return value
       if name == "_" + prop.name + "_" and prop.type == 10:  # Dereference references
-        deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+        deref = self._agent.getObjects(_objectId=value)
         if len(deref) != 1:
           return None
         else:
@@ -321,10 +321,6 @@ class Object(object):
         aIdx = 0
         sendCodec = Codec()
         seq = self._session.seqMgr._reserve((method, synchronous))
-        self._broker._setHeader(sendCodec, 'M', seq)
-        self._objectId.encode(sendCodec)
-        self._schema.getKey().encode(sendCodec)
-        sendCodec.write_str8(name)
 
         count = 0
         for arg in method.arguments:
@@ -333,24 +329,64 @@ class Object(object):
         if count != len(args):
           raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
 
-        for arg in method.arguments:
-          if arg.dir.find("I") != -1:
-            self._session._encodeValue(sendCodec, args[aIdx], arg.type)
-            aIdx += 1
-        if timeWait:
-          ttl = timeWait * 1000
+        if self._agent.isV2:
+          #
+          # Compose and send a QMFv2 method request
+          #
+          call = {}
+          call['_object_id'] = self._objectId.asMap()
+          call['_method_name'] = name
+          argMap = {}
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              argMap[arg.name] = args[aIdx]
+              aIdx += 1
+          call['_arguments'] = argMap
+
+          dp = self._broker.amqpSession.delivery_properties()
+          dp.routing_key = self._objectId.getAgentBank()
+          mp = self._broker.amqpSession.message_properties()
+          mp.content_type = "amqp/map"
+          mp.user_id = self._broker.authUser
+          mp.correlation_id = str(seq)
+          mp.app_id = "qmf2"
+          mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+          mp.application_headers = {'qmf.opcode':'_method_request'}
+          sendCodec.write_map(call)
+          smsg = Message(dp, mp, sendCodec.encoded)
+          exchange = "qmf.default.direct"
+
         else:
-          ttl = None
-        smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
-                                     (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
-                                     ttl=ttl)
+          #
+          # Associate this sequence with the agent hosting the object so we can correctly
+          # route the method-response
+          #
+          agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank())
+          self._broker._setSequence(seq, agent)
+
+          #
+          # Compose and send a QMFv1 method request
+          #
+          self._broker._setHeader(sendCodec, 'M', seq)
+          self._objectId.encode(sendCodec)
+          self._schema.getKey().encode(sendCodec)
+          sendCodec.write_str8(name)
+
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+              aIdx += 1
+          smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
+                                       (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+          exchange = "qpid.management"
+
         if synchronous:
           try:
             self._broker.cv.acquire()
             self._broker.syncInFlight = True
           finally:
             self._broker.cv.release()
-        self._broker._send(smsg)
+        self._broker._send(smsg, exchange)
         return seq
     return None
 
@@ -551,7 +587,9 @@ class Session:
 
     self.brokers.append(broker)
     if not self.manageConnections:
-      self.getObjects(broker=broker, _class="agent", _agent=broker.getAgent(1,0))
+      agent = broker.getAgent(1,0)
+      if agent:
+        agent.getObjects(_class="agent")
     return broker
 
 
@@ -712,76 +750,17 @@ class Session:
     if len(agentList) == 0:
       return []
 
-    pname = None
-    cname = None
-    hash = None
-    classKey = None
-    if   "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
-    elif "_key"    in kwargs: classKey = kwargs["_key"]
-    elif "_class"  in kwargs:
-      cname = kwargs["_class"]
-      if "_package" in kwargs:
-        pname = kwargs["_package"]
-    if cname == None and classKey == None and "_objectId" not in kwargs:
-      raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
-
-    map = {}
-    self.getSelect = []
-    if "_objectId" in kwargs:
-      map["_objectid"] = kwargs["_objectId"].__repr__()
-    else:
-      if cname == None:
-        cname = classKey.getClassName()
-        pname = classKey.getPackageName()
-        hash = classKey.getHash()
-      map["_class"] = cname
-      if pname != None: map["_package"] = pname
-      if hash  != None: map["_hash"]    = hash
-      for item in kwargs:
-        if item[0] != '_':
-          self.getSelect.append((item, kwargs[item]))
-
-    self.getResult = []
+    #
+    # We now have a list of agents to query, start the queries and gather the results.
+    #
+    request = SessionGetRequest(len(agentList))
     for agent in agentList:
-      broker = agent.broker
-      sendCodec = Codec()
-      try:
-        self.cv.acquire()
-        seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
-        self.syncSequenceList.append(seq)
-      finally:
-        self.cv.release()
-      broker._setHeader(sendCodec, 'G', seq)
-      sendCodec.write_map(map)
-      smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
-      broker._send(smsg)
-
-    starttime = time()
-    timeout = False
-    if "_timeout" in kwargs:
-      waitTime = kwargs["_timeout"]
-    else:
-      waitTime = self.DEFAULT_GET_WAIT_TIME
-    try:
-      self.cv.acquire()
-      while len(self.syncSequenceList) > 0 and self.error == None:
-        self.cv.wait(waitTime)
-        if time() - starttime > waitTime:
-          for pendingSeq in self.syncSequenceList:
-            self.seqMgr._release(pendingSeq)
-          self.syncSequenceList = []
-          timeout = True
-    finally:
-      self.cv.release()
-
-    if self.error:
-      errorText = self.error
-      self.error = None
-      raise Exception(errorText)
-
-    if len(self.getResult) == 0 and timeout:
-      raise RuntimeError("No agent responded within timeout period")
-    return self.getResult
+      agent.getObjects(request, **kwargs)
+    timeout = 60
+    if '_timeout' in kwargs:
+        timeout = kwargs['_timeout']
+    request.wait(timeout)
+    return request.result
 
 
   def setEventFilter(self, **kwargs):
@@ -1179,14 +1158,10 @@ class Session:
     schema = self.getSchema(schemaKey)
     for method in schema.getMethods():
       if name == method.name:
-        aIdx = 0
-        sendCodec = Codec()
-        seq = self.seqMgr._reserve((method, False))
-        broker._setHeader(sendCodec, 'M', seq)
-        objectId.encode(sendCodec)
-        schemaKey.encode(sendCodec)
-        sendCodec.write_str8(name)
-
+        #
+        # Count the arguments supplied and validate that the number is what is expected
+        # based on the schema.
+        #
         count = 0
         for arg in method.arguments:
           if arg.dir.find("I") != -1:
@@ -1194,18 +1169,106 @@ class Session:
         if count != len(argList):
           raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
 
-        for arg in method.arguments:
-          if arg.dir.find("I") != -1:
-            self._encodeValue(sendCodec, argList[aIdx], arg.type)
-            aIdx += 1
-        smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
-                               (objectId.getBrokerBank(), objectId.getAgentBank()))
-        broker._send(smsg)
+        aIdx = 0
+        sendCodec = Codec()
+        seq = self.seqMgr._reserve((method, False))
+
+        if objectId.isV2():
+          #
+          # Compose and send a QMFv2 method request
+          #
+          call = {}
+          call['_object_id'] = objectId.asMap()
+          call['_method_name'] = name
+          args = {}
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              args[arg.name] = argList[aIdx]
+              aIdx += 1
+          call['_arguments'] = args
+
+          dp = broker.amqpSession.delivery_properties()
+          dp.routing_key = objectId.getAgentBank()
+          mp = broker.amqpSession.message_properties()
+          mp.content_type = "amqp/map"
+          mp.user_id = broker.authUser
+          mp.correlation_id = str(seq)
+          mp.app_id = "qmf2"
+          mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+          mp.application_headers = {'qmf.opcode':'_method_request'}
+          sendCodec.write_map(call)
+          msg = Message(dp, mp, sendCodec.encoded)
+          broker._send(msg, "qmf.default.direct")
+
+        else:
+          #
+          # Associate this sequence with the agent hosting the object so we can correctly
+          # route the method-response
+          #
+          agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank())
+          broker._setSequence(seq, agent)
+
+          #
+          # Compose and send a QMFv1 method request
+          #
+          broker._setHeader(sendCodec, 'M', seq)
+          objectId.encode(sendCodec)
+          schemaKey.encode(sendCodec)
+          sendCodec.write_str8(name)
+
+          for arg in method.arguments:
+            if arg.dir.find("I") != -1:
+              self._encodeValue(sendCodec, argList[aIdx], arg.type)
+              aIdx += 1
+              smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+                                     (objectId.getBrokerBank(), objectId.getAgentBank()))
+          broker._send(smsg)
         return seq
     return None
 
 
 #===================================================================================================
+# SessionGetRequest
+#===================================================================================================
+class SessionGetRequest(object):
+  """
+  This class is used to track get-object queries at the Session level.
+  """
+  def __init__(self, agentCount):
+    self.agentCount = agentCount
+    self.result = []
+    self.cv = Condition()
+    self.waiting = True
+
+  def __call__(self, **kwargs):
+    """
+    Callable entry point for gathering collected objects.
+    """
+    try:
+      self.cv.acquire()
+      if 'qmf_object' in kwargs:
+        self.result.append(kwargs['qmf_object'])
+      elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs:
+        self.agentCount -= 1
+        if self.agentCount == 0:
+          self.waiting = None
+          self.cv.notify()
+    finally:
+      self.cv.release()
+
+  def wait(self, timeout):
+    starttime = time()
+    try:
+      self.cv.acquire()
+      while self.waiting:
+        if (time() - starttime) > timeout:
+          raise Exception("Timed out after %d seconds" % timeout)
+        self.cv.wait(1)
+    finally:
+      self.cv.release()
+
+
+#===================================================================================================
 # SchemaCache
 #===================================================================================================
 class SchemaCache(object):
@@ -1572,7 +1635,7 @@ class ObjectId:
       else:
         first  = constructor.read_uint64()
         second = constructor.read_uint64()
-      self.agentName = str((first & 0x0000FFFFF0000000) >> 28)
+      self.agentName = str((first & 0x000000000FFFFFFF) >> 28)
       self.agentEpoch = (first & 0x0FFF000000000000) >> 48
       self.objectName = str(second)
 
@@ -1600,6 +1663,9 @@ class ObjectId:
     return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
                                self.getBrokerBank(), self.getAgentBank(), self.getObject())
 
+  def isV2(self):
+    return not self.agentName.isdigit()
+
   def index(self):
     return self.__repr__()
 
@@ -1622,11 +1688,11 @@ class ObjectId:
     return self.getSequence() == 0
 
   def encode(self, codec):
-    first = self.agentEpoch << 48
+    first = (self.agentEpoch << 48) + (1 << 28)
     second = 0
 
     try:
-      first += int(self.agentName) << 28
+      first += int(self.agentName)
     except:
       pass
 
@@ -1787,7 +1853,7 @@ class Broker:
 
   def getAgent(self, brokerBank, agentBank):
     """ Return the agent object associated with a particular broker and agent bank value."""
-    bankKey = agentBank
+    bankKey = str(agentBank)
     try:
       self.cv.acquire()
       if bankKey in self.agents:
@@ -1852,7 +1918,7 @@ class Broker:
       try:
         self.cv.acquire()
         self.agents = {}
-        self.agents[0] = Agent(self, 0, "BrokerAgent")
+        self.agents['0'] = Agent(self, 0, "BrokerAgent")
       finally:
         self.cv.release()
 
@@ -2331,9 +2397,18 @@ class Agent:
         raise Exception("notifiable object must be callable")
 
     #
+    # Isolate the selectors from the kwargs
+    #
+    selectors = {}
+    for key in kwargs:
+      value = kwargs[key]
+      if key[0] != '_':
+        selectors[key] = value
+
+    #
     # Allocate a context to track this asynchronous request.
     #
-    context = RequestContext(self, notifiable)
+    context = RequestContext(self, notifiable, selectors)
     sequence = self.seqMgr._reserve(context)
     try:
       self.lock.acquire()
@@ -2419,6 +2494,7 @@ class Agent:
     code = codec.read_uint32()
     text = codec.read_str16()
     outArgs = {}
+    self.broker._clearSequence(seq)
     pair = self.seqMgr._release(seq)
     if pair == None:
       return
@@ -2426,19 +2502,19 @@ class Agent:
     if code == 0:
       for arg in method.arguments:
         if arg.dir.find("O") != -1:
-          outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
+          outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
     result = MethodResult(code, text, outArgs)
     if synchronous:
       try:
-        broker.cv.acquire()
-        broker.syncResult = result
-        broker.syncInFlight = False
-        broker.cv.notify()
+        self.broker.cv.acquire()
+        self.broker.syncResult = result
+        self.broker.syncInFlight = False
+        self.broker.cv.notify()
       finally:
-        broker.cv.release()
+        self.broker.cv.release()
     else:
-      if self.console:
-        self.console.methodResponse(broker, seq, result)
+      if self.session.console:
+        self.session.console.methodResponse(self.broker, seq, result)
 
 
   def _v1HandleEventInd(self, codec, seq):
@@ -2502,12 +2578,79 @@ class Agent:
       context.signal()
 
 
-  def _v2HandleMethodRsp(self, mp, ah, content):
-    pass
+  def _v2HandleMethodResp(self, mp, ah, content):
+    """
+    Handle a QMFv2 method response from the agent
+    """
+    context = None
+    sequence = None
+    if mp.correlation_id:
+      try:
+        self.lock.acquire()
+        seq = int(mp.correlation_id)
+      finally:
+        self.lock.release()
+    else:
+      return
+
+    pair = self.seqMgr._release(seq)
+    if pair == None:
+      return
+    method, synchronous = pair
 
+    result = MethodResult(0, 'OK', content['_arguments'])
+    if synchronous:
+      try:
+        self.broker.cv.acquire()
+        self.broker.syncResult = result
+        self.broker.syncInFlight = False
+        self.broker.cv.notify()
+      finally:
+        self.broker.cv.release()
+    else:
+      if self.session.console:
+        self.session.console.methodResponse(self.broker, seq, result)
 
   def _v2HandleException(self, mp, ah, content):
-    pass
+    """
+    Handle a QMFv2 exception
+    """
+    context = None
+    if mp.correlation_id:
+      try:
+        self.lock.acquire()
+        seq = int(mp.correlation_id)
+      finally:
+        self.lock.release()
+    else:
+      return
+
+    pair = self.seqMgr._release(seq)
+    if pair == None:
+      return
+    method, synchronous = pair
+
+    code = 7
+    text = ""
+    if '_status_code' in content:
+      code = content['_status_code']
+    if '_status_text' in content:
+      text = content['_status_text']
+    else:
+      text = content
+
+    result = MethodResult(code, text, {})
+    if synchronous:
+      try:
+        self.broker.cv.acquire()
+        self.broker.syncResult = result
+        self.broker.syncInFlight = False
+        self.broker.cv.notify()
+      finally:
+        self.broker.cv.release()
+    else:
+      if self.session.console:
+        self.session.console.methodResponse(self.broker, seq, result)
 
 
   def _v1SendGetQuery(self, sequence, kwargs):
@@ -2607,7 +2750,7 @@ class Agent:
     """
     if   opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
     elif opcode == '_query_response':  self._v2HandleDataInd(mp, ah, content)
-    elif opcode == '_method_response': self._v2HandleMethodRsp(mp, ah, content)
+    elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content)
     elif opcode == '_exception':       self._v2HandleException(mp, ah, content)
 
 
@@ -2619,11 +2762,12 @@ class RequestContext(object):
   This class tracks an asynchronous request sent to an agent.
   TODO: Add logic for client-side selection and filtering deleted objects from get-queries
   """
-  def __init__(self, agent, notifiable):
+  def __init__(self, agent, notifiable, selectors={}):
     self.sequence = None
     self.agent = agent
     self.schemaCache = self.agent.schemaCache
     self.notifiable = notifiable
+    self.selectors = selectors
     self.startTime = time()
     self.rawQueryResults = []
     self.queryResults = []
@@ -2639,6 +2783,16 @@ class RequestContext(object):
 
 
   def addV1QueryResult(self, data):
+    values = {}
+    for prop, val in data.getProperties():
+      values[prop.name] = val
+    for stat, val in data.getStatistics():
+      values[stat.name] = val
+    for key in values:
+      val = values[key]
+      if key in self.selectors and val != self.selectors[key]:
+        return
+
     if self.notifiable:
       self.notifiable(qmf_object=data)
     else:
@@ -2646,6 +2800,11 @@ class RequestContext(object):
 
 
   def addV2QueryResult(self, data):
+    values = data['_values']
+    for key in values:
+      val = values[key]
+      if key in self.selectors and val != self.selectors[key]:
+        return
     self.rawQueryResults.append(data)
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org