You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/25 22:16:49 UTC

svn commit: r916462 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: console.py tests/subscriptions.py

Author: kgiusti
Date: Thu Feb 25 21:16:47 2010
New Revision: 916462

URL: http://svn.apache.org/viewvc?rev=916462&view=rev
Log:
QPID-2261: add console async subscription api, and tests

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=916462&r1=916461&r2=916462&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Thu Feb 25 21:16:47 2010
@@ -151,6 +151,24 @@
 
         console._wake_thread()
 
+    def reset_timeout(self, _timeout=None):
+        """ Reset the expiration date for this mailbox.
+        """
+        if _timeout is None:
+            _timeout = self.console._reply_timeout
+        self.console._lock.acquire()
+        try:
+            self.expiration_date = (datetime.datetime.utcnow() +
+                                    datetime.timedelta(seconds=_timeout))
+            self.console._next_mbox_expire = None
+        finally:
+            self.console._lock.release()
+
+        # wake the console mgmt thread so it will learn about the mbox
+        # expiration date (and adjust its idle sleep period correctly)
+
+        self.console._wake_thread()
+
     def deliver(self, msg):
         """
         """
@@ -353,32 +371,81 @@
 
 class _SubscriptionMailbox(_AsyncMailbox):
     """
-    A Mailbox for a single subscription.
+    A Mailbox for a single subscription.  Allows only sychronous "subscribe"
+    and "refresh" requests.
     """
-    def __init__(self, console, lifetime, context, agent):
+    def __init__(self, console, context, agent, duration, interval):
         """
         Invoked by application thread.
         """
-        super(_SubscriptionMailbox, self).__init__(console, lifetime)
+        super(_SubscriptionMailbox, self).__init__(console, duration)
         self.cv = Condition()
         self.data = []
         self.result = []
         self.context = context
+        self.duration = duration
+        self.interval = interval
         self.agent_name = agent.get_name()
         self.agent_subscription_id = None          # from agent
 
+    def subscribe(self, query):
+        agent = self.console.get_agent(self.agent_name)
+        if not agent:
+            logging.warning("subscribed failed - unknown agent '%s'" %
+                            self.agent_name)
+            return False
+        try:
+            logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+            agent._send_subscribe_req(query, self.get_address(), self.interval,
+                                      self.duration)
+        except SendError, e:
+            logging.error(str(e))
+            return False
+        return True
+
+    def resubscribe(self, duration):
+        agent = self.console.get_agent(self.agent_name)
+        if not agent:
+            logging.warning("resubscribed failed - unknown agent '%s'" %
+                            self.agent_name)
+            return False
+        try:
+            logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+            agent._send_resubscribe_req(self.get_address(),
+                                        self.agent_subscription_id, duration)
+        except SendError, e:
+            logging.error(str(e))
+            return False
+        return True
+
     def deliver(self, msg):
         """
         """
         opcode = msg.properties.get("qmf.opcode")
         if (opcode == OpCode.subscribe_rsp or
             opcode == OpCode.subscribe_refresh_rsp):
-            #
-            # sync only - just deliver the msg
-            #
+
+            error = msg.content.get("_error")
+            if error:
+                try:
+                    e_map = QmfData.from_map(error)
+                except TypeError:
+                    logging.warning("Invalid QmfData map received: '%s'"
+                                    % str(error))
+                    e_map = QmfData.create({"error":"Unknown error"})
+                sp = SubscribeParams(None, None, None, e_map)
+            else:
+                self.agent_subscription_id = msg.content.get("_subscription_id")
+                self.duration = msg.content.get("_duration", self.duration)
+                self.interval = msg.content.get("_interval", self.interval)
+                self.reset_timeout(self.duration)
+                sp = SubscribeParams(self.get_address(),
+                                     self.interval,
+                                     self.duration,
+                                     None)
             self.cv.acquire()
             try:
-                self.data.append(msg)
+                self.data.append(sp)
                 # if was empty, notify waiters
                 if len(self.data) == 1:
                     self.cv.notify()
@@ -386,43 +453,7 @@
                 self.cv.release()
             return
 
