You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/01/15 15:29:42 UTC

svn commit: r899644 - in /qpid/branches/qmfv2/qpid/python/qmf: qmfAgent.py qmfCommon.py qmfConsole.py test/agent_test.py test/console_test.py

Author: tross
Date: Fri Jan 15 14:29:41 2010
New Revision: 899644

URL: http://svn.apache.org/viewvc?rev=899644&view=rev
Log:
QPID-2261 - Branch patch from Ken Giusti

Modified:
    qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
    qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py
    qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py?rev=899644&r1=899643&r2=899644&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py Fri Jan 15 14:29:41 2010
@@ -21,13 +21,56 @@
 import logging
 import datetime
 import time
-from threading import Thread, Lock
+import Queue
+from threading import Thread, Lock, currentThread
 from qpid.messaging import Connection, Message, Empty, SendError
 from uuid import uuid4
 from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
                        makeSubject, parseSubject, OpCode, QmfQuery,
                        SchemaObjectClass, MsgKey, QmfData, QmfAddress,
-                       SchemaClass) 
+                       SchemaClass, SchemaClassId, WorkItem, SchemaMethod) 
+
+# global flag that indicates which thread (if any) is
+# running the agent notifier callback
+_callback_thread=None
+
+  ##==============================================================================
+  ## METHOD CALL
+  ##==============================================================================
+
+class _MethodCallHandle(object):
+    """
+    Private class used to hold context when handing off a method call to the
+    application.  Given to the app in a WorkItem, provided to the agent when
+    method_response() is invoked.
+    """
+    def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+        self.correlation_id = correlation_id
+        self.reply_to = reply_to
+        self.meth_name = meth_name
+        self.oid = _oid
+
+class MethodCallParams(object):
+    """
+    """
+    def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+        self._meth_name = name
+        self._oid = _oid
+        self._in_args = _in_args
+        self._user_id = _user_id
+
+    def get_name(self):
+        return self._meth_name
+
+    def get_object_id(self):
+        return self._oid
+
+    def get_args(self):
+        return self._in_args
+
+    def get_user_id(self):
+        return self._user_id
+
 
 
   ##==============================================================================
@@ -54,6 +97,8 @@
         self._schema_timestamp = long(0)
         self._schema = {}
         self._agent_data = {}
+        self._work_q = Queue.Queue()
+        self._work_q_put = False
 
     def get_name(self):
         return self.name
@@ -63,9 +108,9 @@
         locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
         ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
 
-        logging.debug("my direct addr=%s" % my_addr)
-        logging.debug("agent.locate addr=%s" % locate_addr)
-        logging.debug("agent.ind addr=%s" % ind_addr)
+        logging.error("my direct addr=%s" % my_addr)
+        logging.error("agent.locate addr=%s" % locate_addr)
+        logging.error("agent.ind addr=%s" % ind_addr)
 
         self._conn = conn
         self._session = self._conn.session()
@@ -133,30 +178,60 @@
             self._lock.release()
 
 
-    def methodResponse(self, context, status, text, arguments):
-        logging.error("!!!Agent.methodResponse() TBD!!!")
+    def method_response(self, handle, _out_args=None, _error=None): 
+        """
+        """
+        if not isinstance(handle, _MethodCallHandle):
+            raise TypeError("Invalid handle passed to method_response!")
+
+        _map = {SchemaMethod.KEY_NAME:handle.meth_name}
+        if handle.oid is not None:
+            _map[QmfData.KEY_OBJECT_ID] = handle.oid
+        if _out_args is not None:
+            _map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
+        if _error is not None:
+            if not isinstance(_error, QmfData):
+                raise TypeError("Invalid type for error - must be QmfData")
+            _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
+
+        msg = Message(subject=makeSubject(OpCode.response),
+                      properties={"method":"response"},
+                      content={MsgKey.method:_map})
+        msg.correlation_id = handle.correlation_id
+
+        try:
+            tmp_snd = self._session.sender( handle.reply_to )
+            tmp_snd.send(msg)
+            logging.debug("method-response sent to [%s]" % handle.reply_to)
+        except SendError, e:
+            logging.error("Failed to send method response msg '%s' (%s)" % (msg, str(e)))
 
-    def getWorkItemCount(self): 
+    def get_workitem_count(self): 
         """ 
         Returns the count of pending WorkItems that can be retrieved.
         """
-        logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+        return self._work_q.qsize()
 
-    def getNextWorkItem(self, timeout=None): 
+    def get_next_workitem(self, timeout=None): 
         """
         Obtains the next pending work item, or None if none available. 
         """
-        logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+        try:
+            wi = self._work_q.get(True, timeout)
+        except Queue.Empty:
+            return None
+        return wi
 
-    def releaseWorkItem(self, wi): 
+    def release_workitem(self, wi): 
         """
         Releases a WorkItem instance obtained by getNextWorkItem(). Called when 
         the application has finished processing the WorkItem. 
         """
-        logging.error("!!!Agent.releaseWorkItem() TBD!!!")
+        pass
 
 
     def run(self):
+        global _callback_thread
         next_heartbeat = datetime.datetime.utcnow()
         while self._running:
 
@@ -174,19 +249,29 @@
             except Empty:
                 continue
 
-            try:
-                msg = self._locate_receiver.fetch(timeout = 0)
-            except Empty:
-                msg = None
-            if msg and msg.content_type == "amqp/map":
-                self._dispatch(msg, _direct=False)
-
-            try:
-                msg = self._direct_receiver.fetch(timeout = 0)
-            except Empty:
-                msg = None
-            if msg and msg.content_type == "amqp/map":
-                self._dispatch(msg, _direct=True)
+            while True:
+                try:
+                    msg = self._locate_receiver.fetch(timeout=0)
+                except Empty:
+                    break
+                if msg and msg.content_type == "amqp/map":
+                    self._dispatch(msg, _direct=False)
+
+            while True:
+                try:
+                    msg = self._direct_receiver.fetch(timeout=0)
+                except Empty:
+                    break
+                if msg and msg.content_type == "amqp/map":
+                    self._dispatch(msg, _direct=True)
+
+            if self._work_q_put and self._notifier:
+                # new stuff on work queue, kick the application...
+                self._work_q_put = False
+                _callback_thread = currentThread()
+                logging.info("Calling agent notifier.indication")
+                self._notifier.indication()
+                _callback_thread = None
 
     #
     # Private:
@@ -227,7 +312,7 @@
         elif opcode == OpCode.get_query:
             self._handleQueryMsg( msg, cmap, props, version, _direct )
         elif opcode == OpCode.method_req:
-            logging.warning("!!! METHOD_REQ TBD !!!")
+            self._handleMethodReqMsg(msg, cmap, props, version, _direct)
         elif opcode == OpCode.cancel_subscription:
             logging.warning("!!! CANCEL_SUB TBD !!!")
         elif opcode == OpCode.create_subscription:
@@ -253,7 +338,7 @@
             query = cmap.get(MsgKey.query)
             if query is not None:
                 # fake a QmfData containing my identifier for the query compare
-                tmpData = QmfData(_values={"_name": self.get_name()})
+                tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
                 reply = QmfQuery(query).evaluate(tmpData)
 
         if reply:
@@ -278,24 +363,53 @@
         if "method" in props and props["method"] == "request":
             qmap = cmap.get(MsgKey.query)
             if qmap:
-                query = QmfQuery(qmap)
-                target = query.getTarget()
-                if target == QmfQuery._TARGET_PACKAGES:
+                query = QmfQuery.from_map(qmap)
+                target = query.get_target()
+                if target == QmfQuery.TARGET_PACKAGES:
                     self._queryPackages( msg, query )
-                elif target == QmfQuery._TARGET_SCHEMA_ID:
+                elif target == QmfQuery.TARGET_SCHEMA_ID:
                     self._querySchema( msg, query, _idOnly=True )
-                elif target == QmfQuery._TARGET_SCHEMA:
+                elif target == QmfQuery.TARGET_SCHEMA:
                     self._querySchema( msg, query)
-                elif target == QmfQuery._TARGET_AGENT:
-                    logging.warning("!!! Query TARGET=AGENT TBD !!!")
-                elif target == QmfQuery._TARGET_OBJECT_ID:
-                    logging.warning("!!! Query TARGET=OBJECT_ID TBD !!!")
-                elif target == QmfQuery._TARGET_OBJECT:
-                    logging.warning("!!! Query TARGET=OBJECT TBD !!!")
+                elif target == QmfQuery.TARGET_AGENT:
+                    logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+                elif target == QmfQuery.TARGET_OBJECT_ID:
+                    self._queryData(msg, query, _idOnly=True)
+                elif target == QmfQuery.TARGET_OBJECT:
+                    self._queryData(msg, query)
                 else:
                     logging.warning("Unrecognized query target: '%s'" % str(target))
 
 
