You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/03 20:22:11 UTC
svn commit: r811066 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py
Author: rhs
Date: Thu Sep 3 18:22:10 2009
New Revision: 811066
URL: http://svn.apache.org/viewvc?rev=811066&view=rev
Log:
added timeout option to send
Modified:
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=811066&r1=811065&r2=811066&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Thu Sep 3 18:22:10 2009
@@ -544,18 +544,25 @@
return self.queued - self.acked
@synchronized
- def send(self, object, sync=True):
+ def send(self, object, sync=True, timeout=None):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
L{Message} and sent. If it is of type L{Message}, it will be sent
- directly.
+ directly. If the sender capacity is not L{UNLIMITED} then send
+ will block until there is available capacity to send the message.
+ If the timeout parameter is specified, then send will throw an
+ L{InsufficientCapacity} exception if capacity does not become
+ available within the specified time.
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
@type sync: boolean
@param sync: if true then block until the message is sent
+
+ @type timeout: float
+ @param timeout: the time to wait for available capacity
"""
if not self.session.connection._connected or self.session.closing:
@@ -569,7 +576,8 @@
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
- self._ewait(lambda: self.pending() < self.capacity)
+ if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
# XXX: what if we send the same message to multiple senders?
message._sender = self
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=811066&r1=811065&r2=811066&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Sep 3 18:22:10 2009
@@ -599,6 +599,22 @@
def testSendAsyncCapacityUNLIMITED(self):
self.asyncTest(UNLIMITED)
+ def testCapacityTimeout(self):
+ self.snd.capacity = 1
+ msgs = []
+ caught = False
+ while len(msgs) < 100:
+ m = self.content("testCapacity", len(msgs))
+ try:
+ self.snd.send(m, sync=False, timeout=0)
+ msgs.append(m)
+ except InsufficientCapacity:
+ caught = True
+ break
+ self.drain(self.rcv, expected=msgs)
+ self.ssn.acknowledge()
+ assert caught, "did not exceed capacity"
+
class MessageTests(Base):
def testCreateString(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org