-            # sid = msg.content.get("_subscription_id")
-            # lifetime = msg.content.get("_duration")
-            # error = msg.content.get("_error")
-            # sp = SubscribeParams(sid,
-            #                      msg.content.get("_interval"),
-            #                      lifetime, error)
-            # if sid and self.subscription_id is None:
-            #     self.subscription_id = sid
-            # if lifetime:
-            #     self.console._lock.acquire()
-            #     try:
-            #         self.expiration_date = (datetime.datetime.utcnow() +
-            #                                 datetime.timedelta(seconds=lifetime))
-            #     finally:
-            #         self.console._lock.release()
-
-            # if self.waiting:
-            #     self.cv.acquire()
-            #     try:
-            #         self.data.append(sp)
-            #         # if was empty, notify waiters
-            #         if len(self._data) == 1:
-            #             self._cv.notify()
-            #     finally:
-            #         self._cv.release()
-            # else:
-            #     if opcode == OpCode.subscribe_rsp:
-            #         wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
-            #                       self.context, sp)
-            #     else:
-            #         wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
-            #                       self.context, sp)
-            #     self.console._work_q.put(wi)
-            #     self.console._work_q_put = True
-            #     if error:
-            #         self.destroy()
-
+        # else: data indication
         agent_name = msg.properties.get("qmf.agent")
         if not agent_name:
             logging.warning("Ignoring data_ind - no agent name given: %s" %
@@ -472,6 +503,72 @@
 
 
 
+class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
+    """
+    A Mailbox for a single subscription.  Allows only asychronous "subscribe"
+    and "refresh" requests.
+    """
+    def __init__(self, console, context, agent, duration, interval):
+        """
+        Invoked by application thread.
+        """
+        super(_AsyncSubscriptionMailbox, self).__init__(console, context,
+                                                        agent, duration,
+                                                        interval)
+        self.subscribe_pending = False
+        self.resubscribe_pending = False
+
+
+    def subscribe(self, query, reply_timeout):
+        if super(_AsyncSubscriptionMailbox, self).subscribe(query):
+            self.subscribe_pending = True
+            self.reset_timeout(reply_timeout)
+            return True
+        return False
+
+    def resubscribe(self, duration, reply_timeout):
+        if super(_AsyncSubscriptionMailbox, self).resubscribe(duration):
+            self.resubscribe_pending = True
+            self.reset_timeout(reply_timeout)
+            return True
+        return False
+
+    def deliver(self, msg):
+        """
+        """
+        super(_AsyncSubscriptionMailbox, self).deliver(msg)
+        sp = self.fetch(0)
+        if sp:
+            # if the message was a reply to a subscribe or
+            # re-subscribe, then we get here.
+            if self.subscribe_pending:
+                wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+                              self.context, sp)
+            else:
+                wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+                              self.context, sp)
+
+            self.subscribe_pending = False
+            self.resubscribe_pending = False
+
+            self.console._work_q.put(wi)
+            self.console._work_q_put = True
+
+            if not sp.succeeded():
+                self.destroy()
+
+
+    def expire(self):
+        """ Either the subscription expired, or a request timedout.
+        """
+        if self.subscribe_pending:
+            wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+                          self.context, None)
+        elif self.resubscribe_pending:
+            wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+                          self.context, None)
+        self.destroy()
+
 
 ##==============================================================================
 ## DATA MODEL
@@ -1264,111 +1361,73 @@
 
     def create_subscription(self, agent, query, console_handle,
                             _interval=None, _duration=None,
-                            _reply_handle=None, _timeout=None):
+                            _blocking=True, _timeout=None):
         if not _duration:
             _duration = self._subscribe_timeout
 
-        if _reply_handle is not None:
-            assert(False)  # async TBD
-        else:
-            mbox = _SubscriptionMailbox(self, _duration, console_handle, agent)
-
-        cid = mbox.get_address()
-
-        try:
-            logging.debug("Sending Subscribe to Agent (%s)" % time.time())
-            agent._send_subscribe_req(query, cid, _interval, _duration)
-        except SendError, e:
-            logging.error(str(e))
-            mbox.destroy()
-            return None
+        if _timeout is None:
+            _timeout = self._reply_timeout
 
-        if _reply_handle is not None:
+        if not _blocking:
+            mbox = _AsyncSubscriptionMailbox(self, console_handle, agent,
+                                             _duration, _interval)
+            if not mbox.subscribe(query, _timeout):
+                mbox.destroy()
+                return False
             return True
+        else:
+            mbox = _SubscriptionMailbox(self, console_handle, agent, _duration,
+                                        _interval)
 
-        # wait for reply
-        if _timeout is None:
-            _timeout = self._reply_timeout
+            if not mbox.subscribe(query):
+                mbox.destroy()
+                return None
 
-        logging.debug("Waiting for response to subscription (%s)" % _timeout)
-        # @todo: what if mbox expires here?
-        replyMsg = mbox.fetch(_timeout)
+            logging.debug("Waiting for response to subscription (%s)" % _timeout)
+            # @todo: what if mbox expires here?
+            sp = mbox.fetch(_timeout)
 
-        if not replyMsg:
-            logging.debug("Subscription request wait timed-out.")
-            mbox.destroy()
-            return None
+            if not sp:
+                logging.debug("Subscription request wait timed-out.")
+                mbox.destroy()
+                return None
 
-        error = replyMsg.content.get("_error")
-        if error:
-            mbox.destroy()
-            try:
-                e_map = QmfData.from_map(error)
-            except TypeError:
-                e_map = QmfData.create({"error":"Unknown error"})
-            return SubscribeParams(None, None, None, e_map)
-
-        mbox.agent_subscription_id = replyMsg.content.get("_subscription_id")
-        return SubscribeParams(mbox.get_address(),
-                               replyMsg.content.get("_interval"),
-                               replyMsg.content.get("_duration"),
-                               None)
+            if not sp.succeeded():
+                mbox.destroy()
+
+            return sp
 
     def refresh_subscription(self, subscription_id,
                              _duration=None,
-                             _reply_handle=None, _timeout=None):
-        if _reply_handle is not None:
-            assert(False)  # async TBD
+                             _timeout=None):
+        if _timeout is None:
+            _timeout = self._reply_timeout
 
         mbox = self._get_mailbox(subscription_id)
         if not mbox:
             logging.warning("Subscription %s not found." % subscription_id)
             return None
 
-        agent = self.get_agent(mbox.agent_name)
-        if not agent:
-            logging.warning("Subscription %s agent %s not found." %
-                            (mbox.agent_name, subscription_id))
-            return None
-
-        try:
-            logging.debug("Sending Subscribe to Agent (%s)" % time.time())
-            agent._send_resubscribe_req(subscription_id,
-                                        mbox.agent_subscription_id,
-                                        _duration)
-        except SendError, e:
-            logging.error(str(e))
-            # @todo ???? mbox.destroy()
-            return None
+        if isinstance(mbox, _AsyncSubscriptionMailbox):
+            return mbox.resubscribe(_duration, _timeout)
+        else:
+            # synchronous - wait for reply
+            if not mbox.resubscribe(_duration):
+                # @todo ???? mbox.destroy()
+                return None
 
-        if _reply_handle is not None:
-            return True
+            # wait for reply
 
-        # wait for reply
-        if _timeout is None:
-            _timeout = self._reply_timeout
+            logging.debug("Waiting for response to subscription (%s)" % _timeout)
+            sp = mbox.fetch(_timeout)
 
-        logging.debug("Waiting for response to subscription (%s)" % _timeout)
-        replyMsg = mbox.fetch(_timeout)
+            if not sp:
+                logging.debug("re-subscribe request wait timed-out.")
+                # @todo???? mbox.destroy()
+                return None
 
-        if not replyMsg:
-            logging.debug("Subscription request wait timed-out.")
-            # @todo???? mbox.destroy()
-            return None
+            return sp
 
-        error = replyMsg.content.get("_error")
-        if error:
-            # @todo mbox.destroy()
-            try:
-                e_map = QmfData.from_map(error)
-            except TypeError:
-                e_map = QmfData.create({"error":"Unknown error"})
-            return SubscribeParams(None, None, None, e_map)
-
-        return SubscribeParams(mbox.get_address(),
-                               replyMsg.content.get("_interval"),
-                               replyMsg.content.get("_duration"),
-                               None)
 
     def cancel_subscription(self, subscription_id):
         """

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=916462&r1=916461&r2=916462&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Thu Feb 25 21:16:47 2010
@@ -582,3 +582,227 @@
         self.console.destroy(10)
 
 
+    def test_async_by_obj_id_schema(self):
+        # create console
+        # find one agent
+        # async subscribe to changes to any object in package1/class1
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # query to match object "p2c1_key2" in schema package2/class1
+        sid = SchemaClassId.create("package2", "class1")
+        query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        rc = self.console.create_subscription(agent,
+                                              query,
+                                              "my-handle",
+                                              _blocking=False)
+        self.assertTrue(rc)
+
+        r_count = 0
+        sp = None
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+                    self.assertTrue(wi.get_handle() == "my-handle")
+                    sp = wi.get_params()
+                    self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+                    self.assertTrue(sp.succeeded())
+                    self.assertTrue(sp.get_error() == None)
+                else:
+                    self.assertTrue(wi.get_type() ==
+                                    WorkItem.SUBSCRIBE_INDICATION)
+                    # sp better be set up by now!
+                    self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 1)
+                    self.assertTrue(isinstance(reply[0], QmfData))
+                    self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                    sid = reply[0].get_schema_class_id()
+                    self.assertTrue(isinstance(sid, SchemaClassId))
+                    self.assertTrue(sid.get_package_name() == "package2")
+                    self.assertTrue(sid.get_class_name() == "class1")
+                    self.assertTrue(wi.get_handle() == "my-handle")
+
+                self.console.release_workitem(wi)
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # for now, I expect 5 publish per subscription
+        self.assertTrue(r_count == 6)
+
+        self.console.destroy(10)
+
+    def test_async_refresh(self):
+        # create console
+        # find one agent
+        # async subscribe to changes to any object in package1/class1
+        # refresh after third data indication
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # query to match object "p2c1_key2" in schema package2/class1
+        sid = SchemaClassId.create("package2", "class1")
+        query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        rc = self.console.create_subscription(agent,
+                                              query,
+                                              "my-handle",
+                                              _blocking=False)
+        self.assertTrue(rc)
+
+        # refresh after three subscribe indications, count all
+        # indications to verify refresh worked
+        r_count = 0
+        sp = None
+        rp = None
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+                    self.assertTrue(wi.get_handle() == "my-handle")
+                    sp = wi.get_params()
+                    self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+                    self.assertTrue(sp.succeeded())
+                    self.assertTrue(sp.get_error() == None)
+                elif wi.get_type() == WorkItem.RESUBSCRIBE_RESPONSE:
+                    self.assertTrue(wi.get_handle() == "my-handle")
+                    rp = wi.get_params()
+                    self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+                    self.assertTrue(rp.succeeded())
+                    self.assertTrue(rp.get_error() == None)
+                else:
+                    self.assertTrue(wi.get_type() ==
+                                    WorkItem.SUBSCRIBE_INDICATION)
+                    # sp better be set up by now!
+                    self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 1)
+                    self.assertTrue(isinstance(reply[0], QmfData))
+                    self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                    sid = reply[0].get_schema_class_id()
+                    self.assertTrue(isinstance(sid, SchemaClassId))
+                    self.assertTrue(sid.get_package_name() == "package2")
+                    self.assertTrue(sid.get_class_name() == "class1")
+                    self.assertTrue(wi.get_handle() == "my-handle")
+
+                    if r_count == 4:  # + 1 for subscribe reply
+                        rp = self.console.refresh_subscription(sp.get_subscription_id())
+                        self.assertTrue(rp)
+
+                self.console.release_workitem(wi)
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # for now, I expect 5 publish per subscription, + 2 replys
+        self.assertTrue(r_count > 7)
+
+        self.console.destroy(10)
+
+
+    def test_async_cancel(self):
+        # create console
+        # find one agent
+        # async subscribe to changes to any object in package1/class1
+        # cancel after first data indication
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # query to match object "p2c1_key2" in schema package2/class1
+        sid = SchemaClassId.create("package2", "class1")
+        query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        rc = self.console.create_subscription(agent,
+                                              query,
+                                              "my-handle",
+                                              _blocking=False)
+        self.assertTrue(rc)
+
+        # refresh after three subscribe indications, count all
+        # indications to verify refresh worked
+        r_count = 0
+        sp = None
+        rp = None
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                if wi.get_type() == WorkItem.SUBSCRIBE_RESPONSE:
+                    self.assertTrue(wi.get_handle() == "my-handle")
+                    sp = wi.get_params()
+                    self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+                    self.assertTrue(sp.succeeded())
+                    self.assertTrue(sp.get_error() == None)
+                else:
+                    self.assertTrue(wi.get_type() ==
+                                    WorkItem.SUBSCRIBE_INDICATION)
+                    # sp better be set up by now!
+                    self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+                    reply = wi.get_params()
+                    self.assertTrue(isinstance(reply, type([])))
+                    self.assertTrue(len(reply) == 1)
+                    self.assertTrue(isinstance(reply[0], QmfData))
+                    self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                    sid = reply[0].get_schema_class_id()
+                    self.assertTrue(isinstance(sid, SchemaClassId))
+                    self.assertTrue(sid.get_package_name() == "package2")
+                    self.assertTrue(sid.get_class_name() == "class1")
+                    self.assertTrue(wi.get_handle() == "my-handle")
+
+                    self.console.cancel_subscription(sp.get_subscription_id())
+
+                self.console.release_workitem(wi)
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # for now, I expect 1 subscribe reply and 1 data_indication
+        self.assertTrue(r_count == 2)
+
+        self.console.destroy(10)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org