You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/08/31 21:32:35 UTC

svn commit: r809707 - in /qpid/trunk/qpid/python: Makefile qpid/messaging.py qpid/tests/messaging.py

Author: rhs
Date: Mon Aug 31 19:32:35 2009
New Revision: 809707

URL: http://svn.apache.org/viewvc?rev=809707&view=rev
Log:
added async send to the API; minor doc updates

Modified:
    qpid/trunk/qpid/python/Makefile
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/Makefile
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/Makefile?rev=809707&r1=809706&r2=809707&view=diff
==============================================================================
--- qpid/trunk/qpid/python/Makefile (original)
+++ qpid/trunk/qpid/python/Makefile Mon Aug 31 19:32:35 2009
@@ -50,7 +50,7 @@
 
 doc:
 	@mkdir -p $(BUILD)
-	epydoc qpid/messaging.py -o $(BUILD)/doc --no-private --no-sourcecode --include-log
+	epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log
 
 install: build
 	install -d $(PYTHON_LIB)

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=809707&r1=809706&r2=809707&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Mon Aug 31 19:32:35 2009
@@ -559,6 +559,9 @@
 class SendError(SessionError):
   pass
 
+class InsufficientCapacity(SendError):
+  pass
+
 class Sender(Lockable):
 
   """
@@ -569,6 +572,9 @@
     self.session = session
     self.index = index
     self.target = target
+    self.capacity = UNLIMITED
+    self.queued = Serial(0)
+    self.acked = Serial(0)
     self.closed = False
     self._lock = self.session._lock
     self._condition = self.session._condition
@@ -586,7 +592,16 @@
     return self.session.ewait(predicate, timeout, exc)
 
   @synchronized
-  def send(self, object):
+  def pending(self):
+    """
+    Returns the number of messages awaiting acknowledgment.
+    @rtype: int
+    @return: the number of unacknowledged messages
+    """
+    return self.queued - self.acked
+
+  @synchronized
+  def send(self, object, sync=True):
     """
     Send a message. If the object passed in is of type L{unicode},
     L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
@@ -595,6 +610,9 @@
 
     @type object: unicode, str, list, dict, Message
     @param object: the message or content to send
+
+    @type sync: boolean
+    @param sync: if true then block until the message has been acknowledged
     """
 
     if not self.session.connection._connected or self.session.closing:
@@ -605,12 +623,22 @@
     else:
       message = Message(object)
 
+    if self.capacity is not UNLIMITED:
+      if self.capacity <= 0:
+        raise InsufficientCapacity("capacity = %s" % self.capacity)
+      self.ewait(lambda: self.pending() < self.capacity)
+
     # XXX: what if we send the same message to multiple senders?
     message._sender = self
     self.session.outgoing.append(message)
+    self.queued += 1
+    mno = self.queued
 
     self.wakeup()
-    self.ewait(lambda: message not in self.session.outgoing)
+
+    if sync:
+      self.ewait(lambda: self.acked >= mno)
+      assert message not in self.session.outgoing
 
   @synchronized
   def close(self):
@@ -675,6 +703,13 @@
 
   @synchronized
   def pending(self):
+    """
+    Returns the number of messages available to be fetched by the
+    application.
+
+    @rtype: int
+    @return: the number of available messages
+    """
     return self.received - self.returned
 
   def _capacity(self):
@@ -1203,6 +1238,7 @@
     # XXX: really need to make this async so that we don't give up the lock
     _ssn.sync()
     # XXX: should we log the ack somehow too?
+    snd.acked += 1
 
   @synchronized
   def _message_transfer(self, ssn, cmd):
@@ -1241,5 +1277,5 @@
     msg._transfer_id = message.id
     return msg
 
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
-           "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "ConnectionError", "ConnectError", "Pattern", "Session", "Sender", "Receiver", "Message",
+           "ReceiveError", "Empty", "SendError", "InsufficientCapacity", "timestamp", "uuid4"]

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=809707&r1=809706&r2=809707&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Mon Aug 31 19:32:35 2009
@@ -23,7 +23,8 @@
 import time
 from qpid.tests import Test
 from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
+from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
+    InsufficientCapacity, Message, UNLIMITED, uuid4
 from Queue import Queue, Empty as QueueEmpty
 
 class Base(Test):
@@ -71,11 +72,11 @@
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
 
-  def drain(self, rcv, limit=None):
+  def drain(self, rcv, limit=None, timeout=0):
     contents = []
     try:
       while limit is None or len(contents) < limit:
-        contents.append(rcv.fetch(0).content)
+        contents.append(rcv.fetch(timeout=timeout).content)
     except Empty:
       pass
     return contents
@@ -543,6 +544,32 @@
   def testSendMap(self):
     self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
 
+  def asyncTest(self, capacity):
+    self.snd.capacity = capacity
+    msgs = [self.content("asyncTest", i) for i in range(15)]
+    for m in msgs:
+      self.snd.send(m, sync=False)
+    drained = self.drain(self.rcv, timeout=self.delay())
+    assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+    self.ssn.acknowledge()
+
+  def testSendAsyncCapacity0(self):
+    try:
+      self.asyncTest(0)
+      assert False, "send shouldn't succeed with zero capacity"
+    except InsufficientCapacity:
+      # this is expected
+      pass
+
+  def testSendAsyncCapacity1(self):
+    self.asyncTest(1)
+
+  def testSendAsyncCapacity5(self):
+    self.asyncTest(5)
+
+  def testSendAsyncCapacityUNLIMITED(self):
+    self.asyncTest(UNLIMITED)
+
 class MessageTests(Base):
 
   def testCreateString(self):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org