You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/12 21:15:25 UTC

svn commit: r909591 - in /qpid/trunk/qpid/python/qmf2: common.py console.py tests/__init__.py tests/async_query.py

Author: kgiusti
Date: Fri Feb 12 20:15:21 2010
New Revision: 909591

URL: http://svn.apache.org/viewvc?rev=909591&view=rev
Log:
QPID-2261: add async query and schema prefetch

Added:
    qpid/trunk/qpid/python/qmf2/tests/async_query.py
Modified:
    qpid/trunk/qpid/python/qmf2/common.py
    qpid/trunk/qpid/python/qmf2/console.py
    qpid/trunk/qpid/python/qmf2/tests/__init__.py

Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=909591&r1=909590&r2=909591&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Fri Feb 12 20:15:21 2010
@@ -131,6 +131,7 @@
     OBJECT_UPDATE=5
     EVENT_RECEIVED=7
     AGENT_HEARTBEAT=8
+    QUERY_COMPLETE=9
     # Enumeration of the types of WorkItems produced on the Agent
     METHOD_CALL=1000
     QUERY=1001

Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=909591&r1=909590&r2=909591&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Fri Feb 12 20:15:21 2010
@@ -56,21 +56,47 @@
     """
     Virtual base class for all Mailbox-like objects.
     """
+    def __init__(self, console):
+        self.console = console
+        self.cid = 0
+        self.console._add_mailbox(self)
+
+    def get_address(self):
+        return self.cid
+
     def deliver(self, data):
+        """
+        Invoked by Console Management thread when a message arrives for
+        this mailbox.
+        """
         raise Exception("_Mailbox deliver() method must be provided")
 
+    def destroy(self):
+        """
+        Release the mailbox.  Once called, the mailbox should no longer be
+        referenced. 
+        """
+        self.console._remove_mailbox(self.cid)
+
 
-class _WaitableMailbox(_Mailbox):
+class _SyncMailbox(_Mailbox):
     """
     A simple mailbox that allows a consumer to wait for delivery of data.
     """
-    def __init__(self):
-        self._data = []
+    def __init__(self, console):
+        """
+        Invoked by application thread.
+        """
+        super(_SyncMailbox, self).__init__(console)
         self._cv = Condition()
+        self._data = []
         self._waiting = False
 
     def deliver(self, data):
-        """ Drop data into the mailbox, waking any waiters if necessary. """
+        """
+        Drop data into the mailbox, waking any waiters if necessary.
+        Invoked by Console Management thread only.
+        """
         self._cv.acquire()
         try:
             self._data.append(data)
@@ -81,7 +107,10 @@
             self._cv.release()
 
     def fetch(self, timeout=None):
-        """ Get one data item from a mailbox, with timeout. """
+        """
+        Get one data item from a mailbox, with timeout.
+        Invoked by application thread.
+        """
         self._cv.acquire()
         try:
             if len(self._data) == 0:
@@ -93,6 +122,189 @@
             self._cv.release()
 
 
