You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/01/11 21:46:07 UTC

svn commit: r898057 [1/3] - in /qpid/branches/qmfv2/qpid/python/qmf: qmfAgent.py qmfCommon.py qmfConsole.py test/agent_test.py test/console_test.py

Author: tross
Date: Mon Jan 11 20:46:07 2010
New Revision: 898057

URL: http://svn.apache.org/viewvc?rev=898057&view=rev
Log:
QPID-2261 - Patch from Ken Giusti committed to the qmfv2 branch.

Modified:
    qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
    qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py
    qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py?rev=898057&r1=898056&r2=898057&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py Mon Jan 11 20:46:07 2010
@@ -24,11 +24,10 @@
 from threading import Thread, Lock
 from qpid.messaging import Connection, Message, Empty, SendError
 from uuid import uuid4
-from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, 
-                       AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject,
-                       parseSubject, OpCode, QmfQuery, SchemaObjectClass, MsgKey, 
-                       QmfData)
-
+from qmfCommon import (AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
+                       makeSubject, parseSubject, OpCode, QmfQuery,
+                       SchemaObjectClass, MsgKey, QmfData, QmfAddress,
+                       SchemaClass) 
 
 
   ##==============================================================================
@@ -36,21 +35,18 @@
   ##==============================================================================
 
 class Agent(Thread):
-    def __init__(self, vendor, product, name=None,
-                 notifier=None, heartbeat_interval=30,
-                 kwargs={}):
+    def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, 
+                 _max_msg_size=0, _capacity=10):
         Thread.__init__(self)
         self._running = False
-        self.vendor = vendor
-        self.product = product
-        if name:
-            self.name = name
-        else:
-            self.name = uuid4().get_urn().split(":")[2]
-        self._id = AgentId(self.vendor, self.product, self.name)
-        self._address = str(self._id)
-        self._notifier = notifier
-        self._heartbeat_interval = heartbeat_interval
+
+        self.name = str(name)
+        self._domain = _domain
+        self._notifier = _notifier
+        self._heartbeat_interval = _heartbeat_interval
+        self._max_msg_size = _max_msg_size
+        self._capacity = _capacity
+
         self._conn = None
         self._session = None
         self._lock = Lock()
@@ -59,33 +55,48 @@
         self._schema = {}
         self._agent_data = {}
 
-    def getAgentId(self):
-        return AgentId(self.vendor, self.product, self.name)
+    def get_name(self):
+        return self.name
 
     def setConnection(self, conn):
+        my_addr = QmfAddress.direct(self.name, self._domain)
+        locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+        ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
+
+        logging.debug("my direct addr=%s" % my_addr)
+        logging.debug("agent.locate addr=%s" % locate_addr)
+        logging.debug("agent.ind addr=%s" % ind_addr)
+
         self._conn = conn
         self._session = self._conn.session()
-        self._locate_receiver = self._session.receiver(AMQP_QMF_AGENT_LOCATE, capacity=10)
-        self._direct_receiver = self._session.receiver(AMQP_QMF_DIRECT + "/" + self._address,
-                                                       capacity=10)
-        self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
+        self._direct_receiver = self._session.receiver(str(my_addr) +
+                                                       ";{create:always,"
+                                                       " node-properties:"
+                                                       " {type:topic, x-properties: {type:direct}}}", 
+                                                       capacity=self._capacity)
+        self._locate_receiver = self._session.receiver(str(locate_addr) + 
+                                                       ";{create:always, node-properties:{type:topic}}",
+                                                       capacity=self._capacity)
+        self._ind_sender = self._session.sender(str(ind_addr) +
+                                                ";{create:always, node-properties:{type:topic}}")
+
         self._running = True
         self.start()
     
-    def registerObjectClass(self, schema):
+    def register_object_class(self, schema):
         """
-        Register an instance of a SchemaObjectClass with this agent
+        Register an instance of a SchemaClass with this agent
         """
         # @todo: need to update subscriptions
         # @todo: need to mark schema as "non-const"
