You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/01 17:44:57 UTC

svn commit: r917584 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: agent.py common.py console.py tests/subscriptions.py

Author: kgiusti
Date: Mon Mar  1 16:44:56 2010
New Revision: 917584

URL: http://svn.apache.org/viewvc?rev=917584&view=rev
Log:
QPID-2261: object subscriptions, and tests

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Mon Mar  1 16:44:56 2010
@@ -83,37 +83,6 @@
   ## SUBSCRIPTIONS
   ##==============================================================================
 
-class _ConsoleHandle(object):
-    """
-    """
-    def __init__(self, handle, reply_to):
-        self.console_handle = handle
-        self.reply_to = reply_to
-
-class SubscriptionParams(object):
-    """
-    """
-    def __init__(self, console_handle, query, interval, duration, user_id):
-        self._console_handle = console_handle
-        self._query = query
-        self._interval = interval
-        self._duration = duration
-        self._user_id = user_id
-
-    def get_console_handle(self):
-        return self._console_handle
-
-    def get_query(self):
-        return self._query
-
-    def get_interval(self):
-        return self._interval
-
-    def get_duration(self):
-        return self._duration
-
-    def get_user_id(self):
-        return self._user_id
 
 class _SubscriptionState(object):
     """
@@ -128,6 +97,7 @@
         now = datetime.datetime.utcnow()
         self.next_update = now  # do an immediate update
         self.expiration = now + datetime.timedelta(seconds=duration)
+        self.last_update = None
         self.id = 0
 
     def resubscribe(self, now, _duration=None):
@@ -135,9 +105,9 @@
             self.duration = _duration
         self.expiration = now + datetime.timedelta(seconds=self.duration)
 
-    def reset_interval(self, now):
+    def published(self, now):
         self.next_update = now + datetime.timedelta(seconds=self.interval)
-
+        self.last_update = now
 
 
   ##==============================================================================
@@ -193,6 +163,9 @@
         self._subscriptions = {}
         self._next_subscribe_event = None
 
+        # prevents multiple _wake_thread() calls
+        self._noop_pending = False
+
 
     def destroy(self, timeout=None):
         """
@@ -264,18 +237,7 @@
         self._running = False
         if self.isAlive():
             # kick my thread to wake it up
-            try:
-                msg = Message(id=QMF_APP_ID,
-                              subject=self.name,
-                              properties={ "method":"request",
-                                          "qmf.opcode":OpCode.noop},
-                              content={})
-
-                # TRACE
-                #logging.error("!!! sending wakeup to myself: %s" % msg)
-                self._direct_sender.send( msg, sync=True )
-            except SendError, e:
-                logging.error(str(e))
+            self._wake_thread()
             logging.debug("waiting for agent receiver thread to exit")
             self.join(timeout)
             if self.isAlive():
@@ -349,7 +311,6 @@
         """
         Register an instance of a QmfAgentData object.
         """
-        # @todo: need to update subscriptions
         # @todo: need to mark schema as "non-const"
         if not isinstance(data, QmfAgentData):
             raise TypeError("QmfAgentData instance expected")
@@ -369,6 +330,18 @@
                     self._described_data[sid][oid] = data
             else:
                 self._undescribed_data[oid] = data
+
+            # does the new object match any subscriptions?
+            now = datetime.datetime.utcnow()
+            for sid,sub in self._subscriptions.iteritems():
+                if sub.query.evaluate(data):
+                    # matched.  Mark the subscription as needing to be
+                    # serviced. The _publish() method will notice the new
+                    # object and will publish it next time it runs.
+                    sub.next_update = now
+                    self._next_subscribe_event = None
+                    # @todo: should we immediately publish?
+
         finally:
             self._lock.release()
 
@@ -387,7 +360,7 @@
         return data
 
 
-    def method_response(self, handle, _out_args=None, _error=None): 
+    def method_response(self, handle, _out_args=None, _error=None):
         """
         """
         if not isinstance(handle, _MethodCallHandle):
@@ -489,50 +462,37 @@
                 logging.debug("Agent Indication Sent")
                 next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
 
-
-
             #
             # Monitor Subscriptions
             #
