You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/24 22:05:03 UTC

svn commit: r915980 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: agent.py console.py tests/subscriptions.py

Author: kgiusti
Date: Wed Feb 24 21:05:03 2010
New Revision: 915980

URL: http://svn.apache.org/viewvc?rev=915980&view=rev
Log:
QPID-2261: test subscription refresh and cancel

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=915980&r1=915979&r2=915980&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Wed Feb 24 21:05:03 2010
@@ -894,7 +894,7 @@
         if props.get("method") == "request":
             sid = cmap.get("_subscription_id")
             if not sid:
-                logging.debug("Invalid subscription refresh msg: %s" %
+                logging.error("Invalid subscription refresh msg: %s" %
                               str(msg))
                 return
 
@@ -902,7 +902,7 @@
             try:
                 ss = self._subscriptions.get(sid)
                 if not ss:
-                    logging.debug("Ignoring unknown subscription: %s" %
+                    logging.error("Ignoring unknown subscription: %s" %
                                   str(sid))
                     return
                 duration = cmap.get("_duration")
@@ -914,15 +914,29 @@
                         elif duration < self._min_duration:
                             duration = self._min_duration
                     except:
-                        logging.debug("Bad duration value: %s" % str(msg))
+                        logging.error("Bad duration value: %s" % str(msg))
                         duration = None  # use existing duration
 
                 ss.resubscribe(datetime.datetime.utcnow(), duration)
 
+                new_duration = ss.duration
+                new_interval = ss.interval
+
             finally:
                 self._lock.release()
 
 
+            sr_map = {"_subscription_id": sid,
+                      "_interval": new_interval,
+                      "_duration": new_duration}
+            m = Message(id=QMF_APP_ID,
+                        properties={"method":"response",
+                                   "qmf.opcode":OpCode.subscribe_refresh_rsp},
+                        correlation_id = msg.correlation_id,
+                        content=sr_map)
+            self._send_reply(m, msg.reply_to)
+
+
     def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
         """
         Process received Cancel Subscription Request

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=915980&r1=915979&r2=915980&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Wed Feb 24 21:05:03 2010
@@ -1375,7 +1375,7 @@
         """
         mbox = self._get_mailbox(subscription_id)
         if not mbox:
-            return None
+            return
 
         agent = self.get_agent(mbox.agent_name)
         if agent:
@@ -1603,6 +1603,8 @@
             self._handle_response_msg(msg, cmap, version, _direct)
         elif opcode == OpCode.subscribe_rsp:
             self._handle_response_msg(msg, cmap, version, _direct)
+        elif opcode == OpCode.subscribe_refresh_rsp:
+            self._handle_response_msg(msg, cmap, version, _direct)
         elif opcode == OpCode.method_rsp:
             self._handle_response_msg(msg, cmap, version, _direct)
         elif opcode == OpCode.data_ind:

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=915980&r1=915979&r2=915980&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Wed Feb 24 21:05:03 2010
@@ -262,9 +262,9 @@
             subscriptions.append([sp, 0])
             index += 1
 
-        # now wait for the duration + interval + 1 and count the updates
+        # now wait for (2 * interval) and count the updates
         r_count = 0
-        while self.notifier.wait_for_work(10 + 2 + 1):
+        while self.notifier.wait_for_work(4):
             wi = self.console.get_next_workitem(timeout=0)
             while wi is not None:
                 r_count += 1
@@ -336,10 +336,10 @@
             subscriptions.append([sp, 0])
             index += 1
 
-        # now wait for all subscriptions to expire (duration + interval + 1 for
-        # luck)
+        # now wait for all subscriptions to expire (2x interval w/o
+        # indications)
         r_count = 0
-        while self.notifier.wait_for_work(10 + 2 + 1):
+        while self.notifier.wait_for_work(4):
             wi = self.console.get_next_workitem(timeout=0)
             while wi is not None:
                 r_count += 1
@@ -407,10 +407,10 @@
             subscriptions.append([sp, 0])
             index += 1
 
-        # now wait for all subscriptions to expire (duration + interval + 1 for
-        # luck)
+        # now wait for all subscriptions to expire (2x interval w/o
+        # indications)
         r_count = 0
-        while self.notifier.wait_for_work(10 + 2 + 1):
+        while self.notifier.wait_for_work(4):
             wi = self.console.get_next_workitem(timeout=0)
             while wi is not None:
                 r_count += 1
@@ -439,7 +439,145 @@
         #for ii in range(len(subscriptions)):
         #    self.assertTrue(subscriptions[ii][1] == 5)
 
+        self.console.destroy(10)
+
+
+
+    def test_sync_refresh(self):
+        # create console
+        # find one agent
+        # subscribe to changes to object "p2c1_key2" in package2/class1
+        # after 3 data indications, refresh
+        # verify > 5 data indications received in total
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # query to match object "p2c1_key2" in schema package2/class1
+        sid = SchemaClassId.create("package2", "class1")
+        query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        sp = self.console.create_subscription(agent,
+                                              query,
+                                              "my-handle")
+        self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+        self.assertTrue(sp.succeeded())
+        self.assertTrue(sp.get_error() == None)
+
+        # refresh after three subscribe indications, count all
+        # indications to verify refresh worked
+        r_count = 0
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+                reply = wi.get_params()
+                self.assertTrue(isinstance(reply, type([])))
+                self.assertTrue(len(reply) == 1)
+                self.assertTrue(isinstance(reply[0], QmfData))
+                self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                sid = reply[0].get_schema_class_id()
+                self.assertTrue(isinstance(sid, SchemaClassId))
+                self.assertTrue(sid.get_package_name() == "package2")
+                self.assertTrue(sid.get_class_name() == "class1")
+                self.assertTrue(wi.get_handle() == "my-handle")
+
+                self.console.release_workitem(wi)
+
+                if r_count == 3:
+                    rp = self.console.refresh_subscription(sp.get_subscription_id())
+                    self.assertTrue(isinstance(rp, qmf2.console.SubscribeParams))
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # for now, I expect 5 publishes per subscription; a refresh should yield more
+        self.assertTrue(r_count > 5)
+        # print("!!! total r_count=%d" % r_count)
+        #for ii in range(len(subscriptions)):
+        #    self.assertTrue(subscriptions[ii][1] == 5)
 
+        self.console.destroy(10)
+
+
+
+    def test_sync_cancel(self):
+        # create console
+        # find one agent
+        # subscribe to changes to object "p2c1_key2" in package2/class1
+        # after 3 data indications, cancel subscription
+        # verify < 5 data indications received
+        self.notifier = _testNotifier()
+        self.console = qmf2.console.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.add_connection(self.conn)
+
+        # query to match object "p2c1_key2" in schema package2/class1
+        sid = SchemaClassId.create("package2", "class1")
+        query = QmfQuery.create_id_object("p2c1_key2", sid)
+
+        agent_app = self.agents[0]
+        aname = agent_app.agent.get_name()
+        agent = self.console.find_agent(aname, timeout=3)
+        self.assertTrue(agent and agent.get_name() == aname)
+
+        # setup subscription on agent
+
+        sp = self.console.create_subscription(agent,
+                                              query,
+                                              "my-handle")
+        self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+        self.assertTrue(sp.succeeded())
+        self.assertTrue(sp.get_error() == None)
+
+        # cancel after three subscribe indications, count all
+        # indications to verify the cancel worked
+        r_count = 0
+        while self.notifier.wait_for_work(4):
+            wi = self.console.get_next_workitem(timeout=0)
+            while wi is not None:
+                r_count += 1
+                self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+                reply = wi.get_params()
+                self.assertTrue(isinstance(reply, type([])))
+                self.assertTrue(len(reply) == 1)
+                self.assertTrue(isinstance(reply[0], QmfData))
+                self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+                sid = reply[0].get_schema_class_id()
+                self.assertTrue(isinstance(sid, SchemaClassId))
+                self.assertTrue(sid.get_package_name() == "package2")
+                self.assertTrue(sid.get_class_name() == "class1")
+                self.assertTrue(wi.get_handle() == "my-handle")
+
+                self.console.release_workitem(wi)
+
+                if r_count == 3:
+                    self.console.cancel_subscription(sp.get_subscription_id())
+
+                wi = self.console.get_next_workitem(timeout=0)
+
+        # for now, I expect 5 publishes per subscription over its full duration
+        self.assertTrue(r_count < 5)
+        #for ii in range(len(subscriptions)):
+        #    self.assertTrue(subscriptions[ii][1] == 5)
 
         self.console.destroy(10)
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org