+
+    def _handleMethodReqMsg(self, msg, cmap, props, version, _direct):
+        """
+        Process received Method Request
+        """
+        if "method" in props and props["method"] == "request":
+            mname = cmap.get(SchemaMethod.KEY_NAME)
+            if not mname:
+                logging.warning("Invalid method call from '%s': no name"
+                                % msg.reply_to)
+                return
+
+            in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
+            oid = cmap.get(QmfData.KEY_OBJECT_ID)
+
+            print("!!! ci=%s rt=%s mn=%s oid=%s" % 
+                  (msg.correlation_id,
+                   msg.reply_to,
+                   mname,
+                   oid))
+
+            handle = _MethodCallHandle(msg.correlation_id,
+                                       msg.reply_to,
+                                       mname,
+                                       oid)
+            param = MethodCallParams( mname, oid, in_args, msg.user_id)
+            self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
+            self._work_q_put = True
+
     def _queryPackages(self, msg, query):
         """
         Run a query against the list of known packages
@@ -304,7 +418,7 @@
         self._lock.acquire()
         try:
             for name in self._packages.iterkeys():
-                if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})):
+                if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
                     pnames.append(name)
         finally:
             self._lock.release()
@@ -326,36 +440,97 @@
         """
         """
         schemas = []
-        self._lock.acquire()
-        try:
-            for sid,val in self._schema.iteritems():
-                if query.evaluate(val):
-                    if _idOnly:
-                        schemas.append(sid.map_encode())
-                    else:
-                        schemas.append(val.map_encode())
-        finally:
-            self._lock.release()
+        # if querying for a specific schema, do a direct lookup
+        if query.get_selector() == QmfQuery.ID:
+            found = None
+            self._lock.acquire()
+            try:
+                found = self._schema.get(query.get_id())
+            finally:
+                self._lock.release()
+            if found:
+                if _idOnly:
+                    schemas.append(query.get_id().map_encode())
+                else:
+                    schemas.append(found.map_encode())
+        else: # otherwise, evaluate all schema
+            self._lock.acquire()
+            try:
+                for sid,val in self._schema.iteritems():
+                    if query.evaluate(val):
+                        if _idOnly:
+                            schemas.append(sid.map_encode())
+                        else:
+                            schemas.append(val.map_encode())
+            finally:
+                self._lock.release()
 
-        try:
-            tmp_snd = self._session.sender( msg.reply_to )
 
-            if _idOnly:
-                content = {MsgKey.schema_id: schemas}
-            else:
-                content = {MsgKey.schema:schemas}
+        tmp_snd = self._session.sender( msg.reply_to )
 
-            m = Message( subject=makeSubject(OpCode.data_ind),
-                         properties={"method":"response"},
-                         content=content )
-            if msg.correlation_id != None:
-                m.correlation_id = msg.correlation_id
+        if _idOnly:
+            content = {MsgKey.schema_id: schemas}
+        else:
+            content = {MsgKey.schema:schemas}
+
+        m = Message( subject=makeSubject(OpCode.data_ind),
+                     properties={"method":"response"},
+                     content=content )
+        if msg.correlation_id != None:
+            m.correlation_id = msg.correlation_id
+        try:
             tmp_snd.send(m)
             logging.debug("schema_id sent to [%s]" % msg.reply_to)
         except SendError, e:
             logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
 
 
+    def _queryData( self, msg, query, _idOnly=False ):
+        """
+        """
+        data_objs = []
+        # if querying for a specific object, do a direct lookup
+        if query.get_selector() == QmfQuery.ID:
+            found = None
+            self._lock.acquire()
+            try:
+                found = self._agent_data.get(query.get_id())
+            finally:
+                self._lock.release()
+            if found:
+                if _idOnly:
+                    data_objs.append(query.get_id())
+                else:
+                    data_objs.append(found.map_encode())
+        else: # otherwise, evaluate all data
+            self._lock.acquire()
+            try:
+                for oid,val in self._agent_data.iteritems():
+                    if query.evaluate(val):
+                        if _idOnly:
+                            data_objs.append(oid)
+                        else:
+                            data_objs.append(val.map_encode())
+            finally:
+                self._lock.release()
+
+        tmp_snd = self._session.sender( msg.reply_to )
+
+        if _idOnly:
+            content = {MsgKey.object_id:data_objs}
+        else:
+            content = {MsgKey.data_obj:data_objs}
+
+        m = Message( subject=makeSubject(OpCode.data_ind),
+                     properties={"method":"response"},
+                     content=content )
+        if msg.correlation_id != None:
+            m.correlation_id = msg.correlation_id
+        try:
+            tmp_snd.send(m)
+            logging.debug("data reply sent to [%s]" % msg.reply_to)
+        except SendError, e:
+            logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
 
 
   ##==============================================================================
@@ -407,7 +582,7 @@
 
 if __name__ == '__main__':
     # static test cases - no message passing, just exercise API
-    from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes,
+    from qmfCommon import (AgentName, SchemaProperty, qmfTypes,
                            SchemaMethod, SchemaEventClass)
 
     logging.getLogger().setLevel(logging.INFO)
@@ -436,8 +611,8 @@
 
     # add method
     _meth = SchemaMethod(_desc="Method to set string and int in object." )
-    _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
-    _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+    _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+    _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
     _schema.add_method( "set_meth", _meth )
 
     # Add schema to Agent

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py?rev=899644&r1=899643&r2=899644&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py Fri Jan 15 14:29:41 2010
@@ -49,6 +49,9 @@
     package_info = "package_info"
     schema_id = "schema_id"
     schema = "schema"
+    object_id="object_id"
+    data_obj="object"
+    method="method"
 
 
 class OpCode(object):
@@ -92,6 +95,11 @@
     return _sub[3:].split('.', 1)
 
 
+##==============================================================================
+## Async Event Model
+##==============================================================================
+
+
 class Notifier(object):
     """
     Virtual base class that defines a call back which alerts the application that
@@ -107,6 +115,48 @@
 
 
 
+class WorkItem(object):
+    """
+    Describes an event that has arrived for the application to process.  The
+    Notifier is invoked when one or more of these WorkItems become available
+    for processing. 
+    """
+    # Enumeration of the types of WorkItems produced on the Console
+    AGENT_ADDED=1
+    AGENT_DELETED=2
+    NEW_PACKAGE=3
+    NEW_CLASS=4
+    OBJECT_UPDATE=5
+    EVENT_RECEIVED=7
+    AGENT_HEARTBEAT=8
+    # Enumeration of the types of WorkItems produced on the Agent
+    METHOD_CALL=1000
+    QUERY=1001
+    SUBSCRIBE=1002
+    UNSUBSCRIBE=1003
+
+    def __init__(self, kind, handle, _params=None):
+        """
+        Used by the Console and the Agent to create a work item.
+        
+        @type kind: int
+        @param kind: work item type
+        """
+        self._kind = kind
+        self._handle = handle
+        self._params = _params
+
+    def get_type(self):
+        return self._kind
+
+    def get_handle(self):
+        return self._handle
+
+    def get_params(self):
+        return self._params
+
+
+
 ##==============================================================================
 ## Addressing
 ##==============================================================================
