You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/25 22:16:49 UTC
svn commit: r916462 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: console.py
tests/subscriptions.py
Author: kgiusti
Date: Thu Feb 25 21:16:47 2010
New Revision: 916462
URL: http://svn.apache.org/viewvc?rev=916462&view=rev
Log:
QPID-2261: add console async subscription api, and tests
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=916462&r1=916461&r2=916462&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Thu Feb 25 21:16:47 2010
@@ -151,6 +151,24 @@
console._wake_thread()
+ def reset_timeout(self, _timeout=None):
+ """ Reset the expiration date for this mailbox.
+ """
+ if _timeout is None:
+ _timeout = self.console._reply_timeout
+ self.console._lock.acquire()
+ try:
+ self.expiration_date = (datetime.datetime.utcnow() +
+ datetime.timedelta(seconds=_timeout))
+ self.console._next_mbox_expire = None
+ finally:
+ self.console._lock.release()
+
+ # wake the console mgmt thread so it will learn about the mbox
+ # expiration date (and adjust its idle sleep period correctly)
+
+ self.console._wake_thread()
+
def deliver(self, msg):
"""
"""
@@ -353,32 +371,81 @@
class _SubscriptionMailbox(_AsyncMailbox):
"""
- A Mailbox for a single subscription.
+ A Mailbox for a single subscription. Allows only synchronous "subscribe"
+ and "refresh" requests.
"""
- def __init__(self, console, lifetime, context, agent):
+ def __init__(self, console, context, agent, duration, interval):
"""
Invoked by application thread.
"""
- super(_SubscriptionMailbox, self).__init__(console, lifetime)
+ super(_SubscriptionMailbox, self).__init__(console, duration)
self.cv = Condition()
self.data = []
self.result = []
self.context = context
+ self.duration = duration
+ self.interval = interval
self.agent_name = agent.get_name()
self.agent_subscription_id = None # from agent
+ def subscribe(self, query):
+ agent = self.console.get_agent(self.agent_name)
+ if not agent:
+ logging.warning("subscribed failed - unknown agent '%s'" %
+ self.agent_name)
+ return False
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+ agent._send_subscribe_req(query, self.get_address(), self.interval,
+ self.duration)
+ except SendError, e:
+ logging.error(str(e))
+ return False
+ return True
+
+ def resubscribe(self, duration):
+ agent = self.console.get_agent(self.agent_name)
+ if not agent:
+ logging.warning("resubscribed failed - unknown agent '%s'" %
+ self.agent_name)
+ return False
+ try:
+ logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ agent._send_resubscribe_req(self.get_address(),
+ self.agent_subscription_id, duration)
+ except SendError, e:
+ logging.error(str(e))
+ return False
+ return True
+
def deliver(self, msg):
"""
"""
opcode = msg.properties.get("qmf.opcode")
if (opcode == OpCode.subscribe_rsp or
opcode == OpCode.subscribe_refresh_rsp):
- #
- # sync only - just deliver the msg
- #
+
+ error = msg.content.get("_error")
+ if error:
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ logging.warning("Invalid QmfData map received: '%s'"
+ % str(error))
+ e_map = QmfData.create({"error":"Unknown error"})
+ sp = SubscribeParams(None, None, None, e_map)
+ else:
+ self.agent_subscription_id = msg.content.get("_subscription_id")
+ self.duration = msg.content.get("_duration", self.duration)
+ self.interval = msg.content.get("_interval", self.interval)
+ self.reset_timeout(self.duration)
+ sp = SubscribeParams(self.get_address(),
+ self.interval,
+ self.duration,
+ None)
self.cv.acquire()
try:
- self.data.append(msg)
+ self.data.append(sp)
# if was empty, notify waiters
if len(self.data) == 1:
self.cv.notify()
@@ -386,43 +453,7 @@
self.cv.release()
return
- # sid = msg.content.get("_subscription_id")
- # lifetime = msg.content.get("_duration")
- # error = msg.content.get("_error")
- # sp = SubscribeParams(sid,
- # msg.content.get("_interval"),
- # lifetime, error)
- # if sid and self.subscription_id is None:
- # self.subscription_id = sid
- # if lifetime:
- # self.console._lock.acquire()
- # try:
- # self.expiration_date = (datetime.datetime.utcnow() +
- # datetime.timedelta(seconds=lifetime))
- # finally:
- # self.console._lock.release()
-
- # if self.waiting:
- # self.cv.acquire()
- # try:
- # self.data.append(sp)
- # # if was empty, notify waiters
- # if len(self._data) == 1:
- # self._cv.notify()
- # finally:
- # self._cv.release()
- # else:
- # if opcode == OpCode.subscribe_rsp:
- # wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
- # self.context, sp)
- # else:
- # wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
- # self.context, sp)
- # self.console._work_q.put(wi)
- # self.console._work_q_put = True
- # if error:
- # self.destroy()
-
+ # else: data indication
agent_name = msg.properties.get("qmf.agent")
if not agent_name:
logging.warning("Ignoring data_ind - no agent name given: %s" %
@@ -472,6 +503,72 @@
+class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
+ """
+ A Mailbox for a single subscription. Allows only asynchronous "subscribe"
+ and "refresh" requests.
+ """
+ def __init__(self, console, context, agent, duration, interval):
+ """
+ Invoked by application thread.
+ """
+ super(_AsyncSubscriptionMailbox, self).__init__(console, context,
+ agent, duration,
+ interval)
+ self.subscribe_pending = False
+ self.resubscribe_pending = False
+
+
+ def subscribe(self, query, reply_timeout):
+ if super(_AsyncSubscriptionMailbox, self).subscribe(query):
+ self.subscribe_pending = True
+ self.reset_timeout(reply_timeout)
+ return True
+ return False
+
+ def resubscribe(self, duration, reply_timeout):
+ if super(_AsyncSubscriptionMailbox, self).resubscribe(duration):
+ self.resubscribe_pending = True
+ self.reset_timeout(reply_timeout)
+ return True
+ return False
+
+ def deliver(self, msg):
+ """
+ """
+ super(_AsyncSubscriptionMailbox, self).deliver(msg)
+ sp = self.fetch(0)
+ if sp:
+ # if the message was a reply to a subscribe or
+ # re-subscribe, then we get here.
+ if self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ self.context, sp)
+ else:
+ wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ self.context, sp)
+
+ self.subscribe_pending = False
+ self.resubscribe_pending = False
+
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ if not sp.succeeded():
+ self.destroy()
+
+
+ def expire(self):
+ """ Either the subscription expired, or a request timed out.
+ """
+ if self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ self.context, None)
+ elif self.resubscribe_pending:
+ wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ self.context, None)
+ self.destroy()
+
##==============================================================================
## DATA MODEL
@@ -1264,111 +1361,73 @@
def create_subscription(self, agent, query, console_handle,
_interval=None, _duration=None,
- _reply_handle=None, _timeout=None):
+ _blocking=True, _timeout=None):
if not _duration:
_duration = self._subscribe_timeout
- if _reply_handle is not None:
- assert(False) # async TBD
- else:
- mbox = _SubscriptionMailbox(self, _duration, console_handle, agent)
-
- cid = mbox.get_address()
-
- try:
- logging.debug("Sending Subscribe to Agent (%s)" % time.time())
- agent._send_subscribe_req(query, cid, _interval, _duration)
- except SendError, e:
- logging.error(str(e))
- mbox.destroy()
- return None
+ if _timeout is None:
+ _timeout = self._reply_timeout
- if _reply_handle is not None:
+ if not _blocking:
+ mbox = _AsyncSubscriptionMailbox(self, console_handle, agent,
+ _duration, _interval)
+ if not mbox.subscribe(query, _timeout):
+ mbox.destroy()
+ return False
return True
+ else:
+ mbox = _SubscriptionMailbox(self, console_handle, agent, _duration,
+ _interval)
- # wait for reply
- if _timeout is None:
- _timeout = self._reply_timeout
+ if not mbox.subscribe(query):
+ mbox.destroy()
+ return None
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
- # @todo: what if mbox expires here?
- replyMsg = mbox.fetch(_timeout)
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ # @todo: what if mbox expires here?
+ sp = mbox.fetch(_timeout)
- if not replyMsg:
- logging.debug("Subscription request wait timed-out.")
- mbox.destroy()
- return None
+ if not sp:
+ logging.debug("Subscription request wait timed-out.")
+ mbox.destroy()
+ return None
- error = replyMsg.content.get("_error")
- if error:
- mbox.destroy()
- try:
- e_map = QmfData.from_map(error)
- except TypeError:
- e_map = QmfData.create({"error":"Unknown error"})
- return SubscribeParams(None, None, None, e_map)
-
- mbox.agent_subscription_id = replyMsg.content.get("_subscription_id")
- return SubscribeParams(mbox.get_address(),
- replyMsg.content.get("_interval"),
- replyMsg.content.get("_duration"),
- None)
+ if not sp.succeeded():
+ mbox.destroy()
+
+ return sp
def refresh_subscription(self, subscription_id,
_duration=None,
- _reply_handle=None, _timeout=None):
- if _reply_handle is not None:
- assert(False) # async TBD
+ _timeout=None):
+ if _timeout is None:
+ _timeout = self._reply_timeout
mbox = self._get_mailbox(subscription_id)
if not mbox:
logging.warning("Subscription %s not found." % subscription_id)
return None
- agent = self.get_agent(mbox.agent_name)
- if not agent:
- logging.warning("Subscription %s agent %s not found." %
- (mbox.agent_name, subscription_id))
- return None
-
- try:
- logging.debug("Sending Subscribe to Agent (%s)" % time.time())
- agent._send_resubscribe_req(subscription_id,
- mbox.agent_subscription_id,
- _duration)
- except SendError, e:
- logging.error(str(e))
- # @todo ???? mbox.destroy()
- return None
+ if isinstance(mbox, _AsyncSubscriptionMailbox):
+ return mbox.resubscribe(_duration, _timeout)
+ else:
+ # synchronous - wait for reply
+ if not mbox.resubscribe(_duration):
+ # @todo ???? mbox.destroy()
+ return None
- if _reply_handle is not None:
- return True
+ # wait for reply
- # wait for reply
- if _timeout is None:
- _timeout = self._reply_timeout
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ sp = mbox.fetch(_timeout)
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
- replyMsg = mbox.fetch(_timeout)
+ if not sp:
+ logging.debug("re-subscribe request wait timed-out.")
+ # @todo???? mbox.destroy()
+ return None
- if not replyMsg:
- logging.debug("Subscription request wait timed-out.")
- # @todo???? mbox.destroy()
- return None
+ return sp
- error = replyMsg.content.get("_error")
- if error:
- # @todo mbox.destroy()
- try:
- e_map = QmfData.from_map(error)
- except TypeError:
- e_map = QmfData.create({"error":"Unknown error"})
- return SubscribeParams(None, None, None, e_map)
-
- return SubscribeParams(mbox.get_address(),
- replyMsg.content.get("_interval"),
- replyMsg.content.get("_duration"),
- None)
def cancel_subscription(self, subscription_id):
"""
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=916462&r1=916461&r2=916462&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Thu Feb 25 21:16:47 2010
@@ -582,3 +582,227 @@
self.console.destroy(10)
+ def test_async_by_obj_id_schema(self):
+ # create console
+ # find one agent
+ # async subscribe to changes to object "p2c1_key2" in package2/class1
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ rc = self.console.create_subscription(agent,
+ query,
+ "my-handle",
+ _blocking=False)
+ self.assertTrue(rc)
+
+ r_count = 0
+ sp = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ sp = wi.get_params()
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ else:
+ self.assertTrue(wi.get_type() ==
+ WorkItem.SUBSCRIBE_INDICATION)
+ # sp better be set up by now!
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription, + 1 subscribe reply
+ self.assertTrue(r_count == 6)
+
+ self.console.destroy(10)
+
+ def test_async_refresh(self):
+ # create console
+ # find one agent
+ # async subscribe to changes to object "p2c1_key2" in package2/class1
+ # refresh after third data indication
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ rc = self.console.create_subscription(agent,
+ query,
+ "my-handle",
+ _blocking=False)
+ self.assertTrue(rc)
+
+ # refresh after three subscribe indications, count all
+ # indications to verify refresh worked
+ r_count = 0
+ sp = None
+ rp = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ sp = wi.get_params()
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ rp = wi.get_params()
+ self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+ self.assertTrue(rp.succeeded())
+ self.assertTrue(rp.get_error() == None)
+ else:
+ self.assertTrue(wi.get_type() ==
+ WorkItem.SUBSCRIBE_INDICATION)
+ # sp better be set up by now!
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ if r_count == 4: # + 1 for subscribe reply
+ rp = self.console.refresh_subscription(sp.get_subscription_id())
+ self.assertTrue(rp)
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 5 publishes per subscription, + 2 replies
+ self.assertTrue(r_count > 7)
+
+ self.console.destroy(10)
+
+
+ def test_async_cancel(self):
+ # create console
+ # find one agent
+ # async subscribe to changes to object "p2c1_key2" in package2/class1
+ # cancel after first data indication
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # query to match object "p2c1_key2" in schema package2/class1
+ sid = SchemaClassId.create("package2", "class1")
+ query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ rc = self.console.create_subscription(agent,
+ query,
+ "my-handle",
+ _blocking=False)
+ self.assertTrue(rc)
+
+ # cancel on the first subscribe indication, count all work items
+ # to verify that no further indications arrive after the cancel
+ r_count = 0
+ sp = None
+ rp = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+ self.assertTrue(wi.get_handle() == "my-handle")
+ sp = wi.get_params()
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ else:
+ self.assertTrue(wi.get_type() ==
+ WorkItem.SUBSCRIBE_INDICATION)
+ # sp better be set up by now!
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], QmfData))
+ self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ sid = reply[0].get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_class_name() == "class1")
+ self.assertTrue(wi.get_handle() == "my-handle")
+
+ self.console.cancel_subscription(sp.get_subscription_id())
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # for now, I expect 1 subscribe reply and 1 data_indication
+ self.assertTrue(r_count == 2)
+
+ self.console.destroy(10)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org