+class _AsyncMailbox(_Mailbox):
+    """
+    A Mailbox for asynchronous delivery, with a timeout value.
+    """
+    def __init__(self, console, 
+                 agent_name,
+                 _timeout=None):
+        """
+        Invoked by application thread.
+        """
+        super(_AsyncMailbox, self).__init__(console)
+
+        self.agent_name = agent_name
+        self.console = console
+
+        if _timeout is None:
+            _timeout = console._reply_timeout
+        self.expiration_date = (datetime.datetime.utcnow() +
+                                datetime.timedelta(seconds=_timeout))
+        console._lock.acquire()
+        try:
+            console._async_mboxes[self.cid] = self
+        finally:
+            console._lock.release()
+
+        # now that an async mbox has been created, wake the
+        # console mgmt thread so it will know about the mbox expiration
+        # date (and adjust its idle sleep period correctly)
+
+        console._wake_thread()
+
+    def deliver(self, msg):
+        """
+        """
+        raise Exception("deliver() method must be provided")
+
+    def expire(self):
+        raise Exception("expire() method must be provided")
+
+
+    def destroy(self):
+        self.console._lock.acquire()
+        try:
+            if self.cid in self.console._async_mboxes:
+                del self.console._async_mboxes[self.cid]
+        finally:
+            self.console._lock.release()
+        super(_AsyncMailbox, self).destroy()
+
+
+
+class _QueryMailbox(_AsyncMailbox):
+    """
+    A mailbox used for asynchronous query requests.
+    """
+    def __init__(self, console, 
+                 agent_name,
+                 context,
+                 target, msgkey,
+                 _timeout=None):
+        """
+        Invoked by application thread.
+        """
+        super(_QueryMailbox, self).__init__(console,
+                                            agent_name,
+                                            _timeout)
+        self.target = target
+        self.msgkey = msgkey
+        self.context = context
+        self.result = []
+
+    def deliver(self, reply):
+        """
+        Process query response messages delivered to this mailbox.
+        Invoked by Console Management thread only.
+        """
+        done = False
+        objects = reply.content.get(self.msgkey)
+        if not objects:
+            done = True
+        else:
+            # convert from map to native types if needed
+            if self.target == QmfQuery.TARGET_SCHEMA_ID:
+                for sid_map in objects:
+                    self.result.append(SchemaClassId.from_map(sid_map))
+
+            elif self.target == QmfQuery.TARGET_SCHEMA:
+                for schema_map in objects:
+                    # extract schema id, convert based on schema type
+                    sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+                    if sid_map:
+                        sid = SchemaClassId.from_map(sid_map)
+                        if sid:
+                            if sid.get_type() == SchemaClassId.TYPE_DATA:
+                                schema = SchemaObjectClass.from_map(schema_map)
+                            else:
+                                schema = SchemaEventClass.from_map(schema_map)
+                            self.console._add_schema(schema)  # add to schema cache
+                            self.result.append(schema)
+
+            elif self.target == QmfQuery.TARGET_OBJECT:
+                for obj_map in objects:
+                    # @todo: need the agent name - ideally from the
+                    # reply message iself.
+                    agent = self.console.get_agent(self.agent_name)
+                    if agent:
+                        obj = QmfConsoleData(map_=obj_map, agent=agent)
+                        # start fetch of schema if not known
+                        sid = obj.get_schema_class_id()
+                        if sid:
+                            self.console._prefetch_schema(sid, agent)
+                        self.result.append(obj)
+
+
+            else:
+                # no conversion needed.
+                self.result += objects
+
+        if done:
+            # create workitem
+            # logging.error("QUERY COMPLETE for %s" % str(self.context))
+            wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
+            self.console._work_q.put(wi)
+            self.console._work_q_put = True
+
+            self.destroy()
+
+
+    def expire(self):
+        logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
+                      datetime.datetime.utcnow())
+        # send along whatever (possibly none) has been received so far
+        wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
+        self.console._work_q.put(wi)
+        self.console._work_q_put = True
+
+        self.destroy()
+
+
+
+class _SchemaPrefetchMailbox(_AsyncMailbox):
+    """
+    Handles responses to schema fetches made by the console.
+    """
+    def __init__(self, console,
+                 agent_name,
+                 schema_id,
+                 _timeout=None):
+        """
+        Invoked by application thread.
+        """
+        super(_SchemaPrefetchMailbox, self).__init__(console,
+                                                     agent_name,
+                                                     _timeout)
+
+        self.schema_id = schema_id
+
+
+    def deliver(self, reply):
+        """
+        Process schema response messages.
+        """
+        done = False
+        schemas = reply.content.get(MsgKey.schema)
+        if schemas:
+            for schema_map in schemas:
+                # extract schema id, convert based on schema type
+                sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+                if sid_map:
+                    sid = SchemaClassId.from_map(sid_map)
+                    if sid:
+                        if sid.get_type() == SchemaClassId.TYPE_DATA:
+                            schema = SchemaObjectClass.from_map(schema_map)
+                        else:
+                            schema = SchemaEventClass.from_map(schema_map)
+                        self.console._add_schema(schema)  # add to schema cache
+        self.destroy()
+
+
+    def expire(self):
+        self.destroy()
+
+
 
 ##==============================================================================
 ## DATA MODEL
@@ -185,8 +397,8 @@
         if _timeout is None:
             _timeout = self._agent._console._reply_timeout
 
