You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/24 22:05:03 UTC
svn commit: r915980 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: agent.py
console.py tests/subscriptions.py
Author: kgiusti
Date: Wed Feb 24 21:05:03 2010
New Revision: 915980
URL: http://svn.apache.org/viewvc?rev=915980&view=rev
Log:
QPID-2261: test subscription refresh and cancel
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=915980&r1=915979&r2=915980&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Wed Feb 24 21:05:03 2010
@@ -894,7 +894,7 @@
if props.get("method") == "request":
sid = cmap.get("_subscription_id")
if not sid:
- logging.debug("Invalid subscription refresh msg: %s" %
+ logging.error("Invalid subscription refresh msg: %s" %
str(msg))
return
@@ -902,7 +902,7 @@
try:
ss = self._subscriptions.get(sid)
if not ss:
- logging.debug("Ignoring unknown subscription: %s" %
+ logging.error("Ignoring unknown subscription: %s" %
str(sid))
return
duration = cmap.get("_duration")
@@ -914,15 +914,29 @@
elif duration < self._min_duration:
duration = self._min_duration
except:
- logging.debug("Bad duration value: %s" % str(msg))
+ logging.error("Bad duration value: %s" % str(msg))
duration = None # use existing duration
ss.resubscribe(datetime.datetime.utcnow(), duration)
+ new_duration = ss.duration
+ new_interval = ss.interval
+
finally:
self._lock.release()
+ sr_map = {"_subscription_id": sid,
+ "_interval": new_interval,
+ "_duration": new_duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_refresh_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
"""
Process received Cancel Subscription Request
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=915980&r1=915979&r2=915980&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Wed Feb 24 21:05:03 2010
@@ -1375,7 +1375,7 @@
"""
mbox = self._get_mailbox(subscription_id)
if not mbox:
- return None
+ return
agent = self.get_agent(mbox.agent_name)
if agent:
@@ -1603,6 +1603,8 @@
self._handle_response_msg(msg, cmap, version, _direct)
elif opcode == OpCode.subscribe_rsp:
self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.subscribe_refresh_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
elif opcode == OpCode.method_rsp:
self._handle_response_msg(msg, cmap, version, _direct)
elif opcode == OpCode.data_ind:
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=915980&r1=915979&r2=915980&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Wed Feb 24 21:05:03 2010
@@ -262,9 +262,9 @@
subscriptions.append([sp, 0])
index += 1
- # now wait for the duration + interval + 1 and count the updates
+ # now wait for the (2 * interval) and count the updates
r_count = 0
- while self.notifier.wait_for_work(10 + 2 + 1):
+ while self.notifier.wait_for_work(4):
wi = self.console.get_next_workitem(timeout=0)
while wi is not None:
r_count += 1
@@ -336,10 +336,10 @@
subscriptions.append([sp, 0])
index += 1
- # now wait for all subscriptions to expire (duration + interval + 1 for
- # luck)
+ # now wait for all subscriptions to expire (2x interval w/o
+ # indications)
r_count = 0
- while self.notifier.wait_for_work(10 + 2 + 1):
+ while self.notifier.wait_for_work(4):
wi = self.console.get_next_workitem(timeout=0)
while wi is not None:
r_count += 1
@@ -407,10 +407,10 @@
subscriptions.append([sp, 0])
index += 1
- # now wait for all subscriptions to expire (duration + interval + 1 for
- # luck)
+ # now wait for all subscriptions to expire (2x interval w/o
+ # indications)
r_count = 0
- while self.notifier.wait_for_work(10 + 2 + 1):
+ while self.notifier.wait_for_work(4):
wi = self.console.get_next_workitem(timeout=0)
while wi is not None:
r_count += 1
@@ -439,7 +439,145 @@
#for ii in range(len(subscriptions)):
# self.assertTrue(subscriptions[ii][1] == 5)
+ self.console.destroy(10)
+
+
+
+ def test_sync_refresh(self):
+ # create console
+ # find one agent
+ # subscribe to changes to object "p2c1_key2" in package2/class1
+ # after 3 data indications, refresh
+ # verify > 5 data indications received in total
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "my-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ # refresh after three subscribe indications, count all
+ # indications to verify refresh worked
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.release_workitem(wi)
+
+ if r_count == 3:
+ rp = self.console.refresh_subscription(sp.get_subscription_id())
+ self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription; more means the refresh worked
+ self.assertTrue(r_count > 5)
+ # print("!!! total r_count=%d" % r_count)
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
+ self.console.destroy(10)
+
+
+
+ def test_sync_cancel(self):
+ # create console
+ # find one agent
+ # subscribe to changes to object "p2c1_key2" in package2/class1
+ # after 3 data indications, cancel subscription
+ # verify < 5 data indications received
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "my-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+
+ # cancel after three subscribe indications, count all
+ # indications to verify cancel worked
+ r_count = 0
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.release_workitem(wi)
+
+ if r_count == 3:
+ self.console.cancel_subscription(sp.get_subscription_id())
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription over its full duration
+ self.assertTrue(r_count < 5)
+ #for ii in range(len(subscriptions)):
+ # self.assertTrue(subscriptions[ii][1] == 5)
self.console.destroy(10)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org