You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/01 17:44:57 UTC
svn commit: r917584 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: agent.py
common.py console.py tests/subscriptions.py
Author: kgiusti
Date: Mon Mar 1 16:44:56 2010
New Revision: 917584
URL: http://svn.apache.org/viewvc?rev=917584&view=rev
Log:
QPID-2261: object subscriptions, and tests
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Mon Mar 1 16:44:56 2010
@@ -83,37 +83,6 @@
## SUBSCRIPTIONS
##==============================================================================
-class _ConsoleHandle(object):
- """
- """
- def __init__(self, handle, reply_to):
- self.console_handle = handle
- self.reply_to = reply_to
-
-class SubscriptionParams(object):
- """
- """
- def __init__(self, console_handle, query, interval, duration, user_id):
- self._console_handle = console_handle
- self._query = query
- self._interval = interval
- self._duration = duration
- self._user_id = user_id
-
- def get_console_handle(self):
- return self._console_handle
-
- def get_query(self):
- return self._query
-
- def get_interval(self):
- return self._interval
-
- def get_duration(self):
- return self._duration
-
- def get_user_id(self):
- return self._user_id
class _SubscriptionState(object):
"""
@@ -128,6 +97,7 @@
now = datetime.datetime.utcnow()
self.next_update = now # do an immediate update
self.expiration = now + datetime.timedelta(seconds=duration)
+ self.last_update = None
self.id = 0
def resubscribe(self, now, _duration=None):
@@ -135,9 +105,9 @@
self.duration = _duration
self.expiration = now + datetime.timedelta(seconds=self.duration)
- def reset_interval(self, now):
+ def published(self, now):
self.next_update = now + datetime.timedelta(seconds=self.interval)
-
+ self.last_update = now
##==============================================================================
@@ -193,6 +163,9 @@
self._subscriptions = {}
self._next_subscribe_event = None
+ # prevents multiple _wake_thread() calls
+ self._noop_pending = False
+
def destroy(self, timeout=None):
"""
@@ -264,18 +237,7 @@
self._running = False
if self.isAlive():
# kick my thread to wake it up
- try:
- msg = Message(id=QMF_APP_ID,
- subject=self.name,
- properties={ "method":"request",
- "qmf.opcode":OpCode.noop},
- content={})
-
- # TRACE
- #logging.error("!!! sending wakeup to myself: %s" % msg)
- self._direct_sender.send( msg, sync=True )
- except SendError, e:
- logging.error(str(e))
+ self._wake_thread()
logging.debug("waiting for agent receiver thread to exit")
self.join(timeout)
if self.isAlive():
@@ -349,7 +311,6 @@
"""
Register an instance of a QmfAgentData object.
"""
- # @todo: need to update subscriptions
# @todo: need to mark schema as "non-const"
if not isinstance(data, QmfAgentData):
raise TypeError("QmfAgentData instance expected")
@@ -369,6 +330,18 @@
self._described_data[sid][oid] = data
else:
self._undescribed_data[oid] = data
+
+ # does the new object match any subscriptions?
+ now = datetime.datetime.utcnow()
+ for sid,sub in self._subscriptions.iteritems():
+ if sub.query.evaluate(data):
+ # matched. Mark the subscription as needing to be
+ # serviced. The _publish() method will notice the new
+ # object and will publish it next time it runs.
+ sub.next_update = now
+ self._next_subscribe_event = None
+ # @todo: should we immediately publish?
+
finally:
self._lock.release()
@@ -387,7 +360,7 @@
return data
- def method_response(self, handle, _out_args=None, _error=None):
+ def method_response(self, handle, _out_args=None, _error=None):
"""
"""
if not isinstance(handle, _MethodCallHandle):
@@ -489,50 +462,37 @@
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
-
#
# Monitor Subscriptions
#
- if (self._next_subscribe_event is None or
- now >= self._next_subscribe_event):
-
- logging.debug("%s polling subscriptions..." % self.name)
- self._next_subscribe_event = now + datetime.timedelta(seconds=
+ self._lock.acquire()
+ try:
+ now = datetime.datetime.utcnow()
+ if (self._next_subscribe_event is None or
+ now >= self._next_subscribe_event):
+ logging.debug("%s polling subscriptions..." % self.name)
+ self._next_subscribe_event = now + datetime.timedelta(seconds=
self._max_duration)
- self._lock.acquire()
- try:
- dead_ss = []
+ dead_ss = {}
for sid,ss in self._subscriptions.iteritems():
if now >= ss.expiration:
- dead_ss.append(sid)
+ dead_ss[sid] = ss
continue
if now >= ss.next_update:
- response = []
- objs = self._queryData(ss.query)
- if objs:
- for obj in objs:
- response.append(obj.map_encode())
- logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
- self._send_query_response( ContentType.data,
- ss.correlation_id,
- ss.reply_to,
- response)
- ss.reset_interval(now)
-
+ self._publish(ss)
next_timeout = min(ss.expiration, ss.next_update)
if next_timeout < self._next_subscribe_event:
self._next_subscribe_event = next_timeout
- for sid in dead_ss:
+ for sid,ss in dead_ss.iteritems():
del self._subscriptions[sid]
- finally:
- self._lock.release()
+ self._unpublish(ss)
+ finally:
+ self._lock.release()
#
# notify application of pending WorkItems
#
-
if self._work_q_put and self._notifier:
logging.debug("%s notifying application..." % self.name)
# new stuff on work queue, kick the the application...
@@ -545,10 +505,22 @@
#
# Sleep until messages arrive or something times out
#
- next_timeout = min(next_heartbeat, self._next_subscribe_event)
- timeout = timedelta_to_secs(next_timeout -
- datetime.datetime.utcnow())
- if timeout > 0.0:
+ now = datetime.datetime.utcnow()
+ next_timeout = next_heartbeat
+ self._lock.acquire()
+ try:
+ # the mailbox expire flag may be cleared by the
+ # app thread(s) in order to force an immediate publish
+ if self._next_subscribe_event is None:
+ next_timeout = now
+ elif self._next_subscribe_event < next_timeout:
+ next_timeout = self._next_subscribe_event
+ finally:
+ self._lock.release()
+
+ timeout = timedelta_to_secs(next_timeout - now)
+
+ if self._running and timeout > 0.0:
logging.debug("%s sleeping %s seconds..." % (self.name,
timeout))
try:
@@ -557,7 +529,7 @@
pass
-
+ logging.debug("Shutting down Agent %s thread" % self.name)
#
# Private:
@@ -667,6 +639,7 @@
elif opcode == OpCode.subscribe_cancel_ind:
self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.noop:
+ self._noop_pending = False
logging.debug("No-op msg received.")
else:
logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
@@ -950,10 +923,13 @@
self._lock.acquire()
try:
if sid in self._subscriptions:
+ dead_sub = self._subscriptions[sid]
del self._subscriptions[sid]
finally:
self._lock.release()
+ self._unpublish(dead_sub)
+
def _queryPackagesReply(self, msg, query):
"""
@@ -1100,6 +1076,70 @@
return data_objs
+ def _publish(self, sub):
+ """ Publish a subscription.
+ """
+ response = []
+ now = datetime.datetime.utcnow()
+ objs = self._queryData(sub.query)
+ if objs:
+ for obj in objs:
+ if sub.id not in obj._subscriptions:
+ # new to subscription - publish it
+ obj._subscriptions[sub.id] = sub
+ response.append(obj.map_encode())
+ elif obj._dtime:
+ # obj._dtime is millisec since utc. Convert to datetime
+ utcdt = datetime.datetime.utcfromtimestamp(obj._dtime/1000.0)
+ if utcdt > sub.last_update:
+ response.append(obj.map_encode())
+ else:
+ # obj._utime is millisec since utc. Convert to datetime
+ utcdt = datetime.datetime.utcfromtimestamp(obj._utime/1000.0)
+ if utcdt > sub.last_update:
+ response.append(obj.map_encode())
+
+ if response:
+ logging.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id))
+ self._send_query_response( ContentType.data,
+ sub.correlation_id,
+ sub.reply_to,
+ response)
+ sub.published(now)
+
+ def _unpublish(self, sub):
+ """ This subscription is about to be deleted, remove it from any
+ referencing objects.
+ """
+ objs = self._queryData(sub.query)
+ if objs:
+ for obj in objs:
+ if sub.id in obj._subscriptions:
+ del obj._subscriptions[sub.id]
+
+
+
+ def _wake_thread(self):
+ """
+ Make the agent management thread loop wakeup from its next_receiver
+ sleep.
+ """
+ self._lock.acquire()
+ try:
+ if not self._noop_pending:
+ logging.debug("Sending noop to wake up [%s]" % self._address)
+ msg = Message(id=QMF_APP_ID,
+ subject=self.name,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.noop},
+ content={})
+ try:
+ self._direct_sender.send( msg, sync=True )
+ self._noop_pending = True
+ except SendError, e:
+ logging.error(str(e))
+ finally:
+ self._lock.release()
##==============================================================================
@@ -1164,6 +1204,7 @@
self._agent = agent
self._validated = False
self._modified = True
+ self._subscriptions = {}
def destroy(self):
self._dtime = long(time.time() * 1000)
@@ -1175,7 +1216,8 @@
def set_value(self, _name, _value, _subType=None):
super(QmfAgentData, self).set_value(_name, _value, _subType)
- self._touch()
+ self._utime = long(time.time() * 1000)
+ self._touch(_name)
# @todo: publish change
def inc_value(self, name, delta=1):
@@ -1191,8 +1233,12 @@
def dec_value(self, name, delta=1):
""" subtract the delta from the property """
# @todo: need to take write-lock
- logging.error(" TBD!!!")
- self._touch()
+ val = self.get_value(name)
+ try:
+ val -= delta
+ except:
+ raise
+ self.set_value(name, val)
def validate(self):
"""
@@ -1212,12 +1258,32 @@
raise Exception("Required property '%s' not present." % name)
self._validated = True
- def _touch(self):
+ def _touch(self, field=None):
"""
Mark this object as modified. Used to force a publish of this object
if on subscription.
"""
- self._modified = True
+ now = datetime.datetime.utcnow()
+ publish = False
+ if field:
+ # if the named field is not continuous, mark any subscriptions as
+ # needing to be published.
+ sid = self.get_schema_class_id()
+ if sid:
+ self._agent._lock.acquire()
+ try:
+ schema = self._agent._schema.get(sid)
+ if schema:
+ prop = schema.get_property(field)
+ if prop and not prop.is_continuous():
+ for sid,sub in self._subscriptions.iteritems():
+ sub.next_update = now
+ publish = True
+ if publish:
+ self._agent._next_subscribe_event = None
+ self._agent._wake_thread()
+ finally:
+ self._agent._lock.release()
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py Mon Mar 1 16:44:56 2010
@@ -1447,12 +1447,17 @@
map["unit"] = str, describes units used
map["min"] = int, minimum allowed value
map["max"] = int, maximun allowed value
- map["maxlen"] = int, if string type, this is the maximum length in bytes
+ map["maxlen"] = int, if string type, this is the maximum length in bytes
required to represent the longest instance of this string.
map["desc"] = str, human-readable description of this argument
map["reference"] = str, ???
map["parent_ref"] = bool, true if this property references an object in
which this object is in a child-parent relationship. Default False
+ map["continuous"] = bool, true if the value potentially changes too fast to
+ be directly monitorable. Example: fast changing statistic or random
+ number. Subscriptions to objects containing continuous data will publish
+ only on an interval basis, rather than every time the data changes. Default
+ False.
"""
__hash__ = None
_access_strings = ["RO","RW","RC"]
@@ -1479,6 +1484,7 @@
self._isParentRef = False
self._dir = None
self._default = None
+ self._is_continuous = False
for key, value in kwargs.items():
if key == "access":
@@ -1495,6 +1501,8 @@
elif key == "desc" : self._desc = value
elif key == "reference" : self._reference = value
elif key == "parent_ref" : self._isParentRef = _to_bool(value)
+ elif key == "parent_ref" : self._isParentRef = _to_bool(value)
+ elif key == "continuous" : self._is_continuous = _to_bool(value)
elif key == "dir":
value = str(value).upper()
if value not in self._dir_strings:
@@ -1503,7 +1511,7 @@
elif key == "default" : self._default = value
# constructor
- def _create(cls, type_code, kwargs={}):
+ def _create(cls, type_code, **kwargs):
return cls(_type_code=type_code, kwargs=kwargs)
create = classmethod(_create)
@@ -1538,6 +1546,8 @@
def get_default(self): return self._default
+ def is_continuous(self): return self._is_continuous
+
def map_encode(self):
"""
Return the map encoding of this schema.
@@ -1556,6 +1566,7 @@
_map["parent_ref"] = self._isParentRef
if self._dir: _map["dir"] = self._dir
if self._default: _map["default"] = self._default
+ if self._is_continuous: _map["continuous"] = self._is_continuous
return _map
def __repr__(self):
@@ -1568,6 +1579,7 @@
hasher.update(str(self._type))
hasher.update(str(self._isIndex))
hasher.update(str(self._isOptional))
+ hasher.update(str(self._is_continuous))
if self._access: hasher.update(self._access)
if self._unit: hasher.update(self._unit)
if self._desc: hasher.update(self._desc)
@@ -1575,7 +1587,6 @@
if self._default: hasher.update(self._default)
-
class SchemaMethod(_mapEncoder):
"""
The SchemaMethod class describes the method's structure, and contains a
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Mon Mar 1 16:44:56 2010
@@ -1456,7 +1456,7 @@
logging.debug("Sending noop to wake up [%s]" % self._address)
msg = Message(id=QMF_APP_ID,
subject=self._name,
- properties={"method":"request",
+ properties={"method":"indication",
"qmf.opcode":OpCode.noop},
content={})
try:
@@ -1510,30 +1510,31 @@
self._notifier.indication()
_callback_thread = None
- if self._operational:
- # wait for a message to arrive, or an agent
- # to expire, or a mailbox requrest to time out
- now = datetime.datetime.utcnow()
- next_expire = self._next_agent_expire
- # the mailbox expire flag may be cleared by the
- # app thread(s)
- self._lock.acquire()
- try:
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
- finally:
- self._lock.release()
+ # wait for a message to arrive, or an agent
+ # to expire, or a mailbox requrest to time out
+ now = datetime.datetime.utcnow()
+ next_expire = self._next_agent_expire
+
+ self._lock.acquire()
+ try:
+ # the mailbox expire flag may be cleared by the
+ # app thread(s) to force an immedate mailbox scan
+ if self._next_mbox_expire is None:
+ next_expire = now
+ elif self._next_mbox_expire < next_expire:
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
- if next_expire > now:
- timeout = timedelta_to_secs(next_expire - now)
- try:
- logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
- xxx = self._session.next_receiver(timeout = timeout)
- except Empty:
- pass
+ timeout = timedelta_to_secs(next_expire - now)
+ if self._operational and timeout > 0.0:
+ try:
+ logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+ self._session.next_receiver(timeout = timeout)
+ except Empty:
+ pass
logging.debug("Shutting down Console thread")
@@ -1742,8 +1743,8 @@
mbox = self._get_mailbox(msg.correlation_id)
if not mbox:
- logging.debug("Response msg received with unknown correlation_id"
- " msg='%s'" % str(msg))
+ logging.warning("Response msg received with unknown correlation_id"
+ " msg='%s'" % str(msg))
return
# wake up all waiters
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=917584&r1=917583&r2=917584&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Mon Mar 1 16:44:56 2010
@@ -75,7 +75,13 @@
_object_id_names=["key"] )
_schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # note: count1 is continuous, count2 is not
+ count1_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32,
+ continuous=True)
+ _schema.add_property( "count1", count1_prop)
+ count2_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32,
+ continuous=False)
_schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
self.agent.register_object_class(_schema)
@@ -224,7 +230,7 @@
# create console
# find all agents
# subscribe to changes to any object in package1/class1
- # should succeed
+ # should succeed - verify 1 publish
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -288,10 +294,10 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 5 * len(subscriptions))
+ # expect 1 publish per subscription
+ self.assertTrue(r_count == 5)
for ii in range(len(subscriptions)):
- self.assertTrue(subscriptions[ii][1] == 5)
+ self.assertTrue(subscriptions[ii][1] == 1)
self.console.destroy(10)
@@ -349,21 +355,17 @@
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
self.assertTrue(reply[0].get_object_id() == "undesc-2")
- # print("!!! get_params() = %s" % wi.get_params())
self.assertTrue(wi.get_handle() < len(subscriptions))
subscriptions[wi.get_handle()][1] += 1
- # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
- # self.assertTrue(reply.succeeded())
- # self.assertTrue(reply.get_argument("cookie") ==
- # wi.get_handle())
+
self.console.release_workitem(wi)
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 5 * len(subscriptions))
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
+ # expect 1 publish per subscription
+ self.assertTrue(r_count == 5)
+ for ii in range(len(subscriptions)):
+ self.assertTrue(subscriptions[ii][1] == 1)
self.console.destroy(10)
@@ -426,18 +428,15 @@
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() < len(subscriptions))
subscriptions[wi.get_handle()][1] += 1
- # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
- # self.assertTrue(reply.succeeded())
- # self.assertTrue(reply.get_argument("cookie") ==
- # wi.get_handle())
+
self.console.release_workitem(wi)
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 5 * len(subscriptions))
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
+ # expect 1 publish per subscription
+ self.assertTrue(r_count == 5)
+ for ii in range(len(subscriptions)):
+ self.assertTrue(subscriptions[ii][1] == 1)
self.console.destroy(10)
@@ -459,9 +458,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -489,13 +488,20 @@
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
self.console.release_workitem(wi)
if r_count == 3:
@@ -504,11 +510,8 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
+ # expect 5 publish per subscription, more if refreshed
self.assertTrue(r_count > 5)
- # print("!!! total r_count=%d" % r_count)
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
self.console.destroy(10)
@@ -530,9 +533,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -560,13 +563,20 @@
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
self.console.release_workitem(wi)
if r_count == 3:
@@ -574,10 +584,8 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription full duration
- self.assertTrue(r_count < 5)
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
+ # expect only 3 publish received before cancel
+ self.assertTrue(r_count == 3)
self.console.destroy(10)
@@ -645,8 +653,8 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 6)
+ # one response + one publish = 2
+ self.assertTrue(r_count == 2)
self.console.destroy(10)
@@ -665,9 +673,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -685,6 +693,7 @@
# refresh after three subscribe indications, count all
# indications to verify refresh worked
r_count = 0
+ i_count = 0
sp = None
rp = None
while self.notifier.wait_for_work(4):
@@ -706,20 +715,28 @@
else:
self.assertTrue(wi.get_type() ==
WorkItem.SUBSCRIBE_INDICATION)
+ i_count += 1
# sp better be set up by now!
self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
reply = wi.get_params()
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
- if r_count == 4: # + 1 for subscribe reply
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
+ if r_count == 4: # 3 data + 1 subscribe reply
rp = self.console.refresh_subscription(sp.get_subscription_id())
self.assertTrue(rp)
@@ -727,8 +744,9 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription, + 2 replys
- self.assertTrue(r_count > 7)
+ # expect 5 publish per subscription, more if refreshed
+ self.assertTrue(sp is not None and rp is not None)
+ self.assertTrue(i_count > 5)
self.console.destroy(10)
@@ -748,9 +766,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -765,8 +783,6 @@
_blocking=False)
self.assertTrue(rc)
- # refresh after three subscribe indications, count all
- # indications to verify refresh worked
r_count = 0
sp = None
rp = None
@@ -789,20 +805,220 @@
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
+ if r_count == 3:
self.console.cancel_subscription(sp.get_subscription_id())
self.console.release_workitem(wi)
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 1 subscribe reply and 1 data_indication
- self.assertTrue(r_count == 2)
+ # expect cancel after 3 replies
+ self.assertTrue(r_count == 3)
+
+ self.console.destroy(10)
+
+
+
+
+ def test_sync_periodic_publish_continuous(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed - verify 1 publish
+ # Change continuous property on each publish,
+ # should only see 1 publish per interval
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ # find an agent
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "some-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ self.assertTrue(sp.get_duration() == 10)
+ self.assertTrue(sp.get_publish_interval() == 2)
+
+ # now wait for the (2 * interval) and count the updates
+ r_count = 0
+ sid = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ self.assertTrue(wi.get_handle() == "some-handle")
+ if r_count == 1:
+ # first indication - returns all matching objects
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 2)
+ for obj in reply:
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+ obj.get_object_id() == "p1c1_key1")
+ sid = obj.get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package1")
+ self.assertTrue(sid.get_class_name() == "class1")
+
+ else:
+ # verify publish of modified object only!
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ obj = reply[0]
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2")
+ self.assertTrue(obj.get_value("count1") == r_count - 1)
+ # fail test if we receive more than expected
+ self.assertTrue(r_count < 10)
+
+
+ # now update one of the objects!
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # expect at most 1 publish per interval seen
+ self.assertTrue(r_count < 10)
+
+ self.console.destroy(10)
+
+
+
+
+ def test_sync_periodic_publish_noncontinuous(self):
+ # create console, find agent
+ # subscribe to changes to any object in package1/class1
+ # should succeed - verify 1 publish
+ # Change noncontinuous property on each publish,
+ # should only see 1 publish per each update
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ # find an agent
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "some-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ self.assertTrue(sp.get_duration() == 10)
+ self.assertTrue(sp.get_publish_interval() == 2)
+
+ # now wait for the (2 * interval) and count the updates
+ r_count = 0
+ sid = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ self.assertTrue(wi.get_handle() == "some-handle")
+ if r_count == 1:
+ # first indication - returns all matching objects
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 2)
+ for obj in reply:
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+ obj.get_object_id() == "p1c1_key1")
+ sid = obj.get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package1")
+ self.assertTrue(sid.get_class_name() == "class1")
+
+ else:
+ # verify publish of modified object only!
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ obj = reply[0]
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2")
+ self.assertTrue(obj.get_value("count2") == r_count - 1)
+ # fail test if we receive more than expected
+ self.assertTrue(r_count < 30)
+
+
+ # now update the noncontinuous field of one of the objects!
+ if r_count < 20:
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count2", r_count)
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # expect at least 1 publish per update
+ self.assertTrue(r_count > 10)
self.console.destroy(10)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org