You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/13 00:01:21 UTC

svn commit: r909648 - in /qpid/trunk/qpid/python/qmf2: common.py console.py tests/__init__.py tests/async_method.py tests/async_query.py

Author: kgiusti
Date: Fri Feb 12 23:01:21 2010
New Revision: 909648

URL: http://svn.apache.org/viewvc?rev=909648&view=rev
Log:
QPID-2261: add async method call workitems

Added:
    qpid/trunk/qpid/python/qmf2/tests/async_method.py
Modified:
    qpid/trunk/qpid/python/qmf2/common.py
    qpid/trunk/qpid/python/qmf2/console.py
    qpid/trunk/qpid/python/qmf2/tests/__init__.py
    qpid/trunk/qpid/python/qmf2/tests/async_query.py

Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=909648&r1=909647&r2=909648&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Fri Feb 12 23:01:21 2010
@@ -132,6 +132,7 @@
     EVENT_RECEIVED=7
     AGENT_HEARTBEAT=8
     QUERY_COMPLETE=9
+    METHOD_RESPONSE=10
     # Enumeration of the types of WorkItems produced on the Agent
     METHOD_CALL=1000
     QUERY=1001

Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=909648&r1=909647&r2=909648&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Fri Feb 12 23:01:21 2010
@@ -127,14 +127,11 @@
     A Mailbox for asynchronous delivery, with a timeout value.
     """
     def __init__(self, console, 
-                 agent_name,
                  _timeout=None):
         """
         Invoked by application thread.
         """
         super(_AsyncMailbox, self).__init__(console)
-
-        self.agent_name = agent_name
         self.console = console
 
         if _timeout is None:
@@ -186,8 +183,8 @@
         Invoked by application thread.
         """
         super(_QueryMailbox, self).__init__(console,
-                                            agent_name,
                                             _timeout)
+        self.agent_name = agent_name
         self.target = target
         self.msgkey = msgkey
         self.context = context
@@ -267,19 +264,15 @@
     Handles responses to schema fetches made by the console.
     """
     def __init__(self, console,
-                 agent_name,
                  schema_id,
                  _timeout=None):
         """
         Invoked by application thread.
         """
         super(_SchemaPrefetchMailbox, self).__init__(console,
-                                                     agent_name,
                                                      _timeout)
-
         self.schema_id = schema_id
 
-
     def deliver(self, reply):
         """
         Process schema response messages.
@@ -306,6 +299,62 @@
 
 
 
+class _MethodMailbox(_AsyncMailbox):
+    """
+    A mailbox for asynchronous method requests; each reply, or the mailbox's expiry, is queued as a METHOD_RESPONSE WorkItem.
+    """
+    def __init__(self, console, 
+                 context,
+                 _timeout=None):  # NOTE(review): the invoke_method callers in this change do not pass _timeout here - confirm intended
+        """
+        Invoked by application thread.  'context' is stored and passed to the WorkItem built by deliver()/expire().
+        """
+        super(_MethodMailbox, self).__init__(console,
+                                             _timeout)
+        self.context = context
+
+    def deliver(self, reply):
+        """
+        Process method response messages delivered to this mailbox: build a MethodResult and queue it to the console.
+        Invoked by Console Management thread only.
+        """
+        # a reply whose content has no method map is logged and reported as a None result
+        _map = reply.content.get(MsgKey.method)
+        if not _map:
+            logging.error("Invalid method call reply message")
+            result = None
+        else:
+            error=_map.get(SchemaMethod.KEY_ERROR)
+            if error:
+                error = QmfData.from_map(error)
+                result = MethodResult(_error=error)
+            else:
+                result = MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
+
+        # create workitem carrying the caller's context, queue it, and set the console's _work_q_put flag
+        wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, result)
+        self.console._work_q.put(wi)
+        self.console._work_q_put = True
+
+        self.destroy()  # the mailbox is destroyed after a single delivery
+
+
+    def expire(self):
+        """
+        The mailbox expired without receiving a reply; a METHOD_RESPONSE WorkItem with a None result is queued.
+        Invoked by the Console Management thread only.
+        """
+        logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
+                      datetime.datetime.utcnow())
+        # send along an empty response (None in place of a MethodResult)
+        wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
+        self.console._work_q.put(wi)
+        self.console._work_q_put = True
+
+        self.destroy()
+
+
+
 ##==============================================================================
 ## DATA MODEL
 ##==============================================================================