-        mbox = _WaitableMailbox()
-        cid = self._agent._console._add_mailbox(mbox)
+        mbox = _SyncMailbox(self._agent._console)
+        cid = mbox.get_address()
 
         _map = {self.KEY_OBJECT_ID:str(oid),
                 SchemaMethod.KEY_NAME:name}
@@ -202,7 +414,7 @@
             self._agent._send_method_req(_map, cid)
         except SendError, e:
             logging.error(str(e))
-            self._agent._console._remove_mailbox(cid)
+            mbox.destroy()
             return None
 
         # @todo async method calls!!!
@@ -211,7 +423,7 @@
 
         logging.debug("Waiting for response to method req (%s)" % _timeout)
         replyMsg = mbox.fetch(_timeout)
-        self._agent._console._remove_mailbox(cid)
+        mbox.destroy()
 
         if not replyMsg:
             logging.debug("Agent method req wait timed-out.")
@@ -258,7 +470,7 @@
     """
     def __init__(self, name, console):
         """
-        @type name: AgentId
+        @type name: string
         @param name: uniquely identifies this agent in the AMQP domain.
         """
 
@@ -358,8 +570,8 @@
         if _in_args:
             _in_args = _in_args.copy()
 
-        mbox = _WaitableMailbox()
-        cid = self._console._add_mailbox(mbox)
+        mbox = _SyncMailbox(self._console)
+        cid = mbox.get_address()
 
         _map = {SchemaMethod.KEY_NAME:name}
         if _in_args:
@@ -370,7 +582,7 @@
             self._send_method_req(_map, cid)
         except SendError, e:
             logging.error(str(e))
-            self._console._remove_mailbox(cid)
+            mbox.destroy()
             return None
 
         # @todo async method calls!!!
@@ -379,7 +591,7 @@
 
         logging.debug("Waiting for response to method req (%s)" % _timeout)
         replyMsg = mbox.fetch(_timeout)
-        self._console._remove_mailbox(cid)
+        mbox.destroy()
 
         if not replyMsg:
             logging.debug("Agent method req wait timed-out.")
@@ -497,19 +709,20 @@
         self._announce_recvr = None
         self._locate_sender = None
         self._schema_cache = {}
+        self._pending_schema_req = []
         self._agent_discovery_filter = None
         self._reply_timeout = reply_timeout
         self._agent_timeout = agent_timeout
         self._next_agent_expire = None
-        # lock out run() thread
-        self._cv = Condition()
+        self._next_mbox_expire = None
         # for passing WorkItems to the application
         self._work_q = Queue.Queue()
         self._work_q_put = False
         # Correlation ID and mailbox storage
         self._correlation_id = long(time.time())  # pseudo-randomize
         self._post_office = {} # indexed by cid
-        
+        self._async_mboxes = {} # indexed by cid, used to expire them
+
         ## Old stuff below???
         #self._broker_list = []
         #self.impl = qmfengine.Console()
@@ -612,15 +825,7 @@
         self._operational = False
         if self.isAlive():
             # kick my thread to wake it up
-            logging.debug("Sending noop to wake up [%s]" % self._address)
-            try:
-                msg = Message(properties={"method":"request",
-                                          "qmf.subject":make_subject(OpCode.noop)},
-                              subject=self._name,
-                              content={"noop":"noop"})
-                self._direct_sender.send( msg, sync=True )
-            except SendError, e:
-                logging.error(str(e))
+            self._wake_thread()
             logging.debug("waiting for console receiver thread to exit")
             self.join(timeout)
             if self.isAlive():
@@ -651,8 +856,8 @@
 
         self._lock.acquire()
         try:
-            if agent._id in self._agent_map:
-                del self._agent_map[agent._id]
+            if agent._name in self._agent_map:
+                del self._agent_map[agent._name]
         finally:
             self._lock.release()
 
@@ -672,8 +877,8 @@
 
         # agent not present yet - ping it with an agent_locate
 
-        mbox = _WaitableMailbox()
-        cid = self._add_mailbox(mbox)
+        mbox = _SyncMailbox(self)
+        cid = mbox.get_address()
 
         query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
         msg = Message(subject="console.ind.locate." + name,
@@ -690,7 +895,7 @@
             self._topic_sender.send(msg)
         except SendError, e:
             logging.error(str(e))
-            self._remove_mailbox(cid)
+            mbox.destroy()
             return None
 
         if timeout is None:
@@ -699,7 +904,7 @@
         new_agent = None
         logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
         mbox.fetch(timeout)
-        self._remove_mailbox(cid)
+        mbox.destroy()
         logging.debug("Agent Locate wait ended (%s)" % time.time())
         self._lock.acquire()
         try:
@@ -749,15 +954,15 @@
         if not msgkey:
             raise Exception("Invalid target for query: %s" % str(query))
 
-        mbox = _WaitableMailbox()
-        cid = self._add_mailbox(mbox)
+        mbox = _SyncMailbox(self)
+        cid = mbox.get_address()
 
         try:
             logging.debug("Sending Query to Agent (%s)" % time.time())
             agent._send_query(query, cid)
         except SendError, e:
             logging.error(str(e))
-            self._remove_mailbox(cid)
+            mbox.destroy()
             return None
 
         if not timeout:
@@ -802,33 +1007,74 @@
             elif target == QmfQuery.TARGET_OBJECT:
                 for obj_map in objects:
                     obj = QmfConsoleData(map_=obj_map, agent=agent)
+                    # start fetch of schema if not known
+                    sid = obj.get_schema_class_id()
+                    if sid:
+                        self._prefetch_schema(sid, agent)
                     response.append(obj)
-                    # @todo prefetch unknown schema
-                    # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
-                    # if sid_map:
-                    #     sid = SchemaClassId.from_map(sid_map)
-                    #     # if the object references a schema, fetch it
-                    #     # schema = self._fetch_schema(sid, _agent=agent,
-                    #     # _timeout=timeout)
-                    #     # if not schema:
-                    #     #   logging.warning("Unknown schema, id=%s" % sid)
-                    #     #   continue
-                    #     obj = QmfConsoleData(map_=obj_map, agent=agent,
-                    #                          _schema=schema)
-                    # else:
-                    #     # no schema needed
             else:
                 # no conversion needed.
                 response += objects
 
             now = datetime.datetime.utcnow()
 
-        self._remove_mailbox(cid)
+        mbox.destroy()
         return response
 
 
+    def do_async_query(self, agent, query, app_handle, _timeout=None ):
+        """
+        """
+        query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
+                      QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
+                      QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
+                      QmfQuery.TARGET_SCHEMA: MsgKey.schema,
+                      QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
+                      QmfQuery.TARGET_AGENT: MsgKey.agent_info}
+
+        target = query.get_target()
+        msgkey = query_keymap.get(target)
+        if not msgkey:
+            raise Exception("Invalid target for query: %s" % str(query))
+
+        mbox = _QueryMailbox(self,
+                             agent.get_name(),
+                             app_handle,
+                             target, msgkey,
+                             _timeout)
+        cid = mbox.get_address()
+
+        try:
+            logging.debug("Sending Query to Agent (%s)" % time.time())
+            agent._send_query(query, cid)
+        except SendError, e:
+            logging.error(str(e))
+            mbox.destroy()
+            return False
+        return True
+
+
+    def _wake_thread(self):
+        """
+        Make the console management thread loop wakeup from its next_receiver
+        sleep.
+        """
+        logging.debug("Sending noop to wake up [%s]" % self._address)
+        msg = Message(properties={"method":"request",
+                                  "qmf.subject":make_subject(OpCode.noop)},
+                      subject=self._name,
+                      content={"noop":"noop"})
+        try:
+            self._direct_sender.send( msg, sync=True )
+        except SendError, e:
+            logging.error(str(e))
+
 
     def run(self):
+        """
+        Console Management Thread main loop.
+        Handles inbound messages, agent discovery, async mailbox timeouts.
+        """
         global _callback_thread
 
         self._ready.set()
@@ -858,6 +1104,7 @@
                 self._dispatch(msg, _direct=True)
 
             self._expire_agents()   # check for expired agents
+            self._expire_mboxes()   # check for expired async mailbox requests
 
             #if qLen == 0 and self._work_q.qsize() and self._notifier:
             if self._work_q_put and self._notifier:
@@ -869,11 +1116,15 @@
                 _callback_thread = None
 
             if self._operational:
-                # wait for a message to arrive or an agent
-                # to expire
+                # wait for a message to arrive, or an agent
+                # to expire, or a mailbox requrest to time out
                 now = datetime.datetime.utcnow()
-                if self._next_agent_expire > now:
-                    timeout = timedelta_to_secs(self._next_agent_expire - now)
+                next_expire = self._next_agent_expire
+                if (self._next_mbox_expire and
+                    self._next_mbox_expire < next_expire):
+                    next_expire = self._next_mbox_expire
+                if next_expire > now:
+                    timeout = timedelta_to_secs(next_expire - now)
                     try:
                         logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
                         xxx = self._session.next_receiver(timeout = timeout)
@@ -1089,7 +1340,8 @@
             return
 
         # wake up all waiters
-        logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+        logging.debug("waking waiters for correlation id %s" %
+                      msg.correlation_id)
         mbox.deliver(msg)
 
 
@@ -1151,6 +1403,36 @@
         self._work_q_put = True
 
 
+    def _expire_mboxes(self):
+        """
+        Check all async mailboxes for outstanding requests that have expired.
+        """
+        now = datetime.datetime.utcnow()
+        if self._next_mbox_expire and now < self._next_mbox_expire:
+            return
+        expired_mboxes = []
+        self._next_mbox_expire = None
+        self._lock.acquire()
+        try:
+            for mbox in self._async_mboxes.itervalues():
+                if now >= mbox.expiration_date:
+                    expired_mboxes.append(mbox)
+                else:
+                    if (self._next_mbox_expire is None or
+                        mbox.expiration_date < self._next_mbox_expire):
+                        self._next_mbox_expire = mbox.expiration_date
+
+            for mbox in expired_mboxes:
+                del self._async_mboxes[mbox.cid]
+        finally:
+            self._lock.release()
+
+        for mbox in expired_mboxes:
+            # note: expire() may deallocate the mbox, so don't touch
+            # it further.
+            mbox.expire()
+
+
     def _expire_agents(self):
         """
         Check for expired agents and issue notifications when they expire.
