You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/04 20:38:55 UTC

svn commit: r906615 - in /qpid/trunk/qpid/python/qmf2: agent.py common.py console.py tests/__init__.py tests/multi_response.py

Author: kgiusti
Date: Thu Feb  4 19:38:55 2010
New Revision: 906615

URL: http://svn.apache.org/viewvc?rev=906615&view=rev
Log:
QPID-2261: add multi-msg query response support.  Fix mailbox code to allow multiple msgs per correlation id.

Added:
    qpid/trunk/qpid/python/qmf2/tests/multi_response.py
Modified:
    qpid/trunk/qpid/python/qmf2/agent.py
    qpid/trunk/qpid/python/qmf2/common.py
    qpid/trunk/qpid/python/qmf2/console.py
    qpid/trunk/qpid/python/qmf2/tests/__init__.py

Modified: qpid/trunk/qpid/python/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/agent.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/agent.py (original)
+++ qpid/trunk/qpid/python/qmf2/agent.py Thu Feb  4 19:38:55 2010
@@ -95,6 +95,8 @@
         self._address = QmfAddress.direct(self.name, self._domain)
         self._notifier = _notifier
         self._heartbeat_interval = _heartbeat_interval
+        # @todo: currently the max # of objects in a single reply message; it
+        # would be better if it were the max byte size of per-msg content...
         self._max_msg_size = _max_msg_size
         self._capacity = _capacity
 
@@ -456,6 +458,38 @@
         except SendError, e:
             logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
 
+    def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+        """
+        Send a response to a query, breaking the result into multiple
+        messages of at most _max_msg_size objects each.  A non-empty result
+        is followed by an empty message that terminates the response.
+        """
+        total = len(objects)
+        max_count = self._max_msg_size
+        if not max_count or max_count < 0:
+            # no usable limit (negative would never terminate): send all
+            max_count = total
+
+        start = 0
+        end = min(total, max_count)
+        while end <= total:
+            m = Message(properties={"qmf.subject":subject,
+                                    "method":"response"},
+                        correlation_id = cid,
+                        content={msgkey:objects[start:end]})
+            self._send_reply(m, reply_to)
+            if end == total:
+                break
+            start = end
+            end = min(total, end + max_count)
+
+        # response terminator - last message has empty object array
+        if total:
+            m = Message(properties={"qmf.subject":subject,
+                                    "method":"response"},
+                        correlation_id = cid,
+                        content={msgkey: []} )
+            self._send_reply(m, reply_to)
 
     def _dispatch(self, msg, _direct=False):
         """
@@ -615,12 +649,11 @@
         finally:
             self._lock.release()
 
-        m = Message(properties={"qmf.subject":make_subject(OpCode.data_ind),
-                                "method":"response"},
-                    content={MsgKey.package_info: pnames} )
-        if msg.correlation_id != None:
-            m.correlation_id = msg.correlation_id
-        self._send_reply(m, msg.reply_to)
+        self._send_query_response(make_subject(OpCode.data_ind),
+                                  MsgKey.package_info,
+                                  msg.correlation_id,
+                                  msg.reply_to,
+                                  pnames)
 
     def _querySchema( self, msg, query, _idOnly=False ):
         """
@@ -652,17 +685,15 @@
                 self._lock.release()
 
         if _idOnly:
-            content = {MsgKey.schema_id: schemas}
+            msgkey = MsgKey.schema_id
         else:
-            content = {MsgKey.schema:schemas}
-
-        m = Message(properties={"method":"response",
-                                "qmf.subject":make_subject(OpCode.data_ind)},
-                    content=content )
-        if msg.correlation_id != None:
-            m.correlation_id = msg.correlation_id
+            msgkey = MsgKey.schema
 
-        self._send_reply(m, msg.reply_to)
+        self._send_query_response(make_subject(OpCode.data_ind),
+                                  msgkey,
+                                  msg.correlation_id,
+                                  msg.reply_to,
+                                  schemas)
 
 
     def _queryData( self, msg, query, _idOnly=False ):
@@ -736,17 +767,16 @@
                 self._lock.release()
 
         if _idOnly:
-            content = {MsgKey.object_id:data_objs}
+            msgkey = MsgKey.object_id
         else:
-            content = {MsgKey.data_obj:data_objs}
+            msgkey = MsgKey.data_obj
 
-        m = Message(properties={"method":"response",
-                                "qmf.subject":make_subject(OpCode.data_ind)},
-                    content=content )
-        if msg.correlation_id != None:
-            m.correlation_id = msg.correlation_id
+        self._send_query_response(make_subject(OpCode.data_ind),
+                                  msgkey,
+                                  msg.correlation_id,
+                                  msg.reply_to,
+                                  data_objs)
 
-        self._send_reply(m, msg.reply_to)
 
 
   ##==============================================================================

Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Thu Feb  4 19:38:55 2010
@@ -969,6 +969,28 @@
         return cls(_target=target, _target_params=_target_params)
     create_wildcard = classmethod(_create_wildcard)
 
+    def _create_wildcard_object_id(cls, schema_id):
+        """
+        Build a query that matches every object_id of the given schema.
+        """
+        if isinstance(schema_id, SchemaClassId):
+            target_params = {QmfData.KEY_SCHEMA_ID: schema_id}
+            return cls(_target=QmfQuery.TARGET_OBJECT_ID,
+                       _target_params=target_params)
+        raise TypeError("class SchemaClassId expected")
+    create_wildcard_object_id = classmethod(_create_wildcard_object_id)
+
+    def _create_wildcard_object(cls, schema_id):
+        """
+        Build a query that matches every object of the given schema.
+        """
+        if isinstance(schema_id, SchemaClassId):
+            target_params = {QmfData.KEY_SCHEMA_ID: schema_id}
+            return cls(_target=QmfQuery.TARGET_OBJECT,
+                       _target_params=target_params)
+        raise TypeError("class SchemaClassId expected")
+    create_wildcard_object = classmethod(_create_wildcard_object)
+
     def _create_predicate(cls, target, predicate, _target_params=None): 
         return cls(_target=target, _target_params=_target_params,
                    _predicate=predicate)

Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Thu Feb  4 19:38:55 2010
@@ -44,146 +44,56 @@
 
 
 ##==============================================================================
-## Sequence Manager  
+## Console Transaction Management
+##
+## At any given time, a console application may have multiple outstanding
+## message transactions with agents.  The following objects allow the console 
+## to track these outstanding transactions.
 ##==============================================================================
 
+
 class _Mailbox(object):
     """
-    Virtual base class for all Mailbox-like objects
+    Virtual base class for all Mailbox-like objects.
+    """
+    def deliver(self, data):
+        raise Exception("_Mailbox deliver() method must be provided")
+
+
+class _WaitableMailbox(_Mailbox):
+    """
+    A simple mailbox that allows a consumer to wait for delivery of data.
     """
     def __init__(self):
-        self._msgs = []
+        self._data = []
         self._cv = Condition()
         self._waiting = False
 
-    def deliver(self, obj):
+    def deliver(self, data):
+        """ Drop data into the mailbox, waking any waiters if necessary. """
         self._cv.acquire()
         try:
-            self._msgs.append(obj)
+            self._data.append(data)
             # if was empty, notify waiters
-            if len(self._msgs) == 1:
+            if len(self._data) == 1:
                 self._cv.notify()
         finally:
             self._cv.release()
 
     def fetch(self, timeout=None):
+        """ Get one data item from a mailbox, with timeout. """
         self._cv.acquire()
         try:
-            if len(self._msgs) == 0:
+            if len(self._data) == 0:
                 self._cv.wait(timeout)
-            if len(self._msgs):
-                return self._msgs.pop()
+            if len(self._data):
+                return self._data.pop(0)
             return None
         finally:
             self._cv.release()
 
 
 
-class SequencedWaiter(object):
-    """ 
-    Manage sequence numbers for asynchronous method calls. 
-    Allows the caller to associate a generic piece of data with a unique sequence
-    number."""
-
-    def __init__(self):
-        self.lock     = Lock()
-        self.sequence = long(time.time())  # pseudo-randomize seq start
-        self.pending  = {}
-
-
-    def allocate(self):
-        """ 
-        Reserve a sequence number.
-        
-        @rtype: long
-        @return: a unique nonzero sequence number.
-        """
-        self.lock.acquire()
-        try:
-            seq = self.sequence
-            self.sequence = self.sequence + 1
-            self.pending[seq] = _Mailbox()
-        finally:
-            self.lock.release()
-        logging.debug( "sequence %d allocated" % seq)
-        return seq
-
-
-    def put_data(self, seq, new_data):
-        seq = long(seq)
-        logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) )
-        self.lock.acquire()
-        try:
-            if seq in self.pending:
-                # logging.error("Putting seq %d @ %s" % (seq,time.time()))
-                self.pending[seq].deliver(new_data)
-            else:
-                logging.error( "seq %d not found!" % seq )
-        finally:
-            self.lock.release()
-
-
-
-    def get_data(self, seq, timeout=None):
-        """ 
-        Release a sequence number reserved using the reserve method.  This must
-        be called when the sequence is no longer needed.
-        
-        @type seq: int
-        @param seq: a sequence previously allocated by calling reserve().
-        @rtype: any
-        @return: the data originally associated with the reserved sequence number.
-        """
-        seq = long(seq)
-        logging.debug( "getting data for seq=%d" % seq)
-        mbox = None
-        self.lock.acquire()
-        try:
-            if seq in self.pending:
-                mbox = self.pending[seq]
-        finally:
-            self.lock.release()
-
-        # Note well: pending list is unlocked, so we can wait.
-        # we reference mbox locally, so it will not be released
-        # until we are done.
-
-        if mbox:
-            d = mbox.fetch(timeout)
-            logging.debug( "seq %d fetched %r!" % (seq, d) )
-            return d
-
-        logging.debug( "seq %d not found!" % seq )
-        return None
-
-
-    def release(self, seq):
-        """
-        Release the sequence, and its mailbox
-        """
-        seq = long(seq)
-        logging.debug( "releasing seq %d" % seq )
-        self.lock.acquire()
-        try:
-            if seq in self.pending:
-                del self.pending[seq]
-        finally:
-            self.lock.release()
-
-
-    def is_valid(self, seq):
-        """
-        True if seq is in use, else False (seq is unknown)
-        """
-        seq = long(seq)
-        self.lock.acquire()
-        try:
-            return seq in self.pending
-        finally:
-            self.lock.release()
-        return False
-
-
 ##==============================================================================
 ## DATA MODEL
 ##==============================================================================
@@ -275,9 +185,8 @@
         if _timeout is None:
             _timeout = self._agent._console._reply_timeout
 
-        handle = self._agent._console._req_correlation.allocate()
-        if handle == 0:
-            raise Exception("Can not allocate a correlation id!")
+        mbox = _WaitableMailbox()
+        cid = self._agent._console._add_mailbox(mbox)
 
         _map = {self.KEY_OBJECT_ID:str(oid),
                 SchemaMethod.KEY_NAME:name}
@@ -290,10 +199,10 @@
 
         logging.debug("Sending method req to Agent (%s)" % time.time())
         try:
-            self._agent._send_method_req(_map, handle)
+            self._agent._send_method_req(_map, cid)
         except SendError, e:
             logging.error(str(e))
-            self._agent._console._req_correlation.release(handle)
+            self._agent._console._remove_mailbox(cid)
             return None
 
         # @todo async method calls!!!
@@ -301,8 +210,9 @@
             print("ASYNC TBD")
 
         logging.debug("Waiting for response to method req (%s)" % _timeout)
-        replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
-        self._agent._console._req_correlation.release(handle)
+        replyMsg = mbox.fetch(_timeout)
+        self._agent._console._remove_mailbox(cid)
+
         if not replyMsg:
             logging.debug("Agent method req wait timed-out.")
             return None
@@ -376,10 +286,6 @@
         Low-level routine to asynchronously send a message to this agent.
         """
         msg.reply_to = str(self._console._address)
-        # handle = self._console._req_correlation.allocate()
-        # if handle == 0:
-        #    raise Exception("Can not allocate a correlation id!")
-        # msg.correlation_id = str(handle)
         if correlation_id:
             msg.correlation_id = str(correlation_id)
         # TRACE
@@ -452,9 +358,8 @@
         if _in_args:
             _in_args = _in_args.copy()
 
-        handle = self._console._req_correlation.allocate()
-        if handle == 0:
-            raise Exception("Can not allocate a correlation id!")
+        mbox = _WaitableMailbox()
+        cid = self._console._add_mailbox(mbox)
 
         _map = {SchemaMethod.KEY_NAME:name}
         if _in_args:
@@ -462,10 +367,10 @@
 
         logging.debug("Sending method req to Agent (%s)" % time.time())
         try:
-            self._send_method_req(_map, handle)
+            self._send_method_req(_map, cid)
         except SendError, e:
             logging.error(str(e))
-            self._console._req_correlation.release(handle)
+            self._console._remove_mailbox(cid)
             return None
 
         # @todo async method calls!!!
@@ -473,8 +378,9 @@
             print("ASYNC TBD")
 
         logging.debug("Waiting for response to method req (%s)" % _timeout)
-        replyMsg = self._console._req_correlation.get_data(handle, _timeout)
-        self._console._req_correlation.release(handle)
+        replyMsg = mbox.fetch(_timeout)
+        self._console._remove_mailbox(cid)
+
         if not replyMsg:
             logging.debug("Agent method req wait timed-out.")
             return None
@@ -591,7 +497,6 @@
         self._announce_recvr = None
         self._locate_sender = None
         self._schema_cache = {}
-        self._req_correlation = SequencedWaiter()
         self._agent_discovery_filter = None
         self._reply_timeout = reply_timeout
         self._agent_timeout = agent_timeout
@@ -601,6 +506,10 @@
         # for passing WorkItems to the application
         self._work_q = Queue.Queue()
         self._work_q_put = False
+        # Correlation ID and mailbox storage
+        self._correlation_id = long(time.time())  # pseudo-randomize
+        self._post_office = {} # indexed by cid
+        
         ## Old stuff below???
         #self._broker_list = []
         #self.impl = qmfengine.Console()
@@ -763,9 +672,8 @@
 
         # agent not present yet - ping it with an agent_locate
 
-        handle = self._req_correlation.allocate()
-        if handle == 0:
-            raise Exception("Can not allocate a correlation id!")
+        mbox = _WaitableMailbox()
+        cid = self._add_mailbox(mbox)
 
         query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
         msg = Message(subject="console.ind.locate." + name,
@@ -773,7 +681,7 @@
                                   "qmf.subject":make_subject(OpCode.agent_locate)},
                       content={MsgKey.query: query.map_encode()})
         msg.reply_to = str(self._address)
-        msg.correlation_id = str(handle)
+        msg.correlation_id = str(cid)
         logging.debug("Sending Agent Locate (%s)" % time.time())
         # TRACE
         #logging.error("!!! Console %s sending agent locate (%s)" % 
@@ -782,7 +690,7 @@
             self._topic_sender.send(msg)
         except SendError, e:
             logging.error(str(e))
-            self._req_correlation.release(handle)
+            self._remove_mailbox(cid)
             return None
 
         if timeout is None:
@@ -790,14 +698,15 @@
 
         new_agent = None
         logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
-        self._req_correlation.get_data( handle, timeout )
-        self._req_correlation.release(handle)
+        mbox.fetch(timeout)
+        self._remove_mailbox(cid)
         logging.debug("Agent Locate wait ended (%s)" % time.time())
         self._lock.acquire()
         try:
             new_agent = self._agent_map.get(name)
         finally:
             self._lock.release()
+
         return new_agent
 
 
@@ -828,82 +737,96 @@
     def do_query(self, agent, query, timeout=None ):
         """
         """
+        query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
+                      QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
+                      QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
+                      QmfQuery.TARGET_SCHEMA: MsgKey.schema,
+                      QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
+                      QmfQuery.TARGET_AGENT: MsgKey.agent_info}
 
         target = query.get_target()
-        handle = self._req_correlation.allocate()
-        if handle == 0:
-            raise Exception("Can not allocate a correlation id!")
+        msgkey = query_keymap.get(target)
+        if not msgkey:
+            raise Exception("Invalid target for query: %s" % str(query))
+
+        mbox = _WaitableMailbox()
+        cid = self._add_mailbox(mbox)
+
         try:
             logging.debug("Sending Query to Agent (%s)" % time.time())
-            agent._send_query(query, handle)
+            agent._send_query(query, cid)
         except SendError, e:
             logging.error(str(e))
-            self._req_correlation.release(handle)
+            self._remove_mailbox(cid)
             return None
 
         if not timeout:
             timeout = self._reply_timeout
 
         logging.debug("Waiting for response to Query (%s)" % timeout)
-        reply = self._req_correlation.get_data(handle, timeout)
-        self._req_correlation.release(handle)
-        if not reply:
-            logging.debug("Agent Query wait timed-out.")
-            return None
+        now = datetime.datetime.utcnow()
+        expire =  now + datetime.timedelta(seconds=timeout)
+
+        response = []
+        while (expire > now):
+            timeout = timedelta_to_secs(expire - now)
+            reply = mbox.fetch(timeout)
+            if not reply:
+                logging.debug("Query wait timed-out.")
+                break
+
+            objects = reply.content.get(msgkey)
+            if not objects:
+                # last response is empty
+                break
+
+            # convert from map to native types if needed
+            if target == QmfQuery.TARGET_SCHEMA_ID:
+                for sid_map in objects:
+                    response.append(SchemaClassId.from_map(sid_map))
+
+            elif target == QmfQuery.TARGET_SCHEMA:
+                for schema_map in objects:
+                    # extract schema id, convert based on schema type
+                    sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+                    if sid_map:
+                        sid = SchemaClassId.from_map(sid_map)
+                        if sid:
+                            if sid.get_type() == SchemaClassId.TYPE_DATA:
+                                schema = SchemaObjectClass.from_map(schema_map)
+                            else:
+                                schema = SchemaEventClass.from_map(schema_map)
+                            self._add_schema(schema)  # add to schema cache
+                            response.append(schema)
+
+            elif target == QmfQuery.TARGET_OBJECT:
+                for obj_map in objects:
+                    obj = QmfConsoleData(map_=obj_map, agent=agent)
+                    response.append(obj)
+                    # @todo prefetch unknown schema
+                    # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+                    # if sid_map:
+                    #     sid = SchemaClassId.from_map(sid_map)
+                    #     # if the object references a schema, fetch it
+                    #     # schema = self._fetch_schema(sid, _agent=agent,
+                    #     # _timeout=timeout)
+                    #     # if not schema:
+                    #     #   logging.warning("Unknown schema, id=%s" % sid)
+                    #     #   continue
+                    #     obj = QmfConsoleData(map_=obj_map, agent=agent,
+                    #                          _schema=schema)
+                    # else:
+                    #     # no schema needed
+            else:
+                # no conversion needed.
+                response += objects
+
+            now = datetime.datetime.utcnow()
+
+        self._remove_mailbox(cid)
+        return response
+
 
-        if target == QmfQuery.TARGET_PACKAGES:
-            # simply pass back the list of package names
-            logging.debug("Response to Packet Query received")
-            return reply.content.get(MsgKey.package_info)
-        elif target == QmfQuery.TARGET_OBJECT_ID:
-            # simply pass back the list of object_id's
-            logging.debug("Response to Object Id Query received")
-            return reply.content.get(MsgKey.object_id)
-        elif target == QmfQuery.TARGET_SCHEMA_ID:
-            logging.debug("Response to Schema Id Query received")
-            id_list = []
-            for sid_map in reply.content.get(MsgKey.schema_id):
-                id_list.append(SchemaClassId.from_map(sid_map))
-            return id_list
-        elif target == QmfQuery.TARGET_SCHEMA:
-            logging.debug("Response to Schema Query received")
-            schema_list = []
-            for schema_map in reply.content.get(MsgKey.schema):
-                # extract schema id, convert based on schema type
-                sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
-                if sid_map:
-                    sid = SchemaClassId.from_map(sid_map)
-                    if sid:
-                        if sid.get_type() == SchemaClassId.TYPE_DATA:
-                            schema = SchemaObjectClass.from_map(schema_map)
-                        else:
-                            schema = SchemaEventClass.from_map(schema_map)
-                        schema_list.append(schema)
-                        self._add_schema(schema)
-            return schema_list
-        elif target == QmfQuery.TARGET_OBJECT:
-            logging.debug("Response to Object Query received")
-            obj_list = []
-            for obj_map in reply.content.get(MsgKey.data_obj):
-                obj = QmfConsoleData(map_=obj_map, agent=agent)
-                obj_list.append(obj)
-                # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
-                # if sid_map:
-                #     sid = SchemaClassId.from_map(sid_map)
-                #     # if the object references a schema, fetch it
-                #     # schema = self._fetch_schema(sid, _agent=agent,
-                #     # _timeout=timeout)
-                #     # if not schema:
-                #     #   logging.warning("Unknown schema, id=%s" % sid)
-                #     #   continue
-                #     obj = QmfConsoleData(map_=obj_map, agent=agent,
-                #                          _schema=schema)
-                # else:
-                #     # no schema needed
-            return obj_list
-        else:
-            logging.warning("Unexpected Target for a Query: '%s'" % target)
-            return None
 
     def run(self):
         global _callback_thread
@@ -1115,7 +1038,8 @@
 
         correlated = False
         if msg.correlation_id:
-            correlated = self._req_correlation.is_valid(msg.correlation_id)
+            mbox = self._get_mailbox(msg.correlation_id)
+            correlated = mbox is not None
 
         agent = None
         self._lock.acquire()
@@ -1150,7 +1074,7 @@
         if correlated:
             # wake up all waiters
             logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
-            self._req_correlation.put_data(msg.correlation_id, msg)
+            mbox.deliver(msg)
 
     def _handle_data_ind_msg(self, msg, cmap, version, direct):
         """
@@ -1158,14 +1082,15 @@
         """
         logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
 
-        if not self._req_correlation.is_valid(msg.correlation_id):
+        mbox = self._get_mailbox(msg.correlation_id)
+        if not mbox:
             logging.debug("Data indicate received with unknown correlation_id"
                           " msg='%s'" % str(msg)) 
             return
 
         # wake up all waiters
         logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
-        self._req_correlation.put_data(msg.correlation_id, msg)
+        mbox.deliver(msg)
 
 
     def _handle_response_msg(self, msg, cmap, version, direct):
@@ -1175,14 +1100,15 @@
         # @todo code replication - clean me.
         logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
 
-        if not self._req_correlation.is_valid(msg.correlation_id):
+        mbox = self._get_mailbox(msg.correlation_id)
+        if not mbox:
             logging.debug("Response msg received with unknown correlation_id"
                           " msg='%s'" % str(msg))
             return
 
         # wake up all waiters
         logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
-        self._req_correlation.put_data(msg.correlation_id, msg)
+        mbox.deliver(msg)
 
     def _handle_event_ind_msg(self, msg, cmap, version, _direct):
         ei_map = cmap.get(MsgKey.event)
@@ -1393,6 +1319,46 @@
         else:
             return None
 
+    def _add_mailbox(self, mbox):
+        """ Register mbox with the post office; return its new unique id """
+        self._lock.acquire()
+        try:
+            # hand out the current id and advance the counter under the lock
+            new_cid = self._correlation_id
+            self._correlation_id = new_cid + 1
+            self._post_office[new_cid] = mbox
+        finally:
+            self._lock.release()
+        return new_cid
+
+    def _get_mailbox(self, mid):
+        """ Look up a mailbox by id; None if mid is invalid or unknown """
+        try:
+            mid = long(mid)
+        except (TypeError, ValueError):
+            logging.error("Invalid mailbox id: %s" % str(mid))
+            return None
+
+        self._lock.acquire()
+        try:
+            return self._post_office.get(mid)
+        finally:
+            self._lock.release()
+
+    def _remove_mailbox(self, mid):
+        """ Remove a mailbox and its address from the post office """
+        try:
+            mid = long(mid)
+        except (TypeError, ValueError):
+            logging.error("Invalid mailbox id: %s" % str(mid))
+            return None
+
+        self._lock.acquire()
+        try:
+            self._post_office.pop(mid, None)  # tolerate an unknown id
+        finally:
+            self._lock.release()
+
     def __repr__(self):
         return str(self._address)
 

Modified: qpid/trunk/qpid/python/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/__init__.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/__init__.py Thu Feb  4 19:38:55 2010
@@ -24,3 +24,4 @@
 import basic_method
 import obj_gets
 import events
+import multi_response

Added: qpid/trunk/qpid/python/qmf2/tests/multi_response.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/multi_response.py?rev=906615&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/multi_response.py (added)
+++ qpid/trunk/qpid/python/qmf2/tests/multi_response.py Thu Feb  4 19:38:55 2010
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                         SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                         QmfData) 
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
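+# note: the number of objects and the number of schemas per agent must each
+# be greater than _MAX_OBJS_PER_MSG, so that query replies need more than one
+# message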
+_SCHEMAS_PER_AGENT=7
+_OBJS_PER_AGENT=19
+_MAX_OBJS_PER_MSG=3
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, broker_url, heartbeat):
+        Thread.__init__(self)
+        self.schema_count = _SCHEMAS_PER_AGENT
+        self.obj_count = _OBJS_PER_AGENT
+        self.notifier = _testNotifier()
+        self.broker_url = broker_url
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat,
+                           _max_msg_size=_MAX_OBJS_PER_MSG)
+
+        # Dynamically construct a management database
+        for i in range(self.schema_count):
+            _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage",
+                                                                "MyClass-" + str(i)),
+                                         _desc="A test data schema",
+                                         _object_id_names=["index1", "index2"] )
+            # add properties
+            _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+            _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+            # these two properties are statistics
+            _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+            _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+            # These two properties can be set via the method call
+            _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+            _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+            # add method
+            _meth = SchemaMethod( _desc="Method to set string and int in object." )
+            _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+            _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+            _schema.add_method( "set_meth", _meth )
+
+            # Add schema to Agent
+
+            self.agent.register_object_class(_schema)
+
+            # instantiate managed data objects matching the schema
+
+            for j in range(self.obj_count):
+
+                self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                                     _values={"index1":j,
+                                                              "index2": "name-" + str(j),
+                                                              "set_string": "UNSET",
+                                                              "set_int": 0,
+                                                              "query_count": 0,
+                                                              "method_call_count": 0} ))
+
+        self.running = False
+        self.ready = Event()
+
+    def start_app(self):
+        self.running = True
+        self.start()
+        self.ready.wait(10)
+        if not self.ready.is_set():
+            raise Exception("Agent failed to connect to broker.")
+
+    def stop_app(self):
+        self.running = False
+        # wake main thread
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(self.broker_url.host,
+                                              self.broker_url.port,
+                                              self.broker_url.user,
+                                              self.broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+        self.ready.set()
+
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+        if self.conn:
+            self.agent.remove_connection(10)
+        self.agent.destroy(10)
+
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.agent_count = 2
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent_heartbeat = 1
+        self.agents = []
+        for a in range(self.agent_count):
+            agent = _agentApp("agent-" + str(a), 
+                              self.broker, 
+                              self.agent_heartbeat)
+            agent.start_app()
+            self.agents.append(agent)
+
+    def tearDown(self):
+        for agent in self.agents:
+            if agent is not None:
+                agent.stop_app()
+
+    def test_all_schema_id(self):
+        # create console
+        # find agents
+        # synchronous query for all schemas_ids
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for agent_app in self.agents:
+            agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+            self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+            # get a list of all schema_ids
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+            sid_list = self.console.do_query(agent, query)
+            self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
+            for sid in sid_list:
+                self.assertTrue(isinstance(sid, SchemaClassId))
+                self.assertTrue(sid.get_package_name() == "MyPackage")
+                self.assertTrue(sid.get_class_name().split('-')[0] == "MyClass")
+
+        self.console.destroy(10)
+
+
+    def test_all_schema(self):
+        # create console
+        # find agents
+        # synchronous query for all schemas
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for agent_app in self.agents:
+            agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+            self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+            # get a list of all schema_ids
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
+            schema_list = self.console.do_query(agent, query)
+            self.assertTrue(schema_list and 
+                            len(schema_list) == _SCHEMAS_PER_AGENT) 
+            for schema in schema_list:
+                self.assertTrue(isinstance(schema, SchemaObjectClass))
+
+        self.console.destroy(10)
+
+
+    def test_all_object_id(self):
+        # create console
+        # find agents
+        # synchronous query for all object_ids by schema_id
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for agent_app in self.agents:
+            agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+            self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+            # get a list of all schema_ids
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+            sid_list = self.console.do_query(agent, query)
+            self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
+            for sid in sid_list:
+                query = QmfQuery.create_wildcard_object_id(sid)
+                oid_list = self.console.do_query(agent, query)
+                self.assertTrue(oid_list and
+                                len(oid_list) == _OBJS_PER_AGENT) 
+                for oid in oid_list:
+                    self.assertTrue(isinstance(oid, basestring))
+
+        self.console.destroy(10)
+
+
+    def test_all_objects(self):
+        # create console
+        # find agents
+        # synchronous query for all objects by schema_id
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for agent_app in self.agents:
+            agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+            self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+            # get a list of all schema_ids
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+            sid_list = self.console.do_query(agent, query)
+            self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
+            for sid in sid_list:
+                query = QmfQuery.create_wildcard_object(sid)
+                obj_list = self.console.do_query(agent, query)
+                self.assertTrue(obj_list and
+                                len(obj_list) == _OBJS_PER_AGENT)
+                for obj in obj_list:
+                    self.assertTrue(isinstance(obj,
+                                               qmf2.console.QmfConsoleData))
+
+        self.console.destroy(10)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org