You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/03 20:22:11 UTC

svn commit: r811066 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py

Author: rhs
Date: Thu Sep  3 18:22:10 2009
New Revision: 811066

URL: http://svn.apache.org/viewvc?rev=811066&view=rev
Log:
added timeout option to send

Modified:
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=811066&r1=811065&r2=811066&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Thu Sep  3 18:22:10 2009
@@ -544,18 +544,25 @@
     return self.queued - self.acked
 
   @synchronized
-  def send(self, object, sync=True):
+  def send(self, object, sync=True, timeout=None):
     """
     Send a message. If the object passed in is of type L{unicode},
     L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
     L{Message} and sent. If it is of type L{Message}, it will be sent
-    directly.
+    directly. If the sender capacity is not L{UNLIMITED} then send
+    will block until there is available capacity to send the message.
+    If the timeout parameter is specified, then send will raise an
+    L{InsufficientCapacity} exception if capacity does not become
+    available within the specified time.
 
     @type object: unicode, str, list, dict, Message
     @param object: the message or content to send
 
     @type sync: boolean
     @param sync: if true then block until the message is sent
+
+    @type timeout: float
+    @param timeout: the maximum time to wait for available capacity
     """
 
     if not self.session.connection._connected or self.session.closing:
@@ -569,7 +576,8 @@
     if self.capacity is not UNLIMITED:
       if self.capacity <= 0:
         raise InsufficientCapacity("capacity = %s" % self.capacity)
-      self._ewait(lambda: self.pending() < self.capacity)
+      if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+        raise InsufficientCapacity("capacity = %s" % self.capacity)
 
     # XXX: what if we send the same message to multiple senders?
     message._sender = self

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=811066&r1=811065&r2=811066&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Sep  3 18:22:10 2009
@@ -599,6 +599,22 @@
   def testSendAsyncCapacityUNLIMITED(self):
     self.asyncTest(UNLIMITED)
 
+  def testCapacityTimeout(self):
+    self.snd.capacity = 1
+    msgs = []
+    caught = False
+    while len(msgs) < 100:
+      m = self.content("testCapacity", len(msgs))
+      try:
+        self.snd.send(m, sync=False, timeout=0)
+        msgs.append(m)
+      except InsufficientCapacity:
+        caught = True
+        break
+    self.drain(self.rcv, expected=msgs)
+    self.ssn.acknowledge()
+    assert caught, "did not exceed capacity"
+
 class MessageTests(Base):
 
   def testCreateString(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org