@@ -1288,9 +1570,43 @@
             sid = schema.get_class_id()
             if not self._schema_cache.has_key(sid):
                 self._schema_cache[sid] = schema
+                if sid in self._pending_schema_req:
+                    self._pending_schema_req.remove(sid)
+        finally:
+            self._lock.release()
+
+    def _prefetch_schema(self, schema_id, agent):
+        """
+        Send an async request for the schema identified by schema_id if the
+        schema is not available in the cache.
+        """
+        need_fetch = False
+        self._lock.acquire()
+        try:
+            if ((not self._schema_cache.has_key(schema_id)) and
+                schema_id not in self._pending_schema_req):
+                self._pending_schema_req.append(schema_id)
+                need_fetch = True
         finally:
             self._lock.release()
 
+        if need_fetch:
+            mbox = _SchemaPrefetchMailbox(self, agent.get_name(),
+                                          schema_id)
+            query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
+            logging.debug("Sending Schema Query to Agent (%s)" % time.time())
+            try:
+                agent._send_query(query, mbox.get_address())
+            except SendError, e:
+                logging.error(str(e))
+                mbox.destroy()
+                self._lock.acquire()
+                try:
+                    self._pending_schema_req.remove(schema_id)
+                finally:
+                    self._lock.release()
+
+
     def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
         """
         Find the schema identified by schema_id.  If not in the cache, ask the
@@ -1320,16 +1636,16 @@
             return None
 
     def _add_mailbox(self, mbox):
-        """ Add a mailbox to the post office, return a unique identifier """
-        cid = 0
+        """ 
+        Add a mailbox to the post office, and assign it a unique address.
+        """
         self._lock.acquire()
         try:
-            cid = self._correlation_id
+            mbox.cid = self._correlation_id
             self._correlation_id += 1
-            self._post_office[cid] = mbox
+            self._post_office[mbox.cid] = mbox
         finally:
             self._lock.release()
-        return cid
 
     def _get_mailbox(self, mid):
         try:
@@ -1355,7 +1671,8 @@
 
         self._lock.acquire()
         try:
-            del self._post_office[mid]
+            if mid in self._post_office:
+                del self._post_office[mid]
         finally:
             self._lock.release()
 

Modified: qpid/trunk/qpid/python/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/__init__.py?rev=909591&r1=909590&r2=909591&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/__init__.py Fri Feb 12 20:15:21 2010
@@ -25,3 +25,4 @@
 import obj_gets
 import events
 import multi_response
+import async_query

Added: qpid/trunk/qpid/python/qmf2/tests/async_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/async_query.py?rev=909591&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/async_query.py (added)
+++ qpid/trunk/qpid/python/qmf2/tests/async_query.py Fri Feb 12 20:15:21 2010
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, WorkItem)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, broker_url, heartbeat):
+        Thread.__init__(self)
+        self.notifier = _testNotifier()
+        self.broker_url = broker_url
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+                                     _desc="A test data schema",
+                                     _object_id_names=["index1", "index2"] )
+        # add properties
+        _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # these two properties are statistics
+        _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+        _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # These two properties can be set via the method call
+        _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+        _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # add method
+        _meth = SchemaMethod( _desc="Method to set string and int in object." )
+        _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+        _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+        _schema.add_method( "set_meth", _meth )
+
+        # Add schema to Agent
+
+        self.agent.register_object_class(_schema)
+
+        # instantiate managed data objects matching the schema
+
+        _obj1 = QmfAgentData( self.agent, _schema=_schema,
+                              _values={"index1":100, "index2":"a name"})
+        _obj1.set_value("set_string", "UNSET")
+        _obj1.set_value("set_int", 0)
+        _obj1.set_value("query_count", 0)
+        _obj1.set_value("method_call_count", 0)
+        self.agent.add_object( _obj1 )
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":99,
+                                                      "index2": "another name",
+                                                      "set_string": "UNSET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+        self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+                                             _values={"index1":50,
+                                                      "index2": "my name",
+                                                      "set_string": "SET",
+                                                      "set_int": 0,
+                                                      "query_count": 0,
+                                                      "method_call_count": 0} ))
+
+
+        # add an "unstructured" object to the Agent
+        _obj2 = QmfAgentData(self.agent, _object_id="01545")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 2)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        _obj2.set_value("index1", 50)
+        self.agent.add_object(_obj2)
+
+        _obj2 = QmfAgentData(self.agent, _object_id="01546")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 3)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        _obj2.set_value("index1", 51)
+        self.agent.add_object(_obj2)
+
+        _obj2 = QmfAgentData(self.agent, _object_id="01544")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 4)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        _obj2.set_value("index1", 49)
+        self.agent.add_object(_obj2)
+
+        _obj2 = QmfAgentData(self.agent, _object_id="01543")
+        _obj2.set_value("field1", "a value")
+        _obj2.set_value("field2", 4)
+        _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+        _obj2.set_value("field4", ["a", "list", "value"])
+        _obj2.set_value("index1", 48)
+        self.agent.add_object(_obj2)
+
+        self.running = False
+        self.ready = Event()
+
+    def start_app(self):
+        self.running = True
+        self.start()
+        self.ready.wait(10)
+        if not self.ready.is_set():
+            raise Exception("Agent failed to connect to broker.")
+
+    def stop_app(self):
+        self.running = False
+        # wake main thread
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(10)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # broker_url = "user/passwd@hostname:port"
+        self.conn = qpid.messaging.Connection(self.broker_url.host,
+                                              self.broker_url.port,
+                                              self.broker_url.user,
+                                              self.broker_url.password)
+        self.conn.connect()
+        self.agent.set_connection(self.conn)
+        self.ready.set()
+
+        while self.running:
+            self.notifier.wait_for_work(None)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+
+        if self.conn:
+            self.agent.remove_connection(10)
+        self.agent.destroy(10)
+
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent_heartbeat = 1
+        self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+        self.agent1.start_app()
+        self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+        self.agent2.start_app()
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.stop_app()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.stop_app()
+            self.agent2 = None
+
+    def test_all_schema_ids(self):
+        # create console
+        # find agents
+        # asynchronous query for all schema ids
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            # send queries
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+            rc = self.console.do_async_query(agent, query, aname)
+            self.assertTrue(rc)
+
+        # done.  Now wait for async responses
+
+        count = 0
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                count += 1
+                self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+                self.assertTrue(wi.get_handle() == "agent1" or
+                                wi.get_handle() == "agent2")
+                reply = wi.get_params()
+                self.assertTrue(len(reply) == 1)
+                self.assertTrue(isinstance(reply[0], SchemaClassId))
+                self.assertTrue(reply[0].get_package_name() == "MyPackage")
+                self.assertTrue(reply[0].get_class_name() == "MyClass")
+                self.console.release_workitem(wi)
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(count == 2)
+        self.console.destroy(10)
+
+
+
+    def test_undescribed_objs(self):
+        # create console
+        # find agents
+        # asynchronous query for all non-schema objects
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            # send queries
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT)
+            rc = self.console.do_async_query(agent, query, aname)
+            self.assertTrue(rc)
+
+        # done.  Now wait for async responses
+
+        count = 0
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                count += 1
+                self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+                self.assertTrue(wi.get_handle() == "agent1" or
+                                wi.get_handle() == "agent2")
+                reply = wi.get_params()
+                self.assertTrue(len(reply) == 4)
+                self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData))
+                self.assertFalse(reply[0].is_described()) # no schema
+                self.console.release_workitem(wi)
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(count == 2)
+        self.console.destroy(10)
+
+
+
+    def test_described_objs(self):
+        # create console
+        # find agents
+        # asynchronous query for all schema-based objects
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            #
+            t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")}
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
+            #
+            rc = self.console.do_async_query(agent, query, aname)
+            self.assertTrue(rc)
+
+        # done.  Now wait for async responses
+
+        count = 0
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                count += 1
+                self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+                self.assertTrue(wi.get_handle() == "agent1" or
+                                wi.get_handle() == "agent2")
+                reply = wi.get_params()
+                self.assertTrue(len(reply) == 3)
+                self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData))
+                self.assertTrue(reply[0].is_described()) # has schema
+                self.console.release_workitem(wi)
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(count == 2)
+        # @todo test if the console has learned the corresponding schemas....
+        self.console.destroy(10)
+
+
+
+    def test_all_schemas(self):
+        # create console
+        # find agents
+        # asynchronous query for all schemas
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # test internal state using non-api calls:
+        # no schemas present yet
+        self.assertTrue(len(self.console._schema_cache) == 0)
+        # end test
+
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+            # send queries
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
+            rc = self.console.do_async_query(agent, query, aname)
+            self.assertTrue(rc)
+
+        # done.  Now wait for async responses
+
+        count = 0
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                count += 1
+                self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+                self.assertTrue(wi.get_handle() == "agent1" or
+                                wi.get_handle() == "agent2")
+                reply = wi.get_params()
+                self.assertTrue(len(reply) == 1)
+                self.assertTrue(isinstance(reply[0], qmf2.common.SchemaObjectClass))
+                self.assertTrue(reply[0].get_class_id().get_package_name() == "MyPackage")
+                self.assertTrue(reply[0].get_class_id().get_class_name() == "MyClass")
+                self.console.release_workitem(wi)
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(count == 2)
+
+        # test internal state using non-api calls:
+        # schema has been learned
+        self.assertTrue(len(self.console._schema_cache) == 1)
+        # end test
+
+        self.console.destroy(10)
+
+
+
+    def test_query_expiration(self):
+        # create console
+        # find agents
+        # kill the agents
+        # send async query
+        # wait for & verify expiration
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=30)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # find the agents
+        agents = []
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+            agents.append(agent)
+
+        # now nuke the agents from orbit.  It's the only way to be sure.
+
+        self.agent1.stop_app()
+        self.agent1 = None
+        self.agent2.stop_app()
+        self.agent2 = None
+
+        # now send queries to agents that no longer exist
+        for agent in agents:
+            query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
+            rc = self.console.do_async_query(agent, query, agent.get_name(),
+                                             _timeout=2)
+            self.assertTrue(rc)
+
+        # done.  Now wait for async responses due to timeouts
+
+        count = 0
+        while self.notifier.wait_for_work(3):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                count += 1
+                self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+                self.assertTrue(wi.get_handle() == "agent1" or
+                                wi.get_handle() == "agent2")
+                reply = wi.get_params()
+                self.assertTrue(len(reply) == 0)  # empty
+
+                self.console.release_workitem(wi)
+                wi = self.console.get_next_workitem(timeout=0)
+
+        self.assertTrue(count == 2)
+        self.console.destroy(10)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org