You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/08/02 14:10:52 UTC
svn commit: r981474 - in /qpid/trunk/qpid/python/qpid:
messaging/endpoints.py tests/messaging/__init__.py
tests/messaging/endpoints.py
Author: rhs
Date: Mon Aug 2 12:10:52 2010
New Revision: 981474
URL: http://svn.apache.org/viewvc?rev=981474&view=rev
Log:
Fixed a bug in the Receiver flow control logic (the granted count is now computed from `returned` rather than `received`); added tests.
Modified:
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=981474&r1=981473&r2=981474&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Mon Aug 2 12:10:52 2010
@@ -1009,8 +1009,9 @@ class Receiver(Endpoint, object):
if msg is None:
raise Empty()
elif self._capacity not in (0, UNLIMITED.value):
- if self.received - self.returned <= int(ceil(self.threshold * self._capacity)):
- self.granted = self.received + self._capacity
+ t = int(ceil(self.threshold * self._capacity))
+ if self.received - self.returned <= t:
+ self.granted = self.returned + self._capacity
self._wakeup()
return msg
@@ -1018,7 +1019,7 @@ class Receiver(Endpoint, object):
if self._capacity == UNLIMITED.value:
self.granted = UNLIMITED
else:
- self.granted = self.received + self._capacity
+ self.granted = self.returned + self._capacity
@synchronized
def close(self, timeout=None):
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=981474&r1=981473&r2=981474&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Mon Aug 2 12:10:52 2010
@@ -18,6 +18,7 @@
#
import time
+from math import ceil
from qpid.harness import Skipped
from qpid.messaging import *
from qpid.tests import Test
@@ -134,9 +135,23 @@ class Base(Test):
contents = self.drain(rcv)
assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
- def assertAvailable(self, rcv, expected):
+ def assertAvailable(self, rcv, expected=None, lower=None, upper=None):
+ if expected is not None:
+ if lower is not None or upper is not None:
+ raise ValueError("cannot specify lower or upper when specifying expected")
+ lower = expected
+ upper = expected
+ else:
+ if lower is None:
+ lower = int(ceil(rcv.threshold*rcv.capacity))
+ if upper is None:
+ upper = rcv.capacity
+
p = rcv.available()
- assert p == expected, "expected %s, got %s" % (expected, p)
+ if upper == lower:
+ assert p == lower, "expected %s, got %s" % (lower, p)
+ else:
+ assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper)
def sleep(self):
time.sleep(self.delay())
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=981474&r1=981473&r2=981474&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Mon Aug 2 12:10:52 2010
@@ -659,9 +659,9 @@ class ReceiverTests(Base):
def setup_receiver(self):
return self.ssn.receiver(RECEIVER_Q)
- def send(self, base, count = None):
+ def send(self, base, count = None, sync=True):
content = self.content(base, count)
- self.snd.send(content)
+ self.snd.send(content, sync=sync)
return content
def testFetch(self):
@@ -762,25 +762,51 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
- def testCapacity(self):
- self.rcv.capacity = 5
+ def capacityTest(self, capacity, threshold=None):
+ if threshold is not None:
+ self.rcv.threshold = threshold
+ self.rcv.capacity = capacity
self.assertAvailable(self.rcv, 0)
- for i in range(15):
- self.send("testCapacity", i)
+ for i in range(2*capacity):
+ self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
+ self.snd.sync()
self.sleep()
- self.assertAvailable(self.rcv, 5)
+ self.assertAvailable(self.rcv)
- self.drain(self.rcv, limit = 5)
+ first = capacity/2
+ second = capacity - first
+ self.drain(self.rcv, limit = first)
+ self.sleep()
+ self.assertAvailable(self.rcv)
+ self.drain(self.rcv, limit = second)
self.sleep()
- self.assertAvailable(self.rcv, 5)
+ self.assertAvailable(self.rcv)
drained = self.drain(self.rcv)
- assert len(drained) == 10, "%s, %s" % (len(drained), drained)
+ assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
self.assertAvailable(self.rcv, 0)
self.ssn.acknowledge()
+ def testCapacity5(self):
+ self.capacityTest(5)
+
+ def testCapacity5Threshold1(self):
+ self.capacityTest(5, 1)
+
+ def testCapacity10(self):
+ self.capacityTest(10)
+
+ def testCapacity10Threshold1(self):
+ self.capacityTest(10, 1)
+
+ def testCapacity100(self):
+ self.capacityTest(100)
+
+ def testCapacity100Threshold1(self):
+ self.capacityTest(100, 1)
+
def testCapacityUNLIMITED(self):
self.rcv.capacity = UNLIMITED
self.assertAvailable(self.rcv, 0)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org