You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/18 22:17:07 UTC

svn commit: r1072151 - /qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py

Author: aconway
Date: Fri Feb 18 21:17:00 2011
New Revision: 1072151

URL: http://svn.apache.org/viewvc?rev=1072151&view=rev
Log:
QPID-2935: Cluster flow control tests.

The tests check if send operations block when expected due to flow
control and unblock as expected when messages are drained.

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py?rev=1072151&r1=1072150&r2=1072151&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py Fri Feb 18 21:17:00 2011
@@ -23,7 +23,7 @@ from qpid import datatypes, messaging
 from brokertest import *
 from qpid.harness import Skipped
 from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from threading import Thread, Lock, Condition
 from logging import getLogger
 from itertools import chain
 from tempfile import NamedTemporaryFile
@@ -304,179 +304,112 @@ acl allow all all
         # Verify logs are consistent
         cluster_test_logs.verify_logs()
 
-    def test_queue_flowlimit(self):
+    class BlockedSend(Thread):
+        """Send a message where the send is expected to block.
+        Verify that it does block (for a given timeout), then allow
+        waiting until it unblocks when it is expected to do so."""
+        def __init__(self, sender, msg):
+            self.sender, self.msg = sender, msg
+            self.blocked = True
+            self.condition = Condition()
+            self.timeout = 0.1    # Time to wait for expected results.
+            Thread.__init__(self)
+        def run(self):
+            try:
+                self.sender.send(self.msg)
+                self.condition.acquire()
+                try:
+                    self.blocked = False
+                    self.condition.notify()
+                finally: self.condition.release()
+            except Exception,e: print "BlockedSend exception: %s"%e
+        def start(self):
+            Thread.start(self)
+            time.sleep(self.timeout)
+            assert self.blocked         # Expected to block
+        def assert_blocked(self): assert self.blocked
+        def wait(self):                 # Now expecting to unblock
+            self.condition.acquire()
+            try:
+                while self.blocked:
+                    self.condition.wait(self.timeout)
+                    if self.blocked: raise Exception("Timed out waiting for send to unblock")
+            finally: self.condition.release()
+            self.join()
+
+    def queue_flowlimit_test(self, brokers):
         """Verify that the queue's flowlimit configuration and state are
         correctly replicated.
+        The brokers argument allows this test to run on a single broker,
+        a cluster of 2 pre-started brokers, or a cluster where the second
+        broker starts after the queue is in flow control.
         """
-        return;  # @todo enable once flow control works in clusters
-        # start a cluster of two brokers
-        args = ["--log-enable=info+:broker"]
-        cluster = self.cluster(2, args)
-
-        # configure a queue with a specific flow limit on broker 0
-        ssn0 = cluster[0].connect().session()
-        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
-        cluster[0].startQmf()
-        for q in cluster[0].qmf_session.getObjects(_class="queue"):
-            if q.name == "flq":
-                oid = q.getObjectId()
-                break
-        self.assertEqual(q.name, "flq")
-        self.assertEqual(q.flowStopCount, 5)
-        self.assertEqual(q.flowResumeCount, 3)
-        self.assertFalse(q.flowStopped)
-
-        # verify both brokers in cluster have same configuration
-        cluster[1].startQmf()
-        qs = cluster[1].qmf_session.getObjects(_objectId=oid)
-        self.assertEqual(len(qs), 1)
-        q = qs[0]
+        # configure a queue with a specific flow limit on first broker
+        ssn0 = brokers.first().connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        brokers.first().startQmf()
+        q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        oid = q.getObjectId()
         self.assertEqual(q.name, "flq")
-        self.assertEqual(q.flowStopCount, 5)
-        self.assertEqual(q.flowResumeCount, 3)
-        self.assertFalse(q.flowStopped)
+        self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert not q.flowStopped
 
         # fill the queue on one broker until flow control is active
-        class BlockedSender(Thread):
-            def __init__(self): Thread.__init__(self)
-            def run(self):
-                for x in range(6):
-                    s0.send(Message(str(x)))
-
-        sender = BlockedSender()
-        sender.start()
-
-        start = time.time()
-        while time.time() < start + 5:
-            q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
-            if q.flowStopped:
-                break;
-        self.assertTrue(q.flowStopped)
-
-        # verify flow control is active on other broker.
-        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
-        self.assertTrue(q.flowStopped)
-
-        # now drain the queue using a session to the other broker
-        ssn1 = cluster[1].connect().session()
-        r1 = ssn1.receiver("flq", capacity=6)
-        try:
-            while r1.fetch(timeout=0):
-                ssn1.acknowledge()
-        except Empty:
-            pass
-        sender.join()
-
-        # and verify both brokers see an unblocked queue
-        q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
-        self.assertFalse(q.flowStopped)
-        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
-        self.assertFalse(q.flowStopped)
-
-        ssn0.connection.close()
-        ssn1.connection.close()
-        cluster_test_logs.verify_logs()
-
-
-    def test_queue_flowlimit_join(self):
-        """Verify that the queue's flowlimit configuration and state are
-        correctly replicated to a newly joined broker.
-        """
-        return;  # @todo enable once flow control works in clusters
-        # start a cluster of two brokers
-        #args = ["--log-enable=info+:broker"]
-        args = ["--log-enable=debug"]
-        cluster = self.cluster(2, args)
-
-        # configure a queue with a specific flow limit on broker 0
-        ssn0 = cluster[0].connect().session()
-        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.max-count':99, 'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
-        cluster[0].startQmf()
-        for q in cluster[0].qmf_session.getObjects(_class="queue"):
-            if q.name == "flq":
-                oid = q.getObjectId()
-                break
-        self.assertEqual(q.name, "flq")
-        self.assertEqual(q.flowStopCount, 5)
-        self.assertEqual(q.flowResumeCount, 3)
-        self.assertFalse(q.flowStopped)
-
-        # verify both brokers in cluster have same configuration
-        cluster[1].startQmf()
-        qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+        for x in range(5): s0.send(Message(str(x)))
+        sender = ShortTests.BlockedSend(s0, Message(str(6)))
+        sender.start()                  # Tests that sender does block
+        # Verify the broker queue goes into a flowStopped state
+        deadline = time.time() + 1
+        while not q.flowStopped and time.time() < deadline: q.update()
+        assert q.flowStopped
+        sender.assert_blocked()         # Still blocked
+
+        # Now verify that both brokers in the cluster have the same configuration
+        brokers.second().startQmf()
+        qs = brokers.second().qmf_session.getObjects(_objectId=oid)
         self.assertEqual(len(qs), 1)
         q = qs[0]
         self.assertEqual(q.name, "flq")
-        self.assertEqual(q.flowStopCount, 5)
-        self.assertEqual(q.flowResumeCount, 3)
-        self.assertFalse(q.flowStopped)
-
-        # fill the queue on one broker until flow control is active
-        class BlockedSender(Thread):
-            def __init__(self): Thread.__init__(self)
-            def run(self):
-                for x in range(6):
-                    s0.send(Message(str(x)))
-
-        sender = BlockedSender()
-        sender.start()
-
-        start = time.time()
-        while time.time() < start + 5:
-            q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
-            if q.flowStopped:
-                break;
-        self.assertTrue(q.flowStopped)
-
-        # verify flow control is active on other broker.
-        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
-        self.assertTrue(q.flowStopped)
-
-        # add a new broker to the cluster
-        print("Start")
-        cluster.start()
-        print("Start Done")
-        
-        # todo: enable verification:
-        # cluster[2].startQmf()
-        # qs = cluster[2].qmf_session.getObjects(_objectId=oid)
-        # self.assertEqual(len(qs), 1)
-        # q = qs[0]
-        # self.assertEqual(q.name, "flq")
-        # self.assertEqual(q.flowStopCount, 5)
-        # self.assertEqual(q.flowResumeCount, 3)
-        # self.assertEqual(q.msgDepth, 5)
-        # self.assertFalse(q.flowStopped)
-        # q = cluster[2].qmf_session.getObjects(_objectId=oid)[0]
-        # self.assertTrue(q.flowStopped)
-
-        # verify new member's queue config
-        # verify new member's queue flow setting
-
-
-
+        self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert q.flowStopped
 
         # now drain the queue using a session to the other broker
-        ssn1 = cluster[1].connect().session()
+        ssn1 = brokers.second().connect().session()
         r1 = ssn1.receiver("flq", capacity=6)
-        try:
-            while r1.fetch(timeout=1):
-                ssn1.acknowledge()
-        except Empty:
-            pass
-        sender.join()
-
-        # and verify both brokers see an unblocked queue
-        q = cluster[0].qmf_session.getObjects(_objectId=oid)[0]
-        self.assertFalse(q.flowStopped)
-        q = cluster[1].qmf_session.getObjects(_objectId=oid)[0]
-        self.assertFalse(q.flowStopped)
+        for x in range(4):
+            r1.fetch(timeout=0)
+            ssn1.acknowledge()
+        sender.wait()                   # Verify no longer blocked.
 
         ssn0.connection.close()
         ssn1.connection.close()
         cluster_test_logs.verify_logs()
 
+    def test_queue_flowlimit(self):
+        """Test flow limits on a standalone broker"""
+        broker = self.broker()
+        class Brokers:
+            def first(self): return broker
+            def second(self): return broker
+        self.queue_flowlimit_test(Brokers())
 
+    def test_queue_flowlimit_cluster(self):
+        return          # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+        cluster = self.cluster(2)
+        class Brokers:
+            def first(self): return cluster[0]
+            def second(self): return cluster[1]
+        self.queue_flowlimit_test(Brokers())
+
+    def test_queue_flowlimit_cluster_join(self):
+        return          # TODO aconway 2011-02-18: disabled till fixed, QPID-2935
+        cluster = self.cluster(1)
+        class Brokers:
+            def first(self): return cluster[0]
+            def second(self):
+                if len(cluster) == 1: cluster.start()
+                return cluster[1]
+        self.queue_flowlimit_test(Brokers())
 
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org