@@ -374,7 +423,7 @@
         query = QmfQuery.create_id_object(self.get_object_id(),
                                           self.get_schema_class_id())
         obj_list = self._agent._console.do_query(self._agent, query,
-                                                timeout=_timeout)
+                                                _timeout=_timeout)
         if obj_list is None or len(obj_list) != 1:
             return None
 
@@ -385,7 +434,7 @@
     def invoke_method(self, name, _in_args={}, _reply_handle=None,
                       _timeout=None):
         """
-        invoke the named method.
+        Invoke the named method on this object.
         """
         assert self._agent
         assert self._agent._console
@@ -397,7 +446,11 @@
         if _timeout is None:
             _timeout = self._agent._console._reply_timeout
 
-        mbox = _SyncMailbox(self._agent._console)
+        if _reply_handle is not None:
+            mbox = _MethodMailbox(self._agent._console,
+                                  _reply_handle)
+        else:
+            mbox = _SyncMailbox(self._agent._console)
         cid = mbox.get_address()
 
         _map = {self.KEY_OBJECT_ID:str(oid),
@@ -417,9 +470,8 @@
             mbox.destroy()
             return None
 
-        # @todo async method calls!!!
         if _reply_handle is not None:
-            print("ASYNC TBD")
+            return True
 
         logging.debug("Waiting for response to method req (%s)" % _timeout)
         replyMsg = mbox.fetch(_timeout)
@@ -561,21 +613,23 @@
     def invoke_method(self, name, _in_args={}, _reply_handle=None,
                       _timeout=None): 
         """
+        Invoke the named method on this agent.
         """
         assert self._console
 
         if _timeout is None:
             _timeout = self._console._reply_timeout
 
-        if _in_args:
-            _in_args = _in_args.copy()
-
-        mbox = _SyncMailbox(self._console)
+        if _reply_handle is not None:
+            mbox = _MethodMailbox(self._console,
+                                  _reply_handle)
+        else:
+            mbox = _SyncMailbox(self._console)
         cid = mbox.get_address()
 
         _map = {SchemaMethod.KEY_NAME:name}
         if _in_args:
-            _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+            _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
 
         logging.debug("Sending method req to Agent (%s)" % time.time())
         try:
@@ -585,9 +639,8 @@
             mbox.destroy()
             return None
 
-        # @todo async method calls!!!
         if _reply_handle is not None:
-            print("ASYNC TBD")
+            return True
 
         logging.debug("Waiting for response to method req (%s)" % _timeout)
         replyMsg = mbox.fetch(_timeout)
@@ -939,7 +992,7 @@
         return agent
 
 
-    def do_query(self, agent, query, timeout=None ):
+    def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
         """
         """
         query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
@@ -954,7 +1007,15 @@
         if not msgkey:
             raise Exception("Invalid target for query: %s" % str(query))
 
-        mbox = _SyncMailbox(self)
+        if _reply_handle is not None:
+            mbox = _QueryMailbox(self,
+                                 agent.get_name(),
+                                 _reply_handle,
+                                 target, msgkey,
+                                 _timeout)
+        else:
+            mbox = _SyncMailbox(self)
+
         cid = mbox.get_address()
 
         try:
@@ -965,17 +1026,21 @@
             mbox.destroy()
             return None
 
-        if not timeout:
-            timeout = self._reply_timeout
+        # return now if async reply expected
+        if _reply_handle is not None:
+            return True
+
+        if not _timeout:
+            _timeout = self._reply_timeout
 
-        logging.debug("Waiting for response to Query (%s)" % timeout)
+        logging.debug("Waiting for response to Query (%s)" % _timeout)
         now = datetime.datetime.utcnow()
-        expire =  now + datetime.timedelta(seconds=timeout)
+        expire =  now + datetime.timedelta(seconds=_timeout)
 
         response = []
         while (expire > now):
-            timeout = timedelta_to_secs(expire - now)
-            reply = mbox.fetch(timeout)
+            _timeout = timedelta_to_secs(expire - now)
+            reply = mbox.fetch(_timeout)
             if not reply:
                 logging.debug("Query wait timed-out.")
                 break
@@ -1021,39 +1086,6 @@
         mbox.destroy()
         return response
 
-
-    def do_async_query(self, agent, query, app_handle, _timeout=None ):
-        """
-        """
-        query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
-                      QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
-                      QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
-                      QmfQuery.TARGET_SCHEMA: MsgKey.schema,
-                      QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
-                      QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
-        target = query.get_target()
-        msgkey = query_keymap.get(target)
-        if not msgkey:
-            raise Exception("Invalid target for query: %s" % str(query))
-
-        mbox = _QueryMailbox(self,
-                             agent.get_name(),
-                             app_handle,
-                             target, msgkey,
-                             _timeout)
-        cid = mbox.get_address()
-
-        try:
-            logging.debug("Sending Query to Agent (%s)" % time.time())
-            agent._send_query(query, cid)
-        except SendError, e:
-            logging.error(str(e))
-            mbox.destroy()
-            return False
-        return True
-
-
     def _wake_thread(self):
         """
         Make the console management thread loop wakeup from its next_receiver
@@ -1189,7 +1221,7 @@
                     query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
                                                      t_params)
                 timeout = timedelta_to_secs(expired - now)
-                reply = self.do_query(agent, query, timeout)
+                reply = self.do_query(agent, query, _timeout=timeout)
                 if reply:
                     obj_list = obj_list + reply
             else:
@@ -1209,7 +1241,7 @@
                             [QmfQuery.QUOTE, _pname]]
                 query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred)
                 timeout = timedelta_to_secs(expired - now)
-                sid_list = self.do_query(agent, query, timeout)
+                sid_list = self.do_query(agent, query, _timeout=timeout)
                 if sid_list:
                     for sid in sid_list:
                         now = datetime.datetime.utcnow()
@@ -1221,7 +1253,7 @@
                             t_params = {QmfData.KEY_SCHEMA_ID: sid}
                             query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
                         timeout = timedelta_to_secs(expired - now)
-                        reply = self.do_query(agent, query, timeout)
+                        reply = self.do_query(agent, query, _timeout=timeout)
                         if reply:
                             obj_list = obj_list + reply
         if obj_list:
@@ -1591,8 +1623,7 @@
             self._lock.release()
 
         if need_fetch:
-            mbox = _SchemaPrefetchMailbox(self, agent.get_name(),
-                                          schema_id)
+            mbox = _SchemaPrefetchMailbox(self, schema_id)
             query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
             logging.debug("Sending Schema Query to Agent (%s)" % time.time())
             try:
@@ -1628,8 +1659,8 @@
 
         # note: do_query will add the new schema to the cache automatically.
         slist = self.do_query(_agent,
-                             QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
-                             _timeout)
+                              QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
+                              _timeout=_timeout)
         if slist:
             return slist[0]
         else:

Modified: qpid/trunk/qpid/python/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/__init__.py?rev=909648&r1=909647&r2=909648&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/__init__.py Fri Feb 12 23:01:21 2010
@@ -26,3 +26,4 @@
 import events
 import multi_response
 import async_query
+import async_method

Added: qpid/trunk/qpid/python/qmf2/tests/async_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/async_method.py?rev=909648&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/async_method.py (added)
+++ qpid/trunk/qpid/python/qmf2/tests/async_method.py Fri Feb 12 23:01:21 2010
@@ -0,0 +1,362 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, WorkItem) 
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent, MethodCallParams)
+
+
+class _testNotifier(Notifier):  # Notifier implementation backed by a threading.Event
+    def __init__(self):
+        self._event = Event()  # set by indication(), cleared by wait_for_work()
+
+    def indication(self):
+        # note: called by qmf daemon thread; wakes any thread blocked in wait_for_work()
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait (up to 'timeout') for qmf to
+        # generate work; returns True if signalled, False if the wait timed out
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()  # reset so the next indication() is seen as new work
+            return True
+        return False
+
+
+class _agentApp(Thread):  # test agent application: owns an Agent and services its workitems on its own thread
+    def __init__(self, name, broker_url, heartbeat):
+        Thread.__init__(self)
+        self.notifier = _testNotifier()
+        self.broker_url = broker_url
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                     _desc="A test data schema",
+                                     _object_id_names=["index1", "index2"] )
+        # add the two properties named in _object_id_names above
+        _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # these two properties are statistics
+        _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # These two properties can be set via the method call
+        _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # add method
+        _meth = SchemaMethod( _desc="Method to set string and int in object." )
+        _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+        _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+        # the input value of cookie is returned in the response
+        _meth.add_argument( "cookie", SchemaProperty(qmfTypes.TYPE_LSTR,
+                                                     kwargs={"dir":"IO"}))
+        _schema.add_method( "set_meth", _meth )
+
+        # Add schema to Agent
+
+        self.agent.register_object_class(_schema)
+
+        # instantiate two managed data objects matching the schema
+
+        _obj1 = QmfAgentData( self.agent, _schema=_schema,
+                              _values={"index1":100, "index2":"a name"})
+        _obj1.set_value("set_string", "UNSET")
+        _obj1.set_value("set_int", 0)
+        _obj1.set_value("query_count", 0)
+        _obj1.set_value("method_call_count", 0)
+        self.agent.add_object( _obj1 )
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":99, 
+                                                      "index2": "another name",
+                                                      "set_string": "UNSET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+        # add an "unstructured" object to the Agent (no schema, identified only by its object id)
+        _obj2 = QmfAgentData(self.agent, _object_id="01545")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 2)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        self.agent.add_object(_obj2)
+
+        self.running = False
+        self.ready = Event()  # set by run() once the connection has been handed to the agent
+
+    def start_app(self):
+        self.running = True
+        self.start()
+        self.ready.wait(10)  # give run() up to 10 seconds to connect
+        if not self.ready.is_set():
+            raise Exception("Agent failed to connect to broker.")
+
+    def stop_app(self):
+        self.running = False
+        # wake the agent thread blocked in wait_for_work() so it can observe running == False
+        self.notifier.indication() # NOTE(review): shares the notifier with the qmf daemon thread - confirm this cannot collide
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # broker_url is read as an object exposing host/port/user/password attributes
+        self.conn = qpid.messaging.Connection(self.broker_url.host,
+                                              self.broker_url.port,
+                                              self.broker_url.user,
+                                              self.broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+        self.ready.set()
+
+        # Agent application main processing loop: wait for an indication, then drain all pending workitems
+        while self.running:
+            self.notifier.wait_for_work(None)  # no timeout: blocks until indication() is called
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                if wi.get_type() == WorkItem.METHOD_CALL:
+                    mc = wi.get_params()
+                    if not isinstance(mc, MethodCallParams):
+                        raise Exception("Unexpected method call parameters")
+
+                    if mc.get_name() == "set_meth":  # update the target object and echo the cookie
+                        obj = self.agent.get_object(mc.get_object_id(),
+                                                    mc.get_schema_id())
+                        if obj is None:
+                            error_info = QmfData.create({"code": -2, 
+                                                         "description":
+                                                             "Bad Object Id."},
+                                                        _object_id="_error")
+                            self.agent.method_response(wi.get_handle(),
+                                                       _error=error_info)
+                        else:
+                            obj.inc_value("method_call_count")
+                            out_args = {"code" : 0}
+                            if "cookie" in mc.get_args():
+                                out_args["cookie"] = mc.get_args()["cookie"]
+                            if "arg_int" in mc.get_args():
+                                obj.set_value("set_int", mc.get_args()["arg_int"])
+                            if "arg_str" in mc.get_args():
+                                obj.set_value("set_string", mc.get_args()["arg_str"])
+                            self.agent.method_response(wi.get_handle(),
+                                                       out_args)
+                    elif mc.get_name() == "a_method":  # succeeds only on object "01545" with the exact expected arguments
+                        obj = self.agent.get_object(mc.get_object_id(),
+                                                    mc.get_schema_id())
+                        if obj is None:
+                            error_info = QmfData.create({"code": -3, 
+                                                         "description":
+                                                             "Unknown object id."},
+                                                        _object_id="_error")
+                            self.agent.method_response(wi.get_handle(),
+                                                       _error=error_info)
+                        elif obj.get_object_id() != "01545":
+                            error_info = QmfData.create( {"code": -4, 
+                                                          "description":
+                                                              "Unexpected id."},
+                                                         _object_id="_error")
+                            self.agent.method_response(wi.get_handle(),
+                                                       _error=error_info)
+                        else:
+                            args = mc.get_args()
+                            if ("arg1" in args and args["arg1"] == 1 and
+                                "arg2" in args and args["arg2"] == "Now set!"
+                                and "arg3" in args and args["arg3"] == 1966): 
+                                out_args = {"code" : 0}
+                                if "cookie" in mc.get_args():
+                                    out_args["cookie"] = mc.get_args()["cookie"]
+                                self.agent.method_response(wi.get_handle(),
+                                                           out_args)
+                            else:
+                                error_info = QmfData.create(
+                                    {"code": -5, 
+                                     "description":
+                                         "Bad Args."},
+                                    _object_id="_error")
+                                self.agent.method_response(wi.get_handle(),
+                                                           _error=error_info)
+                    else:
+                        error_info = QmfData.create( {"code": -1, 
+                                                     "description":
+                                                         "Unknown method call."},
+                                                     _object_id="_error")
+                        self.agent.method_response(wi.get_handle(), _error=error_info)
+
+                self.agent.release_workitem(wi)  # every workitem is released, whatever its type
+                wi = self.agent.get_next_workitem(timeout=0)
+
+        if self.conn:
+            self.agent.remove_connection(10)
+        self.agent.destroy(10)
+
+
+
+class BaseTest(unittest.TestCase):  # asynchronous method call tests run against two test agents
+    def configure(self, config):  # NOTE(review): presumably invoked by the test harness with its config - confirm
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # start two agents, each with a one second agent heartbeat interval
+        self.agent_heartbeat = 1
+        self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+        self.agent1.start_app()
+        self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+        self.agent2.start_app()
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.stop_app()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.stop_app()
+            self.agent2 = None
+
+    def test_described_obj(self):
+        # create console
+        # find agents
+        # synchronous query for all objects in schema
+        # asynchronous method call on each object, using a unique cookie as the reply handle
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                            agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        i_count = 0  # number of method requests issued
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+
+            sid_list = self.console.do_query(agent, query)
+            self.assertTrue(sid_list and len(sid_list) == 1)
+            for sid in sid_list:
+                t_params = {QmfData.KEY_SCHEMA_ID: sid}
+                query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+                                                _target_params=t_params)
+                obj_list = self.console.do_query(agent, query)
+                self.assertTrue(len(obj_list) == 2)
+                for obj in obj_list:
+                    cookie = "cookie-" + str(i_count)
+                    i_count += 1
+                    mr = obj.invoke_method( "set_meth", 
+                                            {"arg_int": -99,
+                                             "arg_str": "Now set!",
+                                             "cookie": cookie},
+                                            _reply_handle=cookie,
+                                            _timeout=3)
+                    self.assertTrue(mr)
+
+        # done, now wait for async responses; the loop ends once no work is signalled for 3 seconds
+
+        r_count = 0  # number of METHOD_RESPONSE workitems received
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE)
+                reply = wi.get_params()
+                self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
+                self.assertTrue(reply.succeeded())
+                self.assertTrue(reply.get_argument("cookie") == wi.get_handle())
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(r_count == i_count)  # exactly one response per request
+
+        self.console.destroy(10)
+
+
+    def test_managed_obj(self):
+        # create console
+        # find agents
+        # synchronous query for the object with id "01545" on each agent
+        # asynchronous method call on each object, using a unique cookie as the reply handle
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        i_count = 0  # number of method requests issued
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "01545")
+            obj_list = self.console.do_query(agent, query)
+
+            self.assertTrue(isinstance(obj_list, type([])))
+            self.assertTrue(len(obj_list) == 1)
+            obj = obj_list[0]
+
+            cookie = "cookie-" + str(i_count)
+            i_count += 1
+            mr = obj.invoke_method("a_method",
+                                   {"arg1": 1,
+                                    "arg2": "Now set!",
+                                    "arg3": 1966,
+                                    "cookie": cookie},
+                                   _reply_handle=cookie,
+                                   _timeout=3)
+            self.assertTrue(mr)
+
+        # done, now wait for async responses; the loop ends once no work is signalled for 3 seconds
+
+        r_count = 0  # number of METHOD_RESPONSE workitems received
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                self.assertTrue(wi.get_type() == WorkItem.METHOD_RESPONSE)
+                reply = wi.get_params()
+                self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
+                self.assertTrue(reply.succeeded())
+                self.assertTrue(reply.get_argument("cookie") == wi.get_handle())
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(r_count == i_count)  # exactly one response per request
+
+        self.console.destroy(10)

Modified: qpid/trunk/qpid/python/qmf2/tests/async_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/async_query.py?rev=909648&r1=909647&r2=909648&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/async_query.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/async_query.py Fri Feb 12 23:01:21 2010
@@ -227,7 +227,8 @@
 
             # send queries
             query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
-            rc = self.console.do_async_query(agent, query, aname)
+            rc = self.console.do_query(agent, query,
+                                       _reply_handle=aname)
             self.assertTrue(rc)
 
         # done.  Now wait for async responses
@@ -273,7 +274,7 @@
 
             # send queries
             query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT)
-            rc = self.console.do_async_query(agent, query, aname)
+            rc = self.console.do_query(agent, query, _reply_handle=aname)
             self.assertTrue(rc)
 
         # done.  Now wait for async responses
@@ -320,7 +321,7 @@
             t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")}
             query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
             #
-            rc = self.console.do_async_query(agent, query, aname)
+            rc = self.console.do_query(agent, query, _reply_handle=aname)
             self.assertTrue(rc)
 
         # done.  Now wait for async responses
@@ -371,7 +372,7 @@
 
             # send queries
             query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
-            rc = self.console.do_async_query(agent, query, aname)
+            rc = self.console.do_query(agent, query, _reply_handle=aname)
             self.assertTrue(rc)
 
         # done.  Now wait for async responses
@@ -436,8 +437,9 @@
         # now send queries to agents that no longer exist
         for agent in agents:
             query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
-            rc = self.console.do_async_query(agent, query, agent.get_name(),
-                                             _timeout=2)
+            rc = self.console.do_query(agent, query,
+                                       _reply_handle=agent.get_name(),
+                                       _timeout=2)
             self.assertTrue(rc)
 
         # done.  Now wait for async responses due to timeouts



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org