@@ -155,15 +205,15 @@
     """
     _separator = ":"
 
-    def __init__(self, vendor, product, name, str_=None):
+    def __init__(self, vendor, product, name, _str=None):
         """
         Note: this object must be immutable, as it is used to index into a dictionary
         """
-        if str_:
+        if _str is not None:
             # construct from string representation
-            if _str.count(AgentId._separator) < 2:
-                raise TypeError("AgentId string format must be 'vendor.product.name'")
-            self._vendor, self._product, self._name = param.split(AgentId._separator)
+            if _str.count(AgentName._separator) < 2:
+                raise TypeError("AgentName string format must be 'vendor.product.name'")
+            self._vendor, self._product, self._name = _str.split(AgentName._separator)
         else:
             self._vendor = vendor
             self._product = product
@@ -299,9 +349,9 @@
                    _object_id=_object_id, _schema=_schema, _const=_const)
     create = classmethod(_create)
 
-    def _from_map(cls, map_, _schema=None, _const=False):
+    def __from_map(cls, map_, _schema=None, _const=False):
         return cls(_map=map_, _schema=_schema, _const=_const)
-    from_map = classmethod(_from_map)
+    from_map = classmethod(__from_map)
 
     def is_managed(self):
         return self._object_id is not None
@@ -671,9 +721,9 @@
 
 
 
-class MethodResponse(object):
-    def __init__(self, impl):
-        pass
+#class MethodResponse(object):
+#    def __init__(self, impl):
+#        pass
 #         self.impl = qmfengine.MethodResponse(impl)
 
 
@@ -756,140 +806,173 @@
 
 class QmfQuery(_mapEncoder):
 
-    _TARGET="what"
-    _PREDICATE="where"
+    KEY_TARGET="what"
+    KEY_PREDICATE="where"
+    KEY_ID="id"
+
+    ### Query Types
+    ID=1
+    PREDICATE=2
 
     #### Query Targets ####
-    _TARGET_PACKAGES="schema_package"
+    TARGET_PACKAGES="schema_package"
     # (returns just package names)
-    # predicate key(s):
+    # allowed predicate key(s):
     #
-    #_PRED_PACKAGE
-
+    # SchemaClassId.KEY_PACKAGE
 
-    _TARGET_SCHEMA_ID="schema_id"
-    _TARGET_SCHEMA="schema"
-    # predicate key(s):
+    TARGET_SCHEMA_ID="schema_id"
+    TARGET_SCHEMA="schema"
+    # allowed predicate key(s):
     #
-    #_PRED_PACKAGE
-    #_PRED_CLASS
-    #_PRED_TYPE
-    #_PRED_HASH
-    #_PRED_SCHEMA_ID
+    # SchemaClassId.KEY_PACKAGE
+    # SchemaClassId.KEY_CLASS
+    # SchemaClassId.KEY_TYPE
+    # SchemaClassId.KEY_HASH
+    # SchemaClass.KEY_SCHEMA_ID
     # name of property (exist test only)
     # name of method (exist test only)
 
-
-    _TARGET_AGENT="agent"
-    # predicate keys(s):
+    TARGET_AGENT="agent"
+    # allowed predicate key(s):
     #
-    #_PRED_VENDOR="_vendor"
-    #_PRED_PRODUCT="_product"
-    #_PRED_NAME="_name"
-
-    _TARGET_OBJECT_ID="object_id"
-    _TARGET_OBJECT="object"
-    # package and class names must be suppled in the target value:
-    # predicate on all values or meta-values[tbd]
+    KEY_AGENT_NAME="_name"
+
+    TARGET_OBJECT_ID="object_id"
+    TARGET_OBJECT="object"
+    # allowed predicate key(s):
     #
-    #_PRED_PACKAGE
-    #_PRED_CLASS
-    #_PRED_TYPE
-    #_PRED_HASH
-    #_primary_key
-    #_PRED_SCHEMA_ID
-    #_PRED_OBJECT_ID
-    #_PRED_UPDATE_TS
-    #_PRED_CREATE_TS
-    #_PRED_DELETE_TS
-    #<name of property>
-
-    _PRED_PACKAGE="_package_name"
-    _PRED_CLASS="_class_name"
-    _PRED_TYPE="_type"
-    _PRED_HASH="_hash_str"
-    _PRED_VENDOR="_vendor"
-    _PRED_PRODUCT="_product"
-    _PRED_NAME="_name"
-    _PRED_PRIMARY_KEY="_primary_key"
-    _PRED_SCHEMA_ID="_schema_id"
-    _PRED_OBJECT_ID="_object_id"
-    _PRED_UPDATE_TS="_update_ts"
-    _PRED_CREATE_TS="_create_ts"
-    _PRED_DELETE_TS="_delete_ts"
-
-    _CMP_EQ="eq"
-    _CMP_NE="ne"
-    _CMP_LT="lt"
-    _CMP_LE="le"
-    _CMP_GT="gt"
-    _CMP_GE="ge"
-    _CMP_RE_MATCH="re_match"
-    _CMP_EXISTS="exists"
-    _CMP_TRUE="true"
-    _CMP_FALSE="false"
-
-    _LOGIC_AND="and"
-    _LOGIC_OR="or"
-    _LOGIC_NOT="not"
-
-    _valid_targets = [_TARGET_PACKAGES, _TARGET_OBJECT_ID, _TARGET_SCHEMA, _TARGET_SCHEMA_ID, 
-                      _TARGET_OBJECT, _TARGET_AGENT]
-
-    def __init__(self, qmap):
-        """
-        """
-        self._target_map = None
-        self._predicate = None
-
-        if type(qmap) != dict:
-            raise TypeError("constructor must be of type dict")
-
-        if self._TARGET in qmap:
-            self._target_map = qmap[self._TARGET]
-            if self._PREDICATE in qmap:
-                self.setPredicate(qmap[self._PREDICATE])
-            return
-        else:
-            # assume qmap to be the target map
-            self._target_map = qmap[:]
+    # SchemaClassId.KEY_PACKAGE
+    # SchemaClassId.KEY_CLASS
+    # SchemaClassId.KEY_TYPE
+    # SchemaClassId.KEY_HASH
+    # QmfData.KEY_SCHEMA_ID
+    # QmfData.KEY_OBJECT_ID
+    # QmfData.KEY_UPDATE_TS
+    # QmfData.KEY_CREATE_TS
+    # QmfData.KEY_DELETE_TS
+    # <name of data value>
+
+    CMP_EQ="eq"
+    CMP_NE="ne"
+    CMP_LT="lt"
+    CMP_LE="le"
+    CMP_GT="gt"
+    CMP_GE="ge"
+    CMP_RE_MATCH="re_match"
+    CMP_EXISTS="exists"
+    CMP_TRUE="true"
+    CMP_FALSE="false"
+
+    LOGIC_AND="and"
+    LOGIC_OR="or"
+    LOGIC_NOT="not"
 
+    _valid_targets = [TARGET_PACKAGES, TARGET_OBJECT_ID, TARGET_SCHEMA, TARGET_SCHEMA_ID, 
+                      TARGET_OBJECT, TARGET_AGENT]
 
-    def setPredicate(self, predicate):
+    def __init__(self, _target=None, _target_params=None, _predicate=None,
+                 _id=None, _map=None):
         """
         """
-        if isinstance(predicate, QmfQueryPredicate):
-            self._predicate = predicate
-        elif type(predicate) == dict:
-            self._predicate = QmfQueryPredicate(predicate)
+        if _map is not None:
+            target_map = _map.get(self.KEY_TARGET)
+            if not target_map:
+                raise TypeError("QmfQuery requires a target map")
+
+            _target = None
+            for key in target_map.iterkeys():
+                if key in self._valid_targets:
+                    _target = key
+                    break
+
+            _target_params = target_map.get(_target)
+
+            _id = _map.get(self.KEY_ID)
+            if _id is not None:
+                # Convert identifier to native type if necessary
+                if _target == self.TARGET_SCHEMA:
+                    _id = SchemaClassId.from_map(_id)
+            else: 
+                pred = _map.get(self.KEY_PREDICATE)
+                if pred:
+                    _predicate = QmfQueryPredicate(pred)
+
+        self._target = _target
+        if not self._target:
+            raise TypeError("QmfQuery requires a target value")
+        self._target_params = _target_params
+        self._predicate = _predicate
+        self._id = _id
+
+    # constructors
+    def _create_wildcard(cls, target, _target_params=None):
+        return cls(_target=target, _target_params=_target_params)
+    create_wildcard = classmethod(_create_wildcard)
+
+    def _create_predicate(cls, target, predicate, _target_params=None): 
+        return cls(_target=target, _target_params=_target_params,
+                   _predicate=predicate)
+    create_predicate = classmethod(_create_predicate)
+
+    def _create_id(cls, target, ident, _target_params=None): 
+        return cls(_target=target, _target_params=_target_params, _id=ident)
+    create_id = classmethod(_create_id)
+
+    def _from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(_from_map)
+
+    def get_target(self):
+        return self._target
+
+    def get_target_param(self):
+        return self._target_params
+
+    def get_selector(self):
+        if self._id:
+            return QmfQuery.ID
         else:
-            raise TypeError("Invalid type for a predicate: %s" % str(predicate))
+            return QmfQuery.PREDICATE
+
+    def get_id(self):
+        return self._id
 
+    def get_predicate(self):
+        """
+        """
+        return self._predicate
 
     def evaluate(self, qmfData):
         """
         """
-        # @todo: how to interpred qmfData against target??????
-        #
+        if self._id:
+            if self._target == self.TARGET_SCHEMA:
+                return (qmfData.has_value(qmfData.KEY_SCHEMA_ID) and
+                        qmfData.get_value(qmfData.KEY_SCHEMA_ID) == self._id)
+            elif self._target == self.TARGET_OBJECT:
+                return (qmfData.has_value(qmfData.KEY_OBJECT_ID) and
+                        qmfData.get_value(qmfData.KEY_OBJECT_ID) == self._id)
+            elif self._target == self.TARGET_AGENT:
+                return (qmfData.has_value(self.KEY_AGENT_NAME) and
+                        qmfData.get_value(self.KEY_AGENT_NAME) == self._id)
+
+            raise Exception("Unsupported query target '%s'" % str(self._target))
+
         if self._predicate:
             return self._predicate.evaluate(qmfData)
-        # no predicate - always match
+        # no predicate and no id - always match
         return True
 
-    def getTarget(self):
-        for key in self._target_map.iterkeys():
-            if key in self._valid_targets:
-                return key
-        return None
-
-    def getPredicate(self):
-        return self._predicate
-
     def map_encode(self):
-        _map = {}
-        _map[self._TARGET] = self._target_map
-        if self._predicate is not None:
-            _map[self._PREDICATE] = self._predicate.map_encode()
+        _map = {self.KEY_TARGET: {self._target: self._target_params}}
+        if self._id is not None:
+            if isinstance(self._id, _mapEncoder):
+                _map[self.KEY_ID] = self._id.map_encode()
+            else:
+                _map[self.KEY_ID] = self._id
+        elif self._predicate is not None:
+            _map[self.KEY_PREDICATE] = self._predicate.map_encode()
         return _map
 
     def __repr__(self):
@@ -901,11 +984,11 @@
     """
     Class for Query predicates.
     """
-    _valid_cmp_ops = [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT, 
-                      QmfQuery._CMP_GT, QmfQuery._CMP_LE, QmfQuery._CMP_GE,
-                      QmfQuery._CMP_EXISTS, QmfQuery._CMP_RE_MATCH,
-                      QmfQuery._CMP_TRUE, QmfQuery._CMP_FALSE]
-    _valid_logic_ops = [QmfQuery._LOGIC_AND, QmfQuery._LOGIC_OR, QmfQuery._LOGIC_NOT]
+    _valid_cmp_ops = [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, 
+                      QmfQuery.CMP_GT, QmfQuery.CMP_LE, QmfQuery.CMP_GE,
+                      QmfQuery.CMP_EXISTS, QmfQuery.CMP_RE_MATCH,
+                      QmfQuery.CMP_TRUE, QmfQuery.CMP_FALSE]
+    _valid_logic_ops = [QmfQuery.LOGIC_AND, QmfQuery.LOGIC_OR, QmfQuery.LOGIC_NOT]
 
 
     def __init__( self, pmap):
@@ -919,7 +1002,7 @@
         if type(pmap) == dict:
             for key in pmap.iterkeys():
                 if key in self._valid_cmp_ops:
-                    # coparison operation - may have "name" and "value"
+                    # comparison operation - may have "name" and "value"
                     self._oper = key
                     break
                 if key in self._valid_logic_ops:
@@ -955,16 +1038,16 @@
         if not isinstance(qmfData, QmfData):
             raise TypeError("Query expects to evaluate QmfData types.")
 
-        if self._oper == QmfQuery._CMP_TRUE:
+        if self._oper == QmfQuery.CMP_TRUE:
             logging.debug("query evaluate TRUE")
             return True
-        if self._oper == QmfQuery._CMP_FALSE:
+        if self._oper == QmfQuery.CMP_FALSE:
             logging.debug("query evaluate FALSE")
             return False
 
-        if self._oper in [QmfQuery._CMP_EQ, QmfQuery._CMP_NE, QmfQuery._CMP_LT, 
-                          QmfQuery._CMP_LE, QmfQuery._CMP_GT, QmfQuery._CMP_GE,
-                          QmfQuery._CMP_RE_MATCH]:
+        if self._oper in [QmfQuery.CMP_EQ, QmfQuery.CMP_NE, QmfQuery.CMP_LT, 
+                          QmfQuery.CMP_LE, QmfQuery.CMP_GT, QmfQuery.CMP_GE,
+                          QmfQuery.CMP_RE_MATCH]:
             if len(self._operands) != 2:
                 logging.warning("Malformed query compare expression received: '%s, %s'" %
                                 (self._oper, str(self._operands)))
@@ -982,13 +1065,13 @@
             logging.debug("query evaluate %s: '%s' '%s' '%s'" % 
                           (name, str(arg1), self._oper, str(arg2)))
             try:
-                if self._oper == QmfQuery._CMP_EQ: return arg1 == arg2
-                if self._oper == QmfQuery._CMP_NE: return arg1 != arg2
-                if self._oper == QmfQuery._CMP_LT: return arg1 < arg2
-                if self._oper == QmfQuery._CMP_LE: return arg1 <= arg2
-                if self._oper == QmfQuery._CMP_GT: return arg1 > arg2
-                if self._oper == QmfQuery._CMP_GE: return arg1 >= arg2
-                if self._oper == QmfQuery._CMP_RE_MATCH: 
+                if self._oper == QmfQuery.CMP_EQ: return arg1 == arg2
+                if self._oper == QmfQuery.CMP_NE: return arg1 != arg2
+                if self._oper == QmfQuery.CMP_LT: return arg1 < arg2
+                if self._oper == QmfQuery.CMP_LE: return arg1 <= arg2
+                if self._oper == QmfQuery.CMP_GT: return arg1 > arg2
+                if self._oper == QmfQuery.CMP_GE: return arg1 >= arg2
+                if self._oper == QmfQuery.CMP_RE_MATCH: 
                     logging.error("!!! RE QUERY TBD !!!")
                     return False
             except:
@@ -998,7 +1081,7 @@
             return False
 
 
-        if self._oper == QmfQuery._CMP_EXISTS:
+        if self._oper == QmfQuery.CMP_EXISTS:
             if len(self._operands) != 1:
                 logging.warning("Malformed query present expression received")
                 return False
@@ -1006,21 +1089,21 @@
             logging.debug("query evaluate PRESENT: [%s]" % str(name))
             return qmfData.has_value(name)
 
-        if self._oper == QmfQuery._LOGIC_AND:
+        if self._oper == QmfQuery.LOGIC_AND:
             logging.debug("query evaluate AND: '%s'" % str(self._operands))
             for exp in self._operands:
                 if not exp.evaluate(qmfData):
                     return False
             return True
 
-        if self._oper == QmfQuery._LOGIC_OR:
+        if self._oper == QmfQuery.LOGIC_OR:
             logging.debug("query evaluate OR: [%s]" % str(self._operands))
             for exp in self._operands:
                 if exp.evaluate(qmfData):
                     return True
             return False
 
-        if self._oper == QmfQuery._LOGIC_NOT:
+        if self._oper == QmfQuery.LOGIC_NOT:
             logging.debug("query evaluate NOT: [%s]" % str(self._operands))
             for exp in self._operands:
                 if exp.evaluate(qmfData):
@@ -1129,7 +1212,7 @@
     KEY_HASH="_hash_str"
 
     TYPE_DATA = "_data"
-    TYPE_EVENT = "event"
+    TYPE_EVENT = "_event"
 
     _valid_types=[TYPE_DATA, TYPE_EVENT]
     _schemaHashStrFormat = "%08x-%08x-%08x-%08x"
@@ -1333,7 +1416,7 @@
 
     def getAccess(self): return self._access
 
-    def isOptional(self): return self._isOptional
+    def is_optional(self): return self._isOptional
 
     def isIndex(self): return self._isIndex
 
@@ -1351,9 +1434,9 @@
 
     def isParentRef(self): return self._isParentRef
 
-    def getDirection(self): return self._dir
+    def get_direction(self): return self._dir
 
-    def getDefault(self): return self._default
+    def get_default(self): return self._default
 
     def map_encode(self):
         """