-        if not isinstance(schema, SchemaObjectClass):
-            raise TypeError("SchemaObjectClass instance expected")
+        if not isinstance(schema, SchemaClass):
+            raise TypeError("SchemaClass instance expected")
 
         self._lock.acquire()
         try:
-            classId = schema.getClassId()
-            pname = classId.getPackageName()
-            cname = classId.getClassName()
+            classId = schema.get_class_id()
+            pname = classId.get_package_name()
+            cname = classId.get_class_name()
             if pname not in self._packages:
                 self._packages[pname] = [cname]
             else:
@@ -96,14 +107,13 @@
         finally:
             self._lock.release()
 
-
-    def registerEventClass(self, cls):
-        logging.error("!!!Agent.registerEventClass() TBD!!!")
+    def register_event_class(self, schema):
+        return self.register_object_class(schema)
 
     def raiseEvent(self, qmfEvent):
         logging.error("!!!Agent.raiseEvent() TBD!!!")
 
-    def addObject(self, data ):
+    def add_object(self, data ):
         """
         Register an instance of a QmfAgentData object.
         """
@@ -112,9 +122,13 @@
         if not isinstance(data, QmfAgentData):
             raise TypeError("QmfAgentData instance expected")
 
+        id_ = data.get_object_id()
+        if not id_:
+            raise TypeError("No identifier assigned to QmfAgentData!")
+
         self._lock.acquire()
         try:
-            self._agent_data[data.getObjectId()] = data
+            self._agent_data[id_] = data
         finally:
             self._lock.release()
 
@@ -147,37 +161,32 @@
         while self._running:
 
             now = datetime.datetime.utcnow()
-            print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
+            # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
             if  now >= next_heartbeat:
                 self._ind_sender.send(self._makeAgentIndMsg())
                 logging.debug("Agent Indication Sent")
                 next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
 
             timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds 
-            print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
+            # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
             try:
-                logging.error("waiting for next rcvr (timeout=%s)..." % timeout)
-                self._session.next_receiver(timeout = timeout)
+                self._session.next_receiver(timeout=timeout)
             except Empty:
-                pass
-            except KeyboardInterrupt:
-                break
+                continue
 
             try:
                 msg = self._locate_receiver.fetch(timeout = 0)
-                if msg.content_type == "amqp/map":
-                    self._dispatch(msg, _direct=False)
             except Empty:
-                pass
+                msg = None
+            if msg and msg.content_type == "amqp/map":
+                self._dispatch(msg, _direct=False)
 
             try:
                 msg = self._direct_receiver.fetch(timeout = 0)
-                if msg.content_type == "amqp/map":
-                    self._dispatch(msg, _direct=True)
             except Empty:
-                pass
-
-
+                msg = None
+            if msg and msg.content_type == "amqp/map":
+                self._dispatch(msg, _direct=True)
 
     #
     # Private:
@@ -187,8 +196,8 @@
         """
         Create an agent indication message identifying this agent
         """
-        _map = self.getAgentId().mapEncode()
-        _map["schemaTimestamp"] = self._schema_timestamp
+        _map = {"_name": self.get_name(),
+                "_schema_timestamp": self._schema_timestamp}
         return Message( subject=makeSubject(OpCode.agent_ind),
                         properties={"method":"response"},
                         content={MsgKey.agent_info: _map})
@@ -241,9 +250,11 @@
 
         reply = True
         if "method" in props and props["method"] == "request":
-            if MsgKey.query in cmap:
-                agentIdMap = self.getAgentId().mapEncode()
-                reply = QmfQuery(cmap[MsgKey.query]).evaluate(QmfData(agentIdMap))
+            query = cmap.get(MsgKey.query)
+            if query is not None:
+                # fake a QmfData containing my identifier for the query compare
+                tmpData = QmfData(_values={"_name": self.get_name()})
+                reply = QmfQuery(query).evaluate(tmpData)
 
         if reply:
             try:
@@ -272,10 +283,9 @@
                 if target == QmfQuery._TARGET_PACKAGES:
                     self._queryPackages( msg, query )
                 elif target == QmfQuery._TARGET_SCHEMA_ID:
-                    self._querySchemaId( msg, query )
+                    self._querySchema( msg, query, _idOnly=True )
                 elif target == QmfQuery._TARGET_SCHEMA:
-                    logging.warning("!!! Query TARGET=SCHEMA TBD !!!")
-                    #self._querySchema( query.getPredicate(), _idOnly=False )
+                    self._querySchema( msg, query)
                 elif target == QmfQuery._TARGET_AGENT:
                     logging.warning("!!! Query TARGET=AGENT TBD !!!")
                 elif target == QmfQuery._TARGET_OBJECT_ID:
@@ -294,7 +304,7 @@
         self._lock.acquire()
         try:
             for name in self._packages.iterkeys():
-                if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:name})):
+                if query.evaluate(QmfData.from_map({QmfQuery._PRED_PACKAGE:name})):
                     pnames.append(name)
         finally:
             self._lock.release()
@@ -312,23 +322,32 @@
             logging.error("Failed to send reply to query msg '%s' (%s)" % (msg, str(e)))
 
 
-    def _querySchemaId( self, msg, query ):
+    def _querySchema( self, msg, query, _idOnly=False ):
         """
         """
         schemas = []
         self._lock.acquire()
         try:
-            for schemaId in self._schema.iterkeys():
-                if query.evaluate(QmfData(_props={QmfQuery._PRED_PACKAGE:schemaId.getPackageName()})):
-                    schemas.append(schemaId.mapEncode())
+            for sid,val in self._schema.iteritems():
+                if query.evaluate(val):
+                    if _idOnly:
+                        schemas.append(sid.map_encode())
+                    else:
+                        schemas.append(val.map_encode())
         finally:
             self._lock.release()
 
         try:
             tmp_snd = self._session.sender( msg.reply_to )
+
+            if _idOnly:
+                content = {MsgKey.schema_id: schemas}
+            else:
+                content = {MsgKey.schema:schemas}
+
             m = Message( subject=makeSubject(OpCode.data_ind),
                          properties={"method":"response"},
-                         content={MsgKey.schema_id: schemas} )
+                         content=content )
             if msg.correlation_id != None:
                 m.correlation_id = msg.correlation_id
             tmp_snd.send(m)
@@ -340,35 +359,45 @@
 
 
   ##==============================================================================
-  ## OBJECTS
+  ## DATA MODEL
   ##==============================================================================
 
 
-class QmfAgentData(QmfManaged):
+class QmfAgentData(QmfData):
     """
     A managed data object that is owned by an agent.
     """
-    def __init__(self, _agent, _schema, _props={}):
-        """
-        @type _agent: class Agent
-        @param _agent: the agent that manages this object.
-        @type _schema: class SchemaObjectClass
-        @param _schema: the schema used to describe this data object
-        @type _props: map of "name"=<value> pairs
-        @param _props: initial values for all properties in this object
-        """
-        super(QmfAgentData, self).__init__(_agentId=_agent.getAgentId(), 
-                                           _schema=_schema, 
-                                           _props=_props)
+
+    def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
+                 _schema=None):
+        # timestamp in millisec since epoch UTC
+        ctime = long(time.time() * 1000)
+        super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
+                                           _tag=_tag, _ctime=ctime,
+                                           _utime=ctime, _object_id=_object_id,
+                                           _schema=_schema, _const=False)
+        self._agent = agent
 
     def destroy(self): 
-        self._timestamps[QmfManaged._ts_delete] = long(time.time() * 1000)
+        self._dtime = long(time.time() * 1000)
         # @todo: publish change
 
-    def setProperty( self, _name, _value):
-        super(QmfAgentData, self).setProperty(_name, _value)
+    def is_deleted(self): 
+        return self._dtime == 0
+
+    def set_value(self, _name, _value, _subType=None):
+        super(QmfAgentData, self).set_value(_name, _value, _subType)
         # @todo: publish change
 
