You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/08/02 14:10:52 UTC

svn commit: r981474 - in /qpid/trunk/qpid/python/qpid: messaging/endpoints.py tests/messaging/__init__.py tests/messaging/endpoints.py

Author: rhs
Date: Mon Aug  2 12:10:52 2010
New Revision: 981474

URL: http://svn.apache.org/viewvc?rev=981474&view=rev
Log:
fixed bug in flow control logic; added tests

Modified:
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=981474&r1=981473&r2=981474&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Mon Aug  2 12:10:52 2010
@@ -1009,8 +1009,9 @@ class Receiver(Endpoint, object):
       if msg is None:
         raise Empty()
     elif self._capacity not in (0, UNLIMITED.value):
-      if self.received - self.returned <= int(ceil(self.threshold * self._capacity)):
-        self.granted = self.received + self._capacity
+      t = int(ceil(self.threshold * self._capacity))
+      if self.received - self.returned <= t:
+        self.granted = self.returned + self._capacity
         self._wakeup()
     return msg
 
@@ -1018,7 +1019,7 @@ class Receiver(Endpoint, object):
     if self._capacity == UNLIMITED.value:
       self.granted = UNLIMITED
     else:
-      self.granted = self.received + self._capacity
+      self.granted = self.returned + self._capacity
 
   @synchronized
   def close(self, timeout=None):

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=981474&r1=981473&r2=981474&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Mon Aug  2 12:10:52 2010
@@ -18,6 +18,7 @@
 #
 
 import time
+from math import ceil
 from qpid.harness import Skipped
 from qpid.messaging import *
 from qpid.tests import Test
@@ -134,9 +135,23 @@ class Base(Test):
     contents = self.drain(rcv)
     assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
 
-  def assertAvailable(self, rcv, expected):
+  def assertAvailable(self, rcv, expected=None, lower=None, upper=None):
+    if expected is not None:
+      if lower is not None or upper is not None:
+        raise ValueError("cannot specify lower or upper when specifying expected")
+      lower = expected
+      upper = expected
+    else:
+      if lower is None:
+        lower = int(ceil(rcv.threshold*rcv.capacity))
+      if upper is None:
+        upper = rcv.capacity
+
     p = rcv.available()
-    assert p == expected, "expected %s, got %s" % (expected, p)
+    if upper == lower:
+      assert p == lower, "expected %s, got %s" % (lower, p)
+    else:
+      assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper)
 
   def sleep(self):
     time.sleep(self.delay())

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=981474&r1=981473&r2=981474&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Mon Aug  2 12:10:52 2010
@@ -659,9 +659,9 @@ class ReceiverTests(Base):
   def setup_receiver(self):
     return self.ssn.receiver(RECEIVER_Q)
 
-  def send(self, base, count = None):
+  def send(self, base, count = None, sync=True):
     content = self.content(base, count)
-    self.snd.send(content)
+    self.snd.send(content, sync=sync)
     return content
 
   def testFetch(self):
@@ -762,25 +762,51 @@ class ReceiverTests(Base):
 
     self.ssn.acknowledge()
 
-  def testCapacity(self):
-    self.rcv.capacity = 5
+  def capacityTest(self, capacity, threshold=None):
+    if threshold is not None:
+      self.rcv.threshold = threshold
+    self.rcv.capacity = capacity
     self.assertAvailable(self.rcv, 0)
 
-    for i in range(15):
-      self.send("testCapacity", i)
+    for i in range(2*capacity):
+      self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
+    self.snd.sync()
     self.sleep()
-    self.assertAvailable(self.rcv, 5)
+    self.assertAvailable(self.rcv)
 
-    self.drain(self.rcv, limit = 5)
+    first = capacity/2
+    second = capacity - first
+    self.drain(self.rcv, limit = first)
+    self.sleep()
+    self.assertAvailable(self.rcv)
+    self.drain(self.rcv, limit = second)
     self.sleep()
-    self.assertAvailable(self.rcv, 5)
+    self.assertAvailable(self.rcv)
 
     drained = self.drain(self.rcv)
-    assert len(drained) == 10, "%s, %s" % (len(drained), drained)
+    assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
     self.assertAvailable(self.rcv, 0)
 
     self.ssn.acknowledge()
 
+  def testCapacity5(self):
+    self.capacityTest(5)
+
+  def testCapacity5Threshold1(self):
+    self.capacityTest(5, 1)
+
+  def testCapacity10(self):
+    self.capacityTest(10)
+
+  def testCapacity10Threshold1(self):
+    self.capacityTest(10, 1)
+
+  def testCapacity100(self):
+    self.capacityTest(100)
+
+  def testCapacity100Threshold1(self):
+    self.capacityTest(100, 1)
+
   def testCapacityUNLIMITED(self):
     self.rcv.capacity = UNLIMITED
     self.assertAvailable(self.rcv, 0)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org