@@ -1402,7 +1485,11 @@
     map["arguments"] = map of "name"=<SchemaProperty> pairs.
     map["desc"] = str, description of the method
     """
-    def __init__(self, args={}, _desc=None, _map=None):
+    KEY_NAME="_name"
+    KEY_ARGUMENTS="_arguments"
+    KEY_DESC="_desc"
+    KEY_ERROR="_error"
+    def __init__(self, _args={}, _desc=None, _map=None):
         """
         Construct a SchemaMethod.
 
@@ -1412,14 +1499,16 @@
         @param _desc: Human-readable description of the schema
         """
         if _map is not None:
-            _desc = _map.get("desc")
-            margs = _map.get("arguments", args)
-            # margs are in map format - covert to SchemaProperty
-            args = {}
-            for name,val in margs.iteritems():
-                args[name] = SchemaProperty.from_map(val)
+            _desc = _map.get(self.KEY_DESC)
+            margs = _map.get(self.KEY_ARGUMENTS)
+            if margs:
+                # margs are in map format - convert to SchemaProperty
+                tmp_args = {}
+                for name,val in margs.iteritems():
+                    tmp_args[name] = SchemaProperty.from_map(val)
+                _args=tmp_args
 
-        self._arguments = args.copy()
+        self._arguments = _args.copy()
         self._desc = _desc
 
     # map constructor
@@ -1427,13 +1516,13 @@
         return cls(_map=map_)
     from_map = classmethod(_from_map)
 
-    def getDesc(self): return self._desc
+    def get_desc(self): return self._desc
 
-    def getArgCount(self): return len(self._arguments)
+    def get_arg_count(self): return len(self._arguments)
 
-    def getArguments(self): return self._arguments.copy()
+    def get_arguments(self): return self._arguments.copy()
 
-    def getArgument(self, name): return self._arguments[name]
+    def get_argument(self, name): return self._arguments.get(name)
 
     def add_argument(self, name, schema):
         """
@@ -1447,6 +1536,9 @@
         """
         if not isinstance(schema, SchemaProperty):
             raise TypeError("argument must be a SchemaProperty class")
+        # "Input" argument, by default
+        if schema._dir is None:
+            schema._dir = "I"
         self._arguments[name] = schema
 
     def map_encode(self):
@@ -1457,8 +1549,8 @@
         _args = {}
         for name,val in self._arguments.iteritems():
             _args[name] = val.map_encode()
-        _map["arguments"] = _args
-        if self._desc: _map["desc"] = self._desc
+        _map[self.KEY_ARGUMENTS] = _args
+        if self._desc: _map[self.KEY_DESC] = self._desc
         return _map
 
     def __repr__(self):
@@ -1493,7 +1585,6 @@
     map["_schema_id"] = map representation of a SchemaClassId instance
     map["_primary_key_names"] = order list of primary key names
     """
-    KEY_SCHEMA_ID="_schema_id"
     KEY_PRIMARY_KEY_NAMES="_primary_key_names"
     KEY_DESC = "_desc"
 
@@ -1671,6 +1762,11 @@
         if self._classId.get_type() != SchemaClassId.TYPE_DATA:
             raise TypeError("Invalid ClassId type for data schema: %s" % self._classId)
 
+    # map constructor
+    def __from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(__from_map)
+
     def get_id_names(self): 
         return self._object_id_names[:]
 
@@ -1725,6 +1821,11 @@
             raise TypeError("Invalid ClassId type for event schema: %s" %
                             self._classId)
 
+    # map constructor
+    def __from_map(cls, map_):
+        return cls(_map=map_)
+    from_map = classmethod(__from_map)
+
 
 
 

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=899644&r1=899643&r2=899644&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Fri Jan 15 14:29:41 2010
@@ -28,16 +28,18 @@
 from threading import currentThread
 from threading import Condition
 
-from qpid.messaging import *
+from qpid.messaging import Connection, Message, Empty, SendError
 
 from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier,
                        QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
-                       AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION)  
+                       AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+                       SchemaClass, SchemaClassId, SchemaEventClass,
+                       SchemaObjectClass, WorkItem, SchemaMethod)
 
 
 
 # global flag that indicates which thread (if any) is
-# running the console callback 
+# running the console notifier callback
 _callback_thread=None
 
 
@@ -243,13 +245,86 @@
         logging.error(" TBD!!!")
         return None
 
-    def invoke_method(self, name, _in_args=None, _reply_handle=None,
+    def invoke_method(self, name, _in_args={}, _reply_handle=None,
                       _timeout=None):
         """
         invoke the named method.
         """
-        logging.error(" TBD!!!")
-        return None
+        assert self._agent
+        assert self._agent._console
+
+        oid = self.get_object_id()
+        if oid is None:
+            raise ValueError("Cannot invoke methods on unmanaged objects.")
+
+        if _timeout is None:
+            _timeout = self._agent._console._reply_timeout
+
+        if _in_args:
+            _in_args = _in_args.copy()
+
+        if self._schema:
+            # validate
+            ms = self._schema.get_method(name)
+            if ms is None:
+                raise ValueError("Method '%s' is undefined." % ms)
+
+            for aname,prop in ms.get_arguments().iteritems():
+                if aname not in _in_args:
+                    if prop.get_default():
+                        _in_args[aname] = prop.get_default()
+                    elif not prop.is_optional():
+                        raise ValueError("Method '%s' requires argument '%s'"
+                                         % (name, aname))
+            for aname in _in_args.iterkeys():
+                prop = ms.get_argument(aname)
+                if prop is None:
+                    raise ValueError("Method '%s' does not define argument"
+                                     " '%s'" % (name, aname))
+                if "I" not in prop.get_direction():
+                    raise ValueError("Method '%s' argument '%s' is not an"
+                                     " input." % (name, aname)) 
+
+            # @todo check if value is correct (type, range, etc)
+
+        handle = self._agent._console._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+
+        _map = {self.KEY_OBJECT_ID:str(oid),
+                SchemaMethod.KEY_NAME:name}
+        if _in_args:
+            _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+        logging.debug("Sending method req to Agent (%s)" % time.time())
+        try:
+            self._agent._sendMethodReq(_map, handle)
+        except SendError, e:
+            logging.error(str(e))
+            self._agent._console._req_correlation.release(handle)
+            return None
+
+        # @todo async method calls!!!
+        if _reply_handle is not None:
+            print("ASYNC TBD")
+
+        logging.debug("Waiting for response to method req (%s)" % _timeout)
+        replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
+        self._agent._console._req_correlation.release(handle)
+        if not replyMsg:
+            logging.debug("Agent method req wait timed-out.")
+            return None
+
+        _map = replyMsg.content.get(MsgKey.method)
+        if not _map:
+            logging.error("Invalid method call reply message")
+            return None
+
+        error=_map.get(SchemaMethod.KEY_ERROR)
+        if error:
+            return MethodResult(_error=QmfData.from_map(error))
+        else:
+            return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS))
 
 
 
@@ -364,6 +439,54 @@
         """
         pass
 
+
+    def invoke_method(self, name, _in_args={}, _reply_handle=None,
+                      _timeout=None): 
+        """
+        """
+        assert self._console
+
+        if _timeout is None:
+            _timeout = self._console._reply_timeout
+
+        if _in_args:
+            _in_args = _in_args.copy()
+
+        handle = self._console._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+
+        _map = {SchemaMethod.KEY_NAME:name}
+        if _in_args:
+            _map[SchemaMethod.KEY_ARGUMENTS] = _in_args
+
+        logging.debug("Sending method req to Agent (%s)" % time.time())
+        try:
+            self._sendMethodReq(_map, handle)
+        except SendError, e:
+            logging.error(str(e))
+            self._console._req_correlation.release(handle)
+            return None
+
+        # @todo async method calls!!!
+        if _reply_handle is not None:
+            print("ASYNC TBD")
+
+        logging.debug("Waiting for response to method req (%s)" % _timeout)
+        replyMsg = self._console._req_correlation.get_data(handle, _timeout)
+        self._console._req_correlation.release(handle)
+        if not replyMsg:
+            logging.debug("Agent method req wait timed-out.")
+            return None
+
+        _map = replyMsg.content.get(MsgKey.method)
+        if not _map:
+            logging.error("Invalid method call reply message")
+            return None
+
+        return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
+                            _error=_map.get(SchemaMethod.KEY_ERROR))
+
     def __repr__(self):
         return str(self._address)
     
@@ -379,45 +502,46 @@
         self._sendMsg( msg, correlation_id )
 
 
+    def _sendMethodReq(self, mr_map, correlation_id=None):
+        """
+        """
+        msg = Message(subject=makeSubject(OpCode.method_req),
+                      properties={"method":"request"},
+                      content=mr_map)
+        self._sendMsg( msg, correlation_id )
+
+
   ##==============================================================================
-  ## CONSOLE
+  ## METHOD CALL
   ##==============================================================================
 
+class MethodResult(object):
+    def __init__(self, _out_args=None, _error=None):
+        self._error = _error
+        self._out_args = _out_args
+
+    def succeeded(self): 
+        return self._error is None
+
+    def get_exception(self):
+        return self._error
+
+    def get_arguments(self): 
+        return self._out_args
+
+    def get_argument(self, name): 
+        arg = None
+        if self._out_args:
+            arg = self._out_args.get(name)
+        return arg
 
 
-class WorkItem(object):
-    """
-    Describes an event that has arrived at the Console for the
-    application to process.  The Notifier is invoked when one or 
-    more of these WorkItems become available for processing.
-    """
-    #
-    # Enumeration of the types of WorkItems produced by the Console
-    #
-    AGENT_ADDED = 1
-    AGENT_DELETED = 2
-    NEW_PACKAGE = 3
-    NEW_CLASS = 4
-    OBJECT_UPDATE = 5
-    EVENT_RECEIVED = 7
-    AGENT_HEARTBEAT = 8
-
-    def __init__(self, kind, kwargs={}):
-        """
-        Used by the Console to create a work item.
-        
-        @type kind: int
-        @param kind: work item type
-        """
-        self._kind = kind
-        self._param_map = kwargs
+  ##==============================================================================
+  ## CONSOLE
+  ##==============================================================================
 
 
-    def getType(self):
-        return self._kind
 
-    def getParams(self):
-        return self._param_map
 
 
 
@@ -465,6 +589,7 @@
         self._cv = Condition()
         # for passing WorkItems to the application
         self._work_q = Queue.Queue()
+        self._work_q_put = False
         ## Old stuff below???
         #self._broker_list = []
         #self.impl = qmfengine.Console()
@@ -512,14 +637,15 @@
                                                     " x-properties:"
                                                     " {type:direct}}}", 
                                                     capacity=1)
+        logging.error("local addr=%s" % self._address)
         ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
-        logging.debug("agent.ind addr=%s" % ind_addr)
+        logging.error("agent.ind addr=%s" % ind_addr)
         self._announce_recvr = self._session.receiver(str(ind_addr) +
                                                       ";{create:always,"
                                                       " node-properties:{type:topic}}",
                                                       capacity=1)
         locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
-        logging.debug("agent.locate addr=%s" % locate_addr)
+        logging.error("agent.locate addr=%s" % locate_addr)
         self._locate_sender = self._session.sender(str(locate_addr) +
                                                    ";{create:always,"
                                                    " node-properties:{type:topic}}")
@@ -615,9 +741,7 @@
                                               " x-properties:"
                                               " {type:direct}}}")
 
-            query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
-                              QmfQuery._PREDICATE:
-                                  {QmfQuery._CMP_EQ: ["_name", name]}})
+            query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
             msg = Message(subject=makeSubject(OpCode.agent_locate),
                           properties={"method":"request"},
                           content={MsgKey.query: query.map_encode()})
@@ -630,7 +754,7 @@
             self._req_correlation.release(handle)
             return None
 
-        if not timeout:
+        if timeout is None:
             timeout = self._reply_timeout
 
         new_agent = None
@@ -650,6 +774,7 @@
         """
         """
 
+        target = query.get_target()
         handle = self._req_correlation.allocate()
         if handle == 0:
             raise Exception("Can not allocate a correlation id!")
@@ -667,15 +792,63 @@
         logging.debug("Waiting for response to Query (%s)" % timeout)
         reply = self._req_correlation.get_data(handle, timeout)
         self._req_correlation.release(handle)
-        logging.debug("Agent Query wait ended (%s)" % time.time())
-        if reply:
-            print("Agent Query Reply='%s'" % reply)
-            return reply.content
-        else:
-            print("Agent Query FAILED!!!")
+        if not reply:
+            logging.debug("Agent Query wait timed-out.")
             return None
 
-
+        if target == QmfQuery.TARGET_PACKAGES:
+            # simply pass back the list of package names
+            logging.debug("Response to Packet Query received")
+            return reply.content.get(MsgKey.package_info)
+        elif target == QmfQuery.TARGET_OBJECT_ID:
+            # simply pass back the list of object_id's
+            logging.debug("Response to Object Id Query received")
+            return reply.content.get(MsgKey.object_id)
+        elif target == QmfQuery.TARGET_SCHEMA_ID:
+            logging.debug("Response to Schema Id Query received")
+            id_list = []
+            for sid_map in reply.content.get(MsgKey.schema_id):
+                id_list.append(SchemaClassId.from_map(sid_map))
+            return id_list
+        elif target == QmfQuery.TARGET_SCHEMA:
+            logging.debug("Response to Schema Query received")
+            schema_list = []
+            for schema_map in reply.content.get(MsgKey.schema):
+                # extract schema id, convert based on schema type
+                sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+                if sid_map:
+                    sid = SchemaClassId.from_map(sid_map)
+                    if sid:
+                        if sid.get_type() == SchemaClassId.TYPE_DATA:
+                            schema = SchemaObjectClass.from_map(schema_map)
+                        else:
+                            schema = SchemaEventClass.from_map(schema_map)
+                        schema_list.append(schema)
+                        self._add_schema(schema)
+            return schema_list
+        elif target == QmfQuery.TARGET_OBJECT:
+            logging.debug("Response to Object Query received")
+            obj_list = []
+            for obj_map in reply.content.get(MsgKey.data_obj):
+                # if the object references a schema, fetch it
+                sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+                if sid_map:
+                    sid = SchemaClassId.from_map(sid_map)
+                    schema = self._fetch_schema(sid, _agent=agent,
+                                                _timeout=timeout)
+                    if not schema:
+                        logging.warning("Unknown schema, id=%s" % sid)
+                        continue
+                    obj = QmfConsoleData(map_=obj_map, agent=agent,
+                                         _schema=schema)
+                else:
+                    # no schema needed
+                    obj = QmfConsoleData(map_=obj_map, agent=agent)
+                obj_list.append(obj)
+            return obj_list
+        else:
+            logging.warning("Unexpected Target for a Query: '%s'" % target)
+            return None
 
     def run(self):
         global _callback_thread
@@ -684,30 +857,30 @@
         #
         while self._operational:
 
-            qLen = self._work_q.qsize()
-
-            try:
-                msg = self._announce_recvr.fetch(timeout = 0)
-                if msg:
-                    self._dispatch(msg, _direct=False)
-            except Empty:
-                pass
+            # qLen = self._work_q.qsize()
 
-            try:
-                msg = self._direct_recvr.fetch(timeout = 0)
-                if msg:
-                    self._dispatch(msg, _direct=True)
-            except Empty:
-                pass
+            while True:
+                try:
+                    msg = self._announce_recvr.fetch(timeout=0)
+                except Empty:
+                    break
+                self._dispatch(msg, _direct=False)
+
+            while True:
+                try:
+                    msg = self._direct_recvr.fetch(timeout = 0)
+                except Empty:
+                    break
+                self._dispatch(msg, _direct=True)
 
             self._expireAgents()   # check for expired agents
 
-            if qLen == 0 and self._work_q.qsize() and self._notifier:
-                # work queue went non-empty, kick
-                # the application...
-
+            #if qLen == 0 and self._work_q.qsize() and self._notifier:
+            if self._work_q_put and self._notifier:
+                # new stuff on work queue, kick the application...
+                self._work_q_put = False
                 _callback_thread = currentThread()
-                logging.info("Calling console indication")
+                logging.info("Calling console notifier.indication")
                 self._notifier.indication()
                 _callback_thread = None
 
@@ -761,7 +934,7 @@
         elif opcode == OpCode.object_ind:
             logging.warning("!!! object_ind TBD !!!")
         elif opcode == OpCode.response:
-            logging.warning("!!! response TBD !!!")
+            self._handleResponseMsg(msg, cmap, version, _direct)
         elif opcode == OpCode.schema_ind:
             logging.warning("!!! schema_ind TBD !!!")
         elif opcode == OpCode.noop:
@@ -777,26 +950,29 @@
         """
         logging.debug("_handleAgentIndMsg '%s' (%s)" % (msg, time.time()))
 
-        if MsgKey.agent_info in cmap:
-            try:
-                # TODO: fix
-                name = cmap[MsgKey.agent_info]["_name"]
-            except:
-                logging.warning("Bad agent-ind message received: '%s'" % msg)
-                return
+        ai_map = cmap.get(MsgKey.agent_info)
+        if not ai_map or not isinstance(ai_map, type({})):
+            logging.warning("Bad agent-ind message received: '%s'" % msg)
+            return
+        name = ai_map.get("_name")
+        if not name:
+            logging.warning("Bad agent-ind message received: agent name missing"
+                            " '%s'" % msg)
+            return
 
         ignore = True
         matched = False
         correlated = False
+        agent_query = self._agent_discovery_filter
+
         if msg.correlation_id:
             correlated = self._req_correlation.isValid(msg.correlation_id)
+
         if direct and correlated:
             ignore = False
-        elif self._agent_discovery_filter:
-            logging.error("FIXME: agent discovery filter - new agent name style")
-            # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode()))
-            # ignore = not matched
-            matched = True; ignore = False  # for now
+        elif agent_query:
+            matched = agent_query.evaluate(QmfData.create(values=ai_map))
+            ignore = not matched
 
         if not ignore:
             agent = None
@@ -820,9 +996,9 @@
 
             if old_timestamp == None and matched:
                 logging.error("AGENT_ADDED for %s (%s)" % (agent, time.time()))
-                wi = WorkItem(WorkItem.AGENT_ADDED,
-                              {"agent": agent})
+                wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
                 self._work_q.put(wi)
+                self._work_q_put = True
 
             if correlated:
                 # wake up all waiters
@@ -847,6 +1023,22 @@
         self._req_correlation.put_data(msg.correlation_id, msg)
 
 
+    def _handleResponseMsg(self, msg, cmap, version, direct):
+        """
+        Process a received response message.
+        """
+        # @todo code replication - clean me.
+        logging.debug("_handleResponseMsg '%s' (%s)" % (msg, time.time()))
+
+        if not self._req_correlation.isValid(msg.correlation_id):
+            logging.error("FIXME: uncorrelated response??? msg='%s'" % str(msg))
+            return
+
+        # wake up all waiters
+        logging.error("waking waiters for correlation id %s" % msg.correlation_id)
+        self._req_correlation.put_data(msg.correlation_id, msg)
+
+
     def _expireAgents(self):
         """
         Check for expired agents and issue notifications when they expire.
@@ -865,8 +1057,10 @@
                     if agent_deathtime <= now:
                         logging.debug("AGENT_DELETED for %s" % agent)
                         agent._announce_timestamp = None
-                        wi = WorkItem(WorkItem.AGENT_DELETED, {"agent":agent})
+                        wi = WorkItem(WorkItem.AGENT_DELETED, None,
+                                      {"agent":agent})
                         self._work_q.put(wi)
+                        self._work_q_put = True
                     else:
                         if (agent_deathtime - now) < next_expire_delta:
                             next_expire_delta = agent_deathtime - now
@@ -903,28 +1097,29 @@
 
         # new agent - query for its schema database for
         # seeding the schema cache (@todo)
-        # query = QmfQuery({QmfQuery._TARGET_SCHEMA_ID:None})
+        # query = QmfQuery({QmfQuery.TARGET_SCHEMA_ID:None})
         # agent._sendQuery( query )
 
         return agent
 
 
 
-    def enableAgentDiscovery(self, query=None):
+    def enable_agent_discovery(self, _query=None):
         """
         Called to enable the asynchronous Agent Discovery process.
         Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
         """
-        if query:
-            if not isinstance(query, QmfQuery):
-                raise TypeError("Type QmfQuery expected")
-            self._agent_discovery_filter = query
+        # @todo: fix - take predicate only, not entire query!
+        if _query is not None:
+            if (not isinstance(_query, QmfQuery) or
+                _query.get_target() != QmfQuery.TARGET_AGENT):
+                raise TypeError("Type QmfQuery with target == TARGET_AGENT expected")
+            self._agent_discovery_filter = _query
         else:
             # create a match-all agent query (no predicate)
-            self._agent_discovery_filter = QmfQuery({QmfQuery._TARGET: 
-                                                     {QmfQuery._TARGET_AGENT:None}})
+            self._agent_discovery_filter = QmfQuery.create_wildcard(QmfQuery.TARGET_AGENT) 
 
-    def disableAgentDiscovery(self):
+    def disable_agent_discovery(self):
         """
         Called to disable the async Agent Discovery process enabled by
         calling enableAgentDiscovery()
@@ -933,7 +1128,7 @@
 
 
 
-    def getWorkItemCount(self):
+    def get_workitem_count(self):
         """
         Returns the count of pending WorkItems that can be retrieved.
         """
@@ -941,19 +1136,19 @@
 
 
 
-    def getNextWorkItem(self, timeout=None):
+    def get_next_workitem(self, timeout=None):
         """
         Returns the next pending work item, or None if none available.
         @todo: subclass and return an Empty event instead.
         """
         try:
             wi = self._work_q.get(True, timeout)
-        except:
+        except Queue.Empty:
             return None
         return wi
 
 
-    def releaseWorkItem(self, wi):
+    def release_workitem(self, wi):
         """
         Return a WorkItem to the Console when it is no longer needed.
         @todo: call Queue.task_done() - only 2.5+
@@ -963,6 +1158,50 @@
         """
         pass
 
+    def _add_schema(self, schema):
+        """
+        @todo
+        """
+        if not isinstance(schema, SchemaClass):
+            raise TypeError("SchemaClass type expected")
+
+        self._lock.acquire()
+        try:
+            sid = schema.get_class_id()
+            if not self._schema_cache.has_key(sid):
+                self._schema_cache[sid] = schema
+        finally:
+            self._lock.release()
+
+    def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
+        """
+        Find the schema identified by schema_id.  If not in the cache, ask the
+        agent for it.
+        """
+        if not isinstance(schema_id, SchemaClassId):
+            raise TypeError("SchemaClassId type expected")
+
+        self._lock.acquire()
+        try:
+            schema = self._schema_cache.get(schema_id)
+            if schema:
+                return schema
+        finally:
+            self._lock.release()
+
+        if _agent is None:
+            return None
+
+        # note: doQuery will add the new schema to the cache automatically.
+        slist = self.doQuery(_agent,
+                             QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id),
+                             _timeout)
+        if slist:
+            return slist[0]
+        else:
+            return None
+
+
 
     # def get_packages(self):
     #     plist = []
@@ -1341,9 +1580,7 @@
 
 if __name__ == '__main__':
     # temp test code
-    from qmfCommon import (qmfTypes, QmfData,
-                           QmfEvent, SchemaClassId, SchemaEventClass,
-                           SchemaProperty, SchemaObjectClass)
+    from qmfCommon import (qmfTypes, QmfEvent, SchemaProperty)
 
     logging.getLogger().setLevel(logging.INFO)
 
@@ -1362,7 +1599,7 @@
 
     _myConsole = Console(notifier=_noteMe)
 
-    _myConsole.enableAgentDiscovery()
+    _myConsole.enable_agent_discovery()
     logging.info("Waiting...")
 
 
@@ -1430,16 +1667,16 @@
                                            "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME,
                                                            "unit": "seconds",
                                                            "desc": "time until I retire"},
-                                           "meth1": {"desc": "A test method",
-                                                     "arguments":
+                                           "meth1": {"_desc": "A test method",
+                                                     "_arguments":
                                                          {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
                                                                    "desc": "an argument 1",
                                                                    "dir":  "I"},
                                                           "arg2": {"amqp_type": qmfTypes.TYPE_BOOL,
                                                                    "dir":  "IO",
                                                                    "desc": "some weird boolean"}}},
-                                           "meth2": {"desc": "A test method",
-                                                     "arguments":
+                                           "meth2": {"_desc": "A test method",
+                                                     "_arguments":
                                                          {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32,
                                                                      "desc": "an 'nuther argument",
                                                                      "dir":
@@ -1538,15 +1775,14 @@
 
     logging.info( "******** Messing around with Queries ********" )
 
-    _q1 = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None},
-                    QmfQuery._PREDICATE:
-                        {QmfQuery._LOGIC_AND: 
-                         [{QmfQuery._CMP_EQ: ["vendor",  "AVendor"]},
-                          {QmfQuery._CMP_EQ: ["product", "SomeProduct"]},
-                          {QmfQuery._CMP_EQ: ["name", "Thingy"]},
-                          {QmfQuery._LOGIC_OR:
-                               [{QmfQuery._CMP_LE: ["temperature", -10]},
-                                {QmfQuery._CMP_FALSE: None},
-                                {QmfQuery._CMP_EXISTS: ["namey"]}]}]}})
+    _q1 = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT,
+                                    QmfQueryPredicate({QmfQuery.LOGIC_AND:
+                                                           [{QmfQuery.CMP_EQ: ["vendor",  "AVendor"]},
+                                                            {QmfQuery.CMP_EQ: ["product", "SomeProduct"]},
+                                                            {QmfQuery.CMP_EQ: ["name", "Thingy"]},
+                                                            {QmfQuery.LOGIC_OR:
+                                                                 [{QmfQuery.CMP_LE: ["temperature", -10]},
+                                                                  {QmfQuery.CMP_FALSE: None},
+                                                                  {QmfQuery.CMP_EXISTS: ["namey"]}]}]}))
 
-    print("_q1.mapEncode() = [%s]" % _q1)
+    print("_q1.mapEncode() = [%s]" % _q1.map_encode())

Modified: qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py?rev=899644&r1=899643&r2=899644&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py Fri Jan 15 14:29:41 2010
@@ -5,7 +5,8 @@
 
 from qpid.messaging import *
 from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData,
-                       QmfEvent, SchemaMethod, Notifier, SchemaClassId) 
+                       QmfEvent, SchemaMethod, Notifier, SchemaClassId,
+                       WorkItem) 
 from qmfAgent import (Agent, QmfAgentData)
 
 
@@ -61,14 +62,14 @@
 
 # instantiate managed data objects matching the schema
 
-_obj = QmfAgentData( _agent, _schema=_schema )
-_obj.set_value("index1", 100)
-_obj.set_value("index2", "a name" )
-_obj.set_value("set_string", "UNSET")
-_obj.set_value("set_int", 0)
-_obj.set_value("query_count", 0)
-_obj.set_value("method_call_count", 0)
-_agent.add_object( _obj )
+_obj1 = QmfAgentData( _agent, _schema=_schema )
+_obj1.set_value("index1", 100)
+_obj1.set_value("index2", "a name" )
+_obj1.set_value("set_string", "UNSET")
+_obj1.set_value("set_int", 0)
+_obj1.set_value("query_count", 0)
+_obj1.set_value("method_call_count", 0)
+_agent.add_object( _obj1 )
 
 _agent.add_object( QmfAgentData( _agent, _schema=_schema,
                                 _values={"index1":99, 
@@ -78,26 +79,52 @@
                                          "query_count": 0,
                                          "method_call_count": 0} ))
 
+# add an "unstructured" object to the Agent
+_obj2 = QmfAgentData(_agent, _object_id="01545")
+_obj2.set_value("field1", "a value")
+_obj2.set_value("field2", 2)
+_obj2.set_value("field3", {"a":1, "map":2, "value":3})
+_obj2.set_value("field4", ["a", "list", "value"])
+_agent.add_object(_obj2)
+
+
 ## Now connect to the broker
 
 _c = Connection("localhost")
 _c.connect()
 _agent.setConnection(_c)
 
+_error_data = QmfData.create({"code": -1, "description": "You made a boo-boo."})
 
 _done = False
 while not _done:
-    try:
-        _notifier.waitForWork()
-
-        _wi = _agent.getNextWorkItem(timeout=0)
-        while _wi:
-            print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
-            _agent.releaseWorkItem(_wi)
-            _wi = _agent.getNextWorkItem(timeout=0)
-    except:
-        print( "shutting down..." )
-        _done = True
+    # try:
+    _notifier.waitForWork()
+    
+    _wi = _agent.get_next_workitem(timeout=0)
+    while _wi:
+
+        if _wi.get_type() == WorkItem.METHOD_CALL:
+            mc = _wi.get_params()
+            
+            if mc.get_name() == "set_meth":
+                print("!!! Calling 'set_meth' on Object_id = %s" % mc.get_object_id())
+                print("!!! args='%s'" % str(mc.get_args()))
+                print("!!! userid=%s" % str(mc.get_user_id()))
+                print("!!! handle=%s" % _wi.get_handle())
+                _agent.method_response(_wi.get_handle(),
+                                       {"rc1": 100, "rc2": "Success"})
+            else:
+                print("!!! Unknown Method name = %s" % mc.get_name())
+                _agent.method_response(_wi.get_handle(), _error=_error_data)
+        else:
+            print("TBD: work item %d:%s" % (_wi.get_type(), str(_wi.get_params())))
+
+        _agent.release_workitem(_wi)
+        _wi = _agent.get_next_workitem(timeout=0)
+        #    except:
+        #        print( "shutting down...")
+        #        _done = True
 
 print( "Removing connection... TBD!!!" )
 #_myConsole.remove_connection( _c, 10 )

Modified: qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py?rev=899644&r1=899643&r2=899644&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py Fri Jan 15 14:29:41 2010
@@ -4,7 +4,8 @@
 
 
 from qpid.messaging import *
-from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass)
+from qmfCommon import (Notifier, QmfQuery, QmfQueryPredicate, MsgKey,
+                       SchemaClassId, SchemaClass, QmfData) 
 from qmfConsole import Console
 
 
@@ -33,64 +34,117 @@
 _myConsole = Console(notifier=_notifier)
 _myConsole.addConnection( _c )
 
-# Discover only agents from vendor "redhat.com" that 
-# are a "qmf" product....
+# Allow discovery only for the agent named "qmf.testAgent"
 # @todo: replace "manual" query construction with 
 # a formal class-based Query API
-_query = {QmfQuery._TARGET: 
-          {QmfQuery._TARGET_AGENT:None},
-          QmfQuery._PREDICATE:
-              {QmfQuery._CMP_EQ: ["_name",  "qmf.testAgent"]}}
-_query = QmfQuery(_query)
-
-_myConsole.enableAgentDiscovery(_query)
+_query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, 
+                                   QmfQueryPredicate({QmfQuery.CMP_EQ:
+                                                          [QmfQuery.KEY_AGENT_NAME,
+                                                           "qmf.testAgent"]}))
+_myConsole.enable_agent_discovery(_query)
 
 _done = False
 while not _done:
 #    try:
     _notifier.waitForWork()
 
-    _wi = _myConsole.getNextWorkItem(timeout=0)
+    _wi = _myConsole.get_next_workitem(timeout=0)
     while _wi:
-        print("!!! work item received %d:%s" % (_wi.getType(),
-                                                str(_wi.getParams())))
+        print("!!! work item received %d:%s" % (_wi.get_type(),
+                                                str(_wi.get_params())))
 
 
-        if _wi.getType() == _wi.AGENT_ADDED:
-            _agent = _wi.getParams().get("agent")
+        if _wi.get_type() == _wi.AGENT_ADDED:
+            _agent = _wi.get_params().get("agent")
             if not _agent:
                 print("!!!! AGENT IN REPLY IS NULL !!! ")
 
-            _query = QmfQuery( {QmfQuery._TARGET: 
-                                {QmfQuery._TARGET_PACKAGES:None}} )
-
-            _reply = _myConsole.doQuery(_agent, _query)
-
-            package_list = _reply.get(MsgKey.package_info)
-            for pname in package_list:
-                print("!!! Querying for schema from package: %s" % pname)
-                _query = QmfQuery({QmfQuery._TARGET: 
-                                   {QmfQuery._TARGET_SCHEMA_ID:None},
-                                   QmfQuery._PREDICATE:
-                                       {QmfQuery._CMP_EQ: 
-                                        [SchemaClassId.KEY_PACKAGE, pname]}})
-
-                _reply = _myConsole.doQuery(_agent, _query)
+            _query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
+            oid_list = _myConsole.doQuery(_agent, _query)
 
-                schema_id_list = _reply.get(MsgKey.schema_id)
-                for sid_map in schema_id_list:
-                    _query = QmfQuery({QmfQuery._TARGET: 
-                                       {QmfQuery._TARGET_SCHEMA:None},
-                                       QmfQuery._PREDICATE:
-                                           {QmfQuery._CMP_EQ: 
-                                            [SchemaClass.KEY_SCHEMA_ID, sid_map]}})
+            print("!!!************************** REPLY=%s" % oid_list)
 
-                    _reply = _myConsole.doQuery(_agent, _query)
+            for oid in oid_list:
+                _query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, 
+                                            oid)
+                obj_list = _myConsole.doQuery(_agent, _query)
+
+                print("!!!************************** REPLY=%s" % obj_list)
+
+                if obj_list is None:
+                    obj_list={}
+
+                for obj in obj_list:
+                    resp = obj.invoke_method( "set_meth", 
+                                              {"arg_int": -11,
+                                               "arg_str": "are we not goons?"},
+                                              None,
+                                              3)
+                    if resp is None:
+                        print("!!!*** NO RESPONSE FROM METHOD????") 
+                    else:
+                        print("!!! method succeeded()=%s" % resp.succeeded())
+                        print("!!! method exception()=%s" % resp.get_exception())
+                        print("!!! method get args() = %s" % resp.get_arguments())
+
+                    if not obj.is_described():
+                        resp = obj.invoke_method( "bad method", 
+                                                  {"arg_int": -11,
+                                                   "arg_str": "are we not goons?"},
+                                                  None,
+                                                  3)
+                        if resp is None:
+                            print("!!!*** NO RESPONSE FROM METHOD????") 
+                        else:
+                            print("!!! method succeeded()=%s" % resp.succeeded())
+                            print("!!! method exception()=%s" % resp.get_exception())
+                            print("!!! method get args() = %s" % resp.get_arguments())
+
+
+            #---------------------------------
+            #_query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, "99another name")
+
+            #obj_list = _myConsole.doQuery(_agent, _query)
+
+            #---------------------------------
+
+            # _query = QmfQuery.create_wildcard(QmfQuery.TARGET_PACKAGES)
+
+            # package_list = _myConsole.doQuery(_agent, _query)
+
+            # for pname in package_list:
+            #     print("!!! Querying for schema from package: %s" % pname)
+            #     _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
+            #                                        QmfQueryPredicate(
+            #             {QmfQuery.CMP_EQ: [SchemaClassId.KEY_PACKAGE, pname]}))
+
+            #     schema_id_list = _myConsole.doQuery(_agent, _query)
+            #     for sid in schema_id_list:
+            #         _query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA,
+            #                                            QmfQueryPredicate(
+            #                 {QmfQuery.CMP_EQ: [SchemaClass.KEY_SCHEMA_ID,
+            #                                    sid.map_encode()]}))
+
+            #         schema_list = _myConsole.doQuery(_agent, _query)
+            #         for schema in schema_list:
+            #             sid = schema.get_class_id()
+            #             _query = QmfQuery.create_predicate(
+            #                 QmfQuery.TARGET_OBJECT_ID,
+            #                 QmfQueryPredicate({QmfQuery.CMP_EQ:
+            #                                        [QmfData.KEY_SCHEMA_ID,
+            #                                         sid.map_encode()]}))
+
+            #             oid_list = _myConsole.doQuery(_agent, _query)
+            #             for oid in oid_list:
+            #                 _query = QmfQuery.create_id(
+            #                     QmfQuery.TARGET_OBJECT, oid)
+            #                 _reply = _myConsole.doQuery(_agent, _query)
 
+            #                 print("!!!************************** REPLY=%s" % _reply)
 
 
-        _myConsole.releaseWorkItem(_wi)
-        _wi = _myConsole.getNextWorkItem(timeout=0)
+        _myConsole.release_workitem(_wi)
+        _wi = _myConsole.get_next_workitem(timeout=0)
 #    except:
 #        logging.info( "shutting down..." )
 #        _done = True



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org