+    def inc_value(self, name, delta):
+        """ add the delta to the property """
+        # @todo: need to take write-lock
+        logging.error(" TBD!!!")
+
+    def dec_value(self, name, delta): 
+        """ subtract the delta from the property """
+        # @todo: need to take write-lock
+        logging.error(" TBD!!!")
 
 
 ################################################################################
@@ -377,22 +406,87 @@
 ################################################################################
 
 if __name__ == '__main__':
-    import time
+    # static test cases - no message passing, just exercise API
+    from qmfCommon import (AgentName, SchemaClassId, SchemaProperty, qmfTypes,
+                           SchemaMethod, SchemaEventClass)
+
     logging.getLogger().setLevel(logging.INFO)
-    logging.info( "Starting Connection" )
-    _c = Connection("localhost")
-    _c.connect()
-    #c.start()
-
-    logging.info( "Starting Agent" )
-    _agent = Agent("redhat.com", "agent", "tross")
-    _agent.setConnection(_c)
 
-    logging.info( "Running Agent" )
-    
-    while True:
-        try:
-            time.sleep(10)
-        except KeyboardInterrupt:
-            break
-    
+    logging.info( "Create an Agent" )
+    _agent_name = AgentName("redhat.com", "agent", "tross")
+    _agent = Agent(str(_agent_name))
+
+    logging.info( "Get agent name: '%s'" % _agent.get_name())
+
+    logging.info( "Create SchemaObjectClass" )
+
+    _schema = SchemaObjectClass(SchemaClassId("MyPackage", "MyClass"),
+                                _desc="A test data schema",
+                                _object_id_names=["index1", "index2"])
+    # add properties
+    _schema.add_property("index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+    _schema.add_property("index2", SchemaProperty(qmfTypes.TYPE_LSTR)) 
+
+    # these two properties are statistics
+    _schema.add_property("query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+    _schema.add_property("method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+    # These two properties can be set via the method call
+    _schema.add_property("set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+    _schema.add_property("set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+    # add method
+    _meth = SchemaMethod(_desc="Method to set string and int in object." )
+    _meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+    _meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+    _schema.add_method( "set_meth", _meth )
+
+    # Add schema to Agent
+
+    print("Schema Map='%s'" % str(_schema.map_encode()))
+
+    _agent.register_object_class(_schema)
+
+    # instantiate managed data objects matching the schema
+
+    logging.info( "Create QmfAgentData" )
+
+    _obj = QmfAgentData( _agent, _schema=_schema )
+    _obj.set_value("index1", 100)
+    _obj.set_value("index2", "a name" )
+    _obj.set_value("set_string", "UNSET")
+    _obj.set_value("set_int", 0)
+    _obj.set_value("query_count", 0)
+    _obj.set_value("method_call_count", 0)
+
+    print("Obj1 Map='%s'" % str(_obj.map_encode()))
+
+    _agent.add_object( _obj )
+
+    _obj = QmfAgentData( _agent, 
+                         _values={"index1":99, 
+                                  "index2": "another name",
+                                  "set_string": "UNSET",
+                                  "set_int": 0,
+                                  "query_count": 0,
+                                  "method_call_count": 0},
+                         _schema=_schema)
+
+    print("Obj2 Map='%s'" % str(_obj.map_encode()))
+
+    _agent.add_object(_obj)
+
+    ##############
+
+
+
+    logging.info( "Create SchemaEventClass" )
+
+    _event = SchemaEventClass(SchemaClassId("MyPackage", "MyEvent",
+                                            stype=SchemaClassId.TYPE_EVENT),
+                              _desc="A test data schema",
+                              _props={"edata_1": SchemaProperty(qmfTypes.TYPE_UINT32)})
+    _event.add_property("edata_2", SchemaProperty(qmfTypes.TYPE_LSTR)) 
+
+    print("Event Map='%s'" % str(_event.map_encode()))
+
+    _agent.register_event_class(_event)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org