-            if (self._next_subscribe_event is None or
-                now >= self._next_subscribe_event):
-
-                logging.debug("%s polling subscriptions..." % self.name)
-                self._next_subscribe_event = now + datetime.timedelta(seconds=
+            self._lock.acquire()
+            try:
+                now = datetime.datetime.utcnow()
+                if (self._next_subscribe_event is None or
+                    now >= self._next_subscribe_event):
+                    logging.debug("%s polling subscriptions..." % self.name)
+                    self._next_subscribe_event = now + datetime.timedelta(seconds=
                                                                       self._max_duration)
-                self._lock.acquire()
-                try:
-                    dead_ss = []
+                    dead_ss = {}
                     for sid,ss in self._subscriptions.iteritems():
                         if now >= ss.expiration:
-                            dead_ss.append(sid)
+                            dead_ss[sid] = ss
                             continue
                         if now >= ss.next_update:
-                            response = []
-                            objs = self._queryData(ss.query)
-                            if objs:
-                                for obj in objs:
-                                    response.append(obj.map_encode())
-                                logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
-                                self._send_query_response( ContentType.data,
-                                                           ss.correlation_id,
-                                                           ss.reply_to,
-                                                           response)
-                            ss.reset_interval(now)
-
+                            self._publish(ss)
                         next_timeout = min(ss.expiration, ss.next_update)
                         if next_timeout < self._next_subscribe_event:
                             self._next_subscribe_event = next_timeout
 
-                    for sid in dead_ss:
+                    for sid,ss in dead_ss.iteritems():
                         del self._subscriptions[sid]
-                finally:
-                    self._lock.release()
+                        self._unpublish(ss)
+            finally:
+                self._lock.release()
 
             #
             # notify application of pending WorkItems
             #
-
             if self._work_q_put and self._notifier:
                 logging.debug("%s notifying application..." % self.name)
                 # new stuff on work queue, kick the the application...
@@ -545,10 +505,22 @@
             #
             # Sleep until messages arrive or something times out
             #
-            next_timeout = min(next_heartbeat, self._next_subscribe_event)
-            timeout = timedelta_to_secs(next_timeout -
-                                        datetime.datetime.utcnow())
-            if timeout > 0.0:
+            now = datetime.datetime.utcnow()
+            next_timeout = next_heartbeat
+            self._lock.acquire()
+            try:
+                # the mailbox expire flag may be cleared by the
+                # app thread(s) in order to force an immediate publish
+                if self._next_subscribe_event is None:
+                    next_timeout = now
+                elif self._next_subscribe_event < next_timeout:
+                    next_timeout = self._next_subscribe_event
+            finally:
+                self._lock.release()
+
+            timeout = timedelta_to_secs(next_timeout - now)
+
+            if self._running and timeout > 0.0:
                 logging.debug("%s sleeping %s seconds..." % (self.name,
                                                              timeout))
                 try:
@@ -557,7 +529,7 @@
                     pass
 
 
-
+        logging.debug("Shutting down Agent %s thread" % self.name)
 
     #
     # Private:
@@ -667,6 +639,7 @@
         elif opcode == OpCode.subscribe_cancel_ind:
             self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
         elif opcode == OpCode.noop:
+            self._noop_pending = False
             logging.debug("No-op msg received.")
         else:
             logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
@@ -950,10 +923,13 @@
             self._lock.acquire()
             try:
                 if sid in self._subscriptions:
+                    dead_sub = self._subscriptions[sid]
                     del self._subscriptions[sid]
             finally:
                 self._lock.release()
 
+            self._unpublish(dead_sub)
+
 
     def _queryPackagesReply(self, msg, query):
         """
@@ -1100,6 +1076,70 @@
 
         return data_objs
 
+    def _publish(self, sub):
+        """ Publish a subscription.
+        """
+        response = []
+        now = datetime.datetime.utcnow()
+        objs = self._queryData(sub.query)
+        if objs:
+            for obj in objs:
+                if sub.id not in obj._subscriptions:
+                    # new to subscription - publish it
+                    obj._subscriptions[sub.id] = sub
+                    response.append(obj.map_encode())
+                elif obj._dtime:
+                    # obj._dtime is millisec since utc.  Convert to datetime
+                    utcdt = datetime.datetime.utcfromtimestamp(obj._dtime/1000.0)
+                    if utcdt > sub.last_update:
+                        response.append(obj.map_encode())
+                else:
+                    # obj._utime is millisec since utc.  Convert to datetime
+                    utcdt = datetime.datetime.utcfromtimestamp(obj._utime/1000.0)
+                    if utcdt > sub.last_update:
+                        response.append(obj.map_encode())
+
+            if response:
+                logging.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id))
+                self._send_query_response( ContentType.data,
+                                           sub.correlation_id,
+                                           sub.reply_to,
+                                           response)
+        sub.published(now)
+
+    def _unpublish(self, sub):
+        """ This subscription is about to be deleted, remove it from any
+        referencing objects.
+        """
+        objs = self._queryData(sub.query)
+        if objs:
+            for obj in objs:
+                if sub.id in obj._subscriptions:
+                    del obj._subscriptions[sub.id]
+
+
+
+    def _wake_thread(self):
+        """
+        Make the agent management thread loop wakeup from its next_receiver
+        sleep.
+        """
+        self._lock.acquire()
+        try:
+            if not self._noop_pending:
+                logging.debug("Sending noop to wake up [%s]" % self._address)
+                msg = Message(id=QMF_APP_ID,
+                              subject=self.name,
+                              properties={"method":"indication",
+                                          "qmf.opcode":OpCode.noop},
+                              content={})
+                try:
+                    self._direct_sender.send( msg, sync=True )
+                    self._noop_pending = True
+                except SendError, e:
+                    logging.error(str(e))
+        finally:
+            self._lock.release()
 
 
   ##==============================================================================
@@ -1164,6 +1204,7 @@
         self._agent = agent
         self._validated = False
         self._modified = True
+        self._subscriptions = {}
 
     def destroy(self): 
         self._dtime = long(time.time() * 1000)
@@ -1175,7 +1216,8 @@
 
     def set_value(self, _name, _value, _subType=None):
         super(QmfAgentData, self).set_value(_name, _value, _subType)
-        self._touch()
+        self._utime = long(time.time() * 1000)
+        self._touch(_name)
         # @todo: publish change
 
     def inc_value(self, name, delta=1):
@@ -1191,8 +1233,12 @@
     def dec_value(self, name, delta=1): 
         """ subtract the delta from the property """
         # @todo: need to take write-lock
-        logging.error(" TBD!!!")
-        self._touch()
+        val = self.get_value(name)
+        try:
+            val -= delta
+        except:
+            raise
+        self.set_value(name, val)
 
     def validate(self):
         """
@@ -1212,12 +1258,32 @@
                     raise Exception("Required property '%s' not present." % name)
         self._validated = True
 
-    def _touch(self):
+    def _touch(self, field=None):
         """
         Mark this object as modified.  Used to force a publish of this object
         if on subscription.
         """
-        self._modified = True
+        now = datetime.datetime.utcnow()
+        publish = False
+        if field:
+            # if the named field is not continuous, mark any subscriptions as
+            # needing to be published.
+            sid = self.get_schema_class_id()
+            if sid:
+                self._agent._lock.acquire()
+                try:
+                    schema = self._agent._schema.get(sid)
+                    if schema:
+                        prop = schema.get_property(field)
+                        if prop and not prop.is_continuous():
+                            for sid,sub in self._subscriptions.iteritems():
+                                sub.next_update = now
+                                publish = True
+                    if publish:
+                        self._agent._next_subscribe_event = None
+                        self._agent._wake_thread()
+                finally:
+                    self._agent._lock.release()
 
 
 

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py Mon Mar  1 16:44:56 2010
@@ -1447,12 +1447,17 @@
     map["unit"] = str, describes units used
     map["min"] = int, minimum allowed value
     map["max"] = int, maximun allowed value
-    map["maxlen"] = int, if string type, this is the maximum length in bytes 
+    map["maxlen"] = int, if string type, this is the maximum length in bytes
     required to represent the longest instance of this string.
     map["desc"] = str, human-readable description of this argument
     map["reference"] = str, ???
     map["parent_ref"] = bool, true if this property references an object  in
     which this object is in a child-parent relationship. Default False
+    map["continuous"] = bool, true if the value potentially changes too fast to
+    be directly monitorable.  Example: fast changing statistic or random
+    number. Subscriptions to objects containing continuous data will publish
+    only on an interval basis, rather than every time the data changes. Default
+    False.
     """
     __hash__ = None
     _access_strings = ["RO","RW","RC"]
@@ -1479,6 +1484,7 @@
         self._isParentRef  = False
         self._dir = None
         self._default = None
+        self._is_continuous = False
 
         for key, value in kwargs.items():
             if key == "access":
@@ -1495,6 +1501,8 @@
             elif key == "desc"    : self._desc    = value
             elif key == "reference" : self._reference = value
             elif key == "parent_ref"   : self._isParentRef = _to_bool(value)
+            elif key == "parent_ref"   : self._isParentRef = _to_bool(value)
+            elif key == "continuous"   : self._is_continuous = _to_bool(value)
             elif key == "dir":
                 value = str(value).upper()
                 if value not in self._dir_strings:
@@ -1503,7 +1511,7 @@
             elif key == "default" : self._default = value
 
     # constructor
-    def _create(cls, type_code, kwargs={}):
+    def _create(cls, type_code, **kwargs):
         return cls(_type_code=type_code, kwargs=kwargs)
     create = classmethod(_create)
 
@@ -1538,6 +1546,8 @@
 
     def get_default(self): return self._default
 
+    def is_continuous(self): return self._is_continuous
+
     def map_encode(self):
         """
         Return the map encoding of this schema.
@@ -1556,6 +1566,7 @@
         _map["parent_ref"] = self._isParentRef
         if self._dir: _map["dir"] = self._dir
         if self._default: _map["default"] = self._default
+        if self._is_continuous: _map["continuous"] = self._is_continuous
         return _map
 
     def __repr__(self): 
@@ -1568,6 +1579,7 @@
         hasher.update(str(self._type))
         hasher.update(str(self._isIndex))
         hasher.update(str(self._isOptional))
+        hasher.update(str(self._is_continuous))
         if self._access: hasher.update(self._access)
         if self._unit: hasher.update(self._unit)
         if self._desc: hasher.update(self._desc)
@@ -1575,7 +1587,6 @@
         if self._default: hasher.update(self._default)
 
 
-
 class SchemaMethod(_mapEncoder):
     """ 
     The SchemaMethod class describes the method's structure, and contains a

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Mon Mar  1 16:44:56 2010
@@ -1456,7 +1456,7 @@
         logging.debug("Sending noop to wake up [%s]" % self._address)
         msg = Message(id=QMF_APP_ID,
                       subject=self._name,
-                      properties={"method":"request",
+                      properties={"method":"indication",
                                   "qmf.opcode":OpCode.noop},
                       content={})
         try:
@@ -1510,30 +1510,31 @@
                 self._notifier.indication()
                 _callback_thread = None
 
-            if self._operational:
-                # wait for a message to arrive, or an agent
-                # to expire, or a mailbox requrest to time out
-                now = datetime.datetime.utcnow()
-                next_expire = self._next_agent_expire
 
-                # the mailbox expire flag may be cleared by the
-                # app thread(s)
-                self._lock.acquire()
-                try:
-                    if (self._next_mbox_expire and
-                        self._next_mbox_expire < next_expire):
-                        next_expire = self._next_mbox_expire
-                finally:
-                    self._lock.release()
+            # wait for a message to arrive, or an agent
+            # to expire, or a mailbox requrest to time out
+            now = datetime.datetime.utcnow()
+            next_expire = self._next_agent_expire
+
+            self._lock.acquire()
+            try:
+            # the mailbox expire flag may be cleared by the
+            # app thread(s) to force an immedate mailbox scan
+                if self._next_mbox_expire is None:
+                    next_expire = now
+                elif self._next_mbox_expire < next_expire:
+                    next_expire = self._next_mbox_expire
+            finally:
+                self._lock.release()
 
-                if next_expire > now:
-                    timeout = timedelta_to_secs(next_expire - now)
-                    try:
-                        logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
-                        xxx = self._session.next_receiver(timeout = timeout)
-                    except Empty:
-                        pass
+            timeout = timedelta_to_secs(next_expire - now)
 
+            if self._operational and timeout > 0.0:
+                try:
+                    logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+                    self._session.next_receiver(timeout = timeout)
+                except Empty:
+                    pass
 
         logging.debug("Shutting down Console thread")
 
@@ -1742,8 +1743,8 @@
 
         mbox = self._get_mailbox(msg.correlation_id)
         if not mbox:
-            logging.debug("Response msg received with unknown correlation_id"
-                          " msg='%s'" % str(msg))
+            logging.warning("Response msg received with unknown correlation_id"
+                            " msg='%s'" % str(msg))
             return
 
         # wake up all waiters

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Mon Mar  1 16:44:56 2010
@@ -75,7 +75,13 @@
                                      _object_id_names=["key"] )
 
         _schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
-        _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+        # note: count1 is continuous, count2 is not
+        count1_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32,
+                                            continuous=True)
+        _schema.add_property( "count1", count1_prop)
+        count2_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32,
+                                            continuous=False)
         _schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
 
         self.agent.register_object_class(_schema)
@@ -224,7 +230,7 @@
         # create console
         # find all agents
         # subscribe to changes to any object in package1/class1
-        # should succeed
+        # should succeed - verify 1 publish
         self.notifier = _testNotifier()
         self.console = qmf2.console.Console(notifier=self.notifier,
                                               agent_timeout=3)
@@ -288,10 +294,10 @@
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription
-        self.assertTrue(r_count == 5 * len(subscriptions))
+        # expect 1 publish per subscription
+        self.assertTrue(r_count == 5)
         for ii in range(len(subscriptions)):
-            self.assertTrue(subscriptions[ii][1] == 5)
+            self.assertTrue(subscriptions[ii][1] == 1)
 
         self.console.destroy(10)
 
@@ -349,21 +355,17 @@
                 self.assertTrue(len(reply) == 1)
                 self.assertTrue(isinstance(reply[0], QmfData))
                 self.assertTrue(reply[0].get_object_id() == "undesc-2")
-                # print("!!! get_params() = %s" % wi.get_params())
                 self.assertTrue(wi.get_handle() < len(subscriptions))
                 subscriptions[wi.get_handle()][1] += 1
-                # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
-                # self.assertTrue(reply.succeeded())
-                # self.assertTrue(reply.get_argument("cookie") ==
-                # wi.get_handle())
+
                 self.console.release_workitem(wi)
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription
-        self.assertTrue(r_count == 5 * len(subscriptions))
-        #for ii in range(len(subscriptions)):
-        #    self.assertTrue(subscriptions[ii][1] == 5)
+        # expect 1 publish per subscription
+        self.assertTrue(r_count == 5)
+        for ii in range(len(subscriptions)):
+            self.assertTrue(subscriptions[ii][1] == 1)
 
         self.console.destroy(10)
 
@@ -426,18 +428,15 @@
                 self.assertTrue(sid.get_class_name() == "class1")
                 self.assertTrue(wi.get_handle() < len(subscriptions))
                 subscriptions[wi.get_handle()][1] += 1
-                # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
-                # self.assertTrue(reply.succeeded())
-                # self.assertTrue(reply.get_argument("cookie") ==
-                # wi.get_handle())
+
                 self.console.release_workitem(wi)
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription
-        self.assertTrue(r_count == 5 * len(subscriptions))
-        #for ii in range(len(subscriptions)):
-        #    self.assertTrue(subscriptions[ii][1] == 5)
+        # expect 1 publish per subscription
+        self.assertTrue(r_count == 5)
+        for ii in range(len(subscriptions)):
+            self.assertTrue(subscriptions[ii][1] == 1)
 
         self.console.destroy(10)
 
@@ -459,9 +458,9 @@
         self.conn.connect()
         self.console.add_connection(self.conn)
 
-        # query to match object "p2c1_key2" in schema package2/class1
-        sid = SchemaClassId.create("package2", "class1")
-        query = QmfQuery.create_id_object("p2c1_key2", sid)
+        # query to match object "p1c1_key2" in schema package1/class1
+        sid = SchemaClassId.create("package1", "class1")
+        query = QmfQuery.create_id_object("p1c1_key2", sid)
 
         agent_app = self.agents[0]
         aname = agent_app.agent.get_name()
@@ -489,13 +488,20 @@
                 self.assertTrue(isinstance(reply, type([])))
                 self.assertTrue(len(reply) == 1)
                 self.assertTrue(isinstance(reply[0], QmfData))
-                self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
                 sid = reply[0].get_schema_class_id()
                 self.assertTrue(isinstance(sid, SchemaClassId))
-                self.assertTrue(sid.get_package_name() == "package2")
+                self.assertTrue(sid.get_package_name() == "package1")
                 self.assertTrue(sid.get_class_name() == "class1")
                 self.assertTrue(wi.get_handle() == "my-handle")
 
+                # count1 is continuous, touching it will force a
+                # publish on the interval
+                self.assertTrue(sid is not None)
+                test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+                self.assertTrue(test_obj is not None)
+                test_obj.set_value("count1", r_count)
+
                 self.console.release_workitem(wi)
 
                 if r_count == 3:
@@ -504,11 +510,8 @@
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription
+        # expect 5 publish per subscription, more if refreshed
         self.assertTrue(r_count > 5)
-        # print("!!! total r_count=%d" % r_count)
-        #for ii in range(len(subscriptions)):
-        #    self.assertTrue(subscriptions[ii][1] == 5)
 
         self.console.destroy(10)
 
@@ -530,9 +533,9 @@
         self.conn.connect()
         self.console.add_connection(self.conn)
 
-        # query to match object "p2c1_key2" in schema package2/class1
-        sid = SchemaClassId.create("package2", "class1")
-        query = QmfQuery.create_id_object("p2c1_key2", sid)
+        # query to match object "p1c1_key2" in schema package1/class1
+        sid = SchemaClassId.create("package1", "class1")
+        query = QmfQuery.create_id_object("p1c1_key2", sid)
 
         agent_app = self.agents[0]
         aname = agent_app.agent.get_name()
@@ -560,13 +563,20 @@
                 self.assertTrue(isinstance(reply, type([])))
                 self.assertTrue(len(reply) == 1)
                 self.assertTrue(isinstance(reply[0], QmfData))
-                self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
                 sid = reply[0].get_schema_class_id()
                 self.assertTrue(isinstance(sid, SchemaClassId))
-                self.assertTrue(sid.get_package_name() == "package2")
+                self.assertTrue(sid.get_package_name() == "package1")
                 self.assertTrue(sid.get_class_name() == "class1")
                 self.assertTrue(wi.get_handle() == "my-handle")
 
+                # count1 is continuous, touching it will force a
+                # publish on the interval
+                self.assertTrue(sid is not None)
+                test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+                self.assertTrue(test_obj is not None)
+                test_obj.set_value("count1", r_count)
+
                 self.console.release_workitem(wi)
 
                 if r_count == 3:
@@ -574,10 +584,8 @@
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription full duration
-        self.assertTrue(r_count < 5)
-        #for ii in range(len(subscriptions)):
-        #    self.assertTrue(subscriptions[ii][1] == 5)
+        # expect only 3 publish received before cancel
+        self.assertTrue(r_count == 3)
 
         self.console.destroy(10)
 
@@ -645,8 +653,8 @@
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription
-        self.assertTrue(r_count == 6)
+        # one response + one publish = 2
+        self.assertTrue(r_count == 2)
 
         self.console.destroy(10)
 
@@ -665,9 +673,9 @@
         self.conn.connect()
         self.console.add_connection(self.conn)
 
-        # query to match object "p2c1_key2" in schema package2/class1
-        sid = SchemaClassId.create("package2", "class1")
-        query = QmfQuery.create_id_object("p2c1_key2", sid)
+        # query to match object "p1c1_key2" in schema package1/class1
+        sid = SchemaClassId.create("package1", "class1")
+        query = QmfQuery.create_id_object("p1c1_key2", sid)
 
         agent_app = self.agents[0]
         aname = agent_app.agent.get_name()
@@ -685,6 +693,7 @@
         # refresh after three subscribe indications, count all
         # indications to verify refresh worked
         r_count = 0
+        i_count = 0
         sp = None
         rp = None
         while self.notifier.wait_for_work(4):
@@ -706,20 +715,28 @@
                 else:
                     self.assertTrue(wi.get_type() ==
                                     WorkItem.SUBSCRIBE_INDICATION)
+                    i_count += 1
                     # sp better be set up by now!
                     self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
                     reply = wi.get_params()
                     self.assertTrue(isinstance(reply, type([])))
                     self.assertTrue(len(reply) == 1)
                     self.assertTrue(isinstance(reply[0], QmfData))
-                    self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                    self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
                     sid = reply[0].get_schema_class_id()
                     self.assertTrue(isinstance(sid, SchemaClassId))
-                    self.assertTrue(sid.get_package_name() == "package2")
+                    self.assertTrue(sid.get_package_name() == "package1")
                     self.assertTrue(sid.get_class_name() == "class1")
                     self.assertTrue(wi.get_handle() == "my-handle")
 
-                    if r_count == 4:  # + 1 for subscribe reply
+                    # count1 is continuous, touching it will force a
+                    # publish on the interval
+                    self.assertTrue(sid is not None)
+                    test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+                    self.assertTrue(test_obj is not None)
+                    test_obj.set_value("count1", r_count)
+
+                    if r_count == 4:  # 3 data + 1 subscribe reply
                         rp = self.console.refresh_subscription(sp.get_subscription_id())
                         self.assertTrue(rp)
 
@@ -727,8 +744,9 @@
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 5 publish per subscription, + 2 replys
-        self.assertTrue(r_count > 7)
+        # expect 5 publish per subscription, more if refreshed
+        self.assertTrue(sp is not None and rp is not None)
+        self.assertTrue(i_count > 5)
 
         self.console.destroy(10)
 
@@ -748,9 +766,9 @@
         self.conn.connect()
         self.console.add_connection(self.conn)
 
-        # query to match object "p2c1_key2" in schema package2/class1
-        sid = SchemaClassId.create("package2", "class1")
-        query = QmfQuery.create_id_object("p2c1_key2", sid)
+        # query to match object "p1c1_key2" in schema package1/class1
+        sid = SchemaClassId.create("package1", "class1")
+        query = QmfQuery.create_id_object("p1c1_key2", sid)
 
         agent_app = self.agents[0]
         aname = agent_app.agent.get_name()
@@ -765,8 +783,6 @@
                                               _blocking=False)
         self.assertTrue(rc)
 
-        # refresh after three subscribe indications, count all
-        # indications to verify refresh worked
         r_count = 0
         sp = None
         rp = None
@@ -789,20 +805,220 @@
                     self.assertTrue(isinstance(reply, type([])))
                     self.assertTrue(len(reply) == 1)
                     self.assertTrue(isinstance(reply[0], QmfData))
-                    self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                    self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
                     sid = reply[0].get_schema_class_id()
                     self.assertTrue(isinstance(sid, SchemaClassId))
-                    self.assertTrue(sid.get_package_name() == "package2")
+                    self.assertTrue(sid.get_package_name() == "package1")
                     self.assertTrue(sid.get_class_name() == "class1")
                     self.assertTrue(wi.get_handle() == "my-handle")
 
+                    # count1 is continuous, touching it will force a
+                    # publish on the interval
+                    self.assertTrue(sid is not None)
+                    test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+                    self.assertTrue(test_obj is not None)
+                    test_obj.set_value("count1", r_count)
+
+                if r_count == 3:
                     self.console.cancel_subscription(sp.get_subscription_id())
 
                 self.console.release_workitem(wi)
 
                 wi = self.console.get_next_workitem(timeout=0)
 
-        # for now, I expect 1 subscribe reply and 1 data_indication
-        self.assertTrue(r_count == 2)
+        # expect cancel after 3 replies
+        self.assertTrue(r_count == 3)
+
+        self.console.destroy(10)
+
+
+
+
+    def test_sync_periodic_publish_continuous(self):
+        # create console
+        # find all agents
+        # subscribe to changes to any object in package1/class1
+        # should succeed - verify 1 publish
+        # Change continuous property on each publish,
+        # should only see 1 publish per interval
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        subscriptions = []
+        index = 0
+
+        # query to match all objects in schema package1/class1
+        sid = SchemaClassId.create("package1", "class1")
+        t_params = {QmfData.KEY_SCHEMA_ID: sid}
+        query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+                                         _target_params=t_params)
+        # find an agent
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        sp = self.console.create_subscription(agent,
+                                              query,
+                                              "some-handle")
+        self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+        self.assertTrue(sp.succeeded())
+        self.assertTrue(sp.get_error() == None)
+        self.assertTrue(sp.get_duration() == 10)
+        self.assertTrue(sp.get_publish_interval() == 2)
+
+        # now wait for the (2 * interval) and count the updates
+        r_count = 0
+        sid = None
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+                self.assertTrue(wi.get_handle() == "some-handle")
+                if r_count == 1:
+                    # first indication - returns all matching objects
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 2)
+                    for obj in reply:
+                        self.assertTrue(isinstance(obj, QmfData))
+                        self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+                                        obj.get_object_id() == "p1c1_key1")
+                        sid = obj.get_schema_class_id()
+                        self.assertTrue(isinstance(sid, SchemaClassId))
+                        self.assertTrue(sid.get_package_name() == "package1")
+                        self.assertTrue(sid.get_class_name() == "class1")
+
+                else:
+                    # verify publish of modified object only!
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 1)
+                    obj = reply[0]
+                    self.assertTrue(isinstance(obj, QmfData))
+                    self.assertTrue(obj.get_object_id() == "p1c1_key2")
+                    self.assertTrue(obj.get_value("count1") == r_count - 1)
+                    # fail test if we receive more than expected
+                    self.assertTrue(r_count < 10)
+
+
+                # now update one of the objects!
+                self.assertTrue(sid is not None)
+                test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+                self.assertTrue(test_obj is not None)
+                test_obj.set_value("count1", r_count)
+
+                self.console.release_workitem(wi)
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # expect at most 1 publish per interval seen
+        self.assertTrue(r_count < 10)
+
+        self.console.destroy(10)
+
+
+
+
+    def test_sync_periodic_publish_noncontinuous(self):
+        # create console, find agent
+        # subscribe to changes to any object in package1/class1
+        # should succeed - verify 1 publish
+        # Change noncontinuous property on each publish,
+        # should only see 1 publish per each update 
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        subscriptions = []
+        index = 0
+
+        # query to match all objects in schema package1/class1
+        sid = SchemaClassId.create("package1", "class1")
+        t_params = {QmfData.KEY_SCHEMA_ID: sid}
+        query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+                                         _target_params=t_params)
+        # find an agent
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        sp = self.console.create_subscription(agent,
+                                              query,
+                                              "some-handle")
+        self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+        self.assertTrue(sp.succeeded())
+        self.assertTrue(sp.get_error() == None)
+        self.assertTrue(sp.get_duration() == 10)
+        self.assertTrue(sp.get_publish_interval() == 2)
+
+        # now wait for the (2 * interval) and count the updates
+        r_count = 0
+        sid = None
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+                self.assertTrue(wi.get_handle() == "some-handle")
+                if r_count == 1:
+                    # first indication - returns all matching objects
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 2)
+                    for obj in reply:
+                        self.assertTrue(isinstance(obj, QmfData))
+                        self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+                                        obj.get_object_id() == "p1c1_key1")
+                        sid = obj.get_schema_class_id()
+                        self.assertTrue(isinstance(sid, SchemaClassId))
+                        self.assertTrue(sid.get_package_name() == "package1")
+                        self.assertTrue(sid.get_class_name() == "class1")
+
+                else:
+                    # verify publish of modified object only!
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 1)
+                    obj = reply[0]
+                    self.assertTrue(isinstance(obj, QmfData))
+                    self.assertTrue(obj.get_object_id() == "p1c1_key2")
+                    self.assertTrue(obj.get_value("count2") == r_count - 1)
+                    # fail test if we receive more than expected
+                    self.assertTrue(r_count < 30)
+
+
+                # now update the noncontinuous field of one of the objects!
+                if r_count < 20:
+                    self.assertTrue(sid is not None)
+                    test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+                    self.assertTrue(test_obj is not None)
+                    test_obj.set_value("count2", r_count)
+
+                self.console.release_workitem(wi)
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # expect at least 1 publish per update
+        self.assertTrue(r_count > 10)
 
         self.console.destroy(10)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org