You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/05/05 23:41:40 UTC

svn commit: r1099979 - in /qpid/trunk/qpid/cpp/src/tests: cluster_tests.py queue_flow_limit_tests.py

Author: kgiusti
Date: Thu May  5 21:41:40 2011
New Revision: 1099979

URL: http://svn.apache.org/viewvc?rev=1099979&view=rev
Log:
QPID-3244: unit test for the fix.

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1099979&r1=1099978&r2=1099979&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu May  5 21:41:40 2011
@@ -532,6 +532,54 @@ acl allow all all
         assert not sender.isAlive()
         assert sender.done
 
+    def test_blocked_queue_delete(self):
+        """Verify that producers which are blocked on a queue due to flow
+        control are unblocked when that queue is deleted.
+        """
+
+        cluster = self.cluster(2)
+        cluster[0].startQmf()
+        cluster[1].startQmf()
+
+        # configure a queue with a specific flow limit on first broker
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        oid = q1.getObjectId()
+        self.assertEqual(q1.name, "flq")
+        self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert not q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 0)
+
+        # fill the queue on one broker until flow control is active
+        for x in range(5): s0.send(Message(str(x)))
+        sender = ShortTests.BlockedSend(s0, Message(str(6)))
+        sender.start()                  # Tests that sender does block
+        # Verify the broker queue goes into a flowStopped state
+        deadline = time.time() + 1
+        while not q1.flowStopped and time.time() < deadline: q1.update()
+        assert q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 1)
+        sender.assert_blocked()         # Still blocked
+
+        # Now verify the  both brokers in cluster have same configuration
+        qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+        self.assertEqual(len(qs), 1)
+        q2 = qs[0]
+        self.assertEqual(q2.name, "flq")
+        self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert q2.flowStopped
+        self.assertEqual(q2.flowStoppedCount, 1)
+
+        # now delete the blocked queue from other broker
+        ssn1 = cluster[1].connect().session()
+        self.evaluate_address(ssn1, "flq;{delete:always}")
+        sender.wait()                   # Verify no longer blocked.
+
+        ssn0.connection.close()
+        ssn1.connection.close()
+        cluster_test_logs.verify_logs()
+
 
     def test_alternate_exchange_update(self):
         """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """

Modified: qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1099979&r1=1099978&r2=1099979&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py Thu May  5 21:41:40 2011
@@ -325,5 +325,47 @@ class QueueFlowLimitTests(TestBase010):
         self.verify_limit(TestQ(oid))
 
 
+    def test_blocked_queue_delete(self):
+        """ Verify that blocked senders are unblocked when a queue that is flow
+        controlled is deleted.
+        """
+
+        class BlockedSender(Thread):
+            def __init__(self, tester, queue, count, capacity=10):
+                self.tester = tester
+                self.queue = queue
+                self.count = count
+                self.capacity = capacity
+                Thread.__init__(self)
+                self.done = False
+                self.start()
+            def run(self):
+                # spawn qpid-send
+                p = self.tester._start_qpid_send(self.queue,
+                                                 self.count,
+                                                 self.capacity)
+                p.close()  # waits for qpid-send to complete
+                self.done = True
+
+        self.startQmf();
+        oid = self._create_queue("kill-q", stop_size=10, resume_size=2)
+        q = self.qmf.getObjects(_objectId=oid)[0]
+        self.failIf(q.flowStopped)
+
+        sender = BlockedSender(self, "kill-q", count=100)
+        # wait for flow control
+        deadline = time() + 10
+        while (not q.flowStopped) and time() < deadline:
+            q.update()
+
+        self.failUnless(q.flowStopped)
+        self.failIf(sender.done)   # sender blocked
+
+        self._delete_queue("kill-q")
+        sender.join(5)
+        self.failIf(sender.isAlive())
+        self.failUnless(sender.done)
+
+
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org