You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/02 14:58:05 UTC

svn commit: r810495 - in /qpid/trunk/qpid/python/qpid: messaging.py tests/messaging.py

Author: rhs
Date: Wed Sep  2 12:58:05 2009
New Revision: 810495

URL: http://svn.apache.org/viewvc?rev=810495&view=rev
Log:
added sync flag to acknowledge and ack_capcity to Session

Modified:
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=810495&r1=810494&r2=810495&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep  2 12:58:05 2009
@@ -339,6 +339,8 @@
     self.incoming = []
     self.unacked = []
     self.acked = []
+    # XXX: I hate this name.
+    self.ack_capacity = UNLIMITED
 
     self.closing = False
     self.closed = False
@@ -437,13 +439,15 @@
     return None
 
   @synchronized
-  def acknowledge(self, message=None):
+  def acknowledge(self, message=None, sync=True):
     """
     Acknowledge the given L{Message}. If message is None, then all
     unacknowledged messages on the session are acknowledged.
 
     @type message: Message
     @param message: the message to acknowledge or None
+    @type sync: boolean
+    @param sync: if true then block until the message(s) are acknowledged
     """
     if message is None:
       messages = self.unacked[:]
@@ -451,12 +455,18 @@
       messages = [message]
 
     for m in messages:
+      if self.ack_capacity is not UNLIMITED:
+        if self.ack_capacity <= 0:
+          # XXX: this is currently a SendError, maybe it should be a SessionError?
+          raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+        self.wakeup()
+        self.ewait(lambda: len(self.acked) < self.ack_capacity)
       self.unacked.remove(m)
       self.acked.append(m)
 
     self.wakeup()
-    self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked])
-    self.check_error()
+    if sync:
+      self.ewait(lambda: not [m for m in messages if m in self.acked])
 
   @synchronized
   def commit(self):
@@ -539,6 +549,8 @@
     while self.thread.isAlive():
       self.thread.join(3)
     self.thread = None
+    # XXX: should be able to express this condition through API calls
+    self.ewait(lambda: not self.outgoing and not self.acked)
     self.connection._remove_session(self)
 
 def parse_addr(address):
@@ -1116,6 +1128,7 @@
       # XXX: really need to make this async so that we don't give up the lock
       _ssn.sync()
 
+      # XXX: we're ignoring acks that get lost when disconnected
       for m in messages:
         ssn.acked.remove(m)
         if ssn.transactional:

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=810495&r1=810494&r2=810495&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Wed Sep  2 12:58:05 2009
@@ -72,13 +72,15 @@
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
 
-  def drain(self, rcv, limit=None, timeout=0):
+  def drain(self, rcv, limit=None, timeout=0, expected=None):
     contents = []
     try:
       while limit is None or len(contents) < limit:
         contents.append(rcv.fetch(timeout=timeout).content)
     except Empty:
       pass
+    if expected is not None:
+      assert expected == contents, "expected %s, got %s" % (expected, contents)
     return contents
 
   def assertEmpty(self, rcv):
@@ -225,27 +227,27 @@
 
   # XXX, we need a convenient way to assert that required queues are
   # empty on setup, and possibly also to drain queues on teardown
-  def testAcknowledge(self):
+  def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
     snd = self.ssn.sender("test-ack-queue")
-    tid = "a"
-    contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+    contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
 
     # drain the queue, verify the messages are there and then close
     # without acking
     rcv = self.ssn.receiver(snd.target)
-    assert contents == self.drain(rcv)
+    self.drain(rcv, expected=contents)
     self.ssn.close()
 
     # drain the queue again, verify that they are all the messages
     # were requeued, and ack this time before closing
     self.ssn = self.conn.session()
+    if ack_capacity is not None:
+      self.ssn.ack_capacity = ack_capacity
     rcv = self.ssn.receiver("test-ack-queue")
-    drained = self.drain(rcv)
-    assert contents == drained, "expected %s, got %s" % (contents, drained)
-    self.ssn.acknowledge()
+    self.drain(rcv, expected=contents)
+    acker(self.ssn)
     self.ssn.close()
 
     # drain the queue a final time and verify that the messages were
@@ -254,6 +256,32 @@
     rcv = self.ssn.receiver("test-ack-queue")
     self.assertEmpty(rcv)
 
+  def testAcknowledge(self):
+    self.ackTest(lambda ssn: ssn.acknowledge())
+
+  def testAcknowledgeAsync(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+  def testAcknowledgeAsyncAckCap0(self):
+    try:
+      self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+      assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+    except InsufficientCapacity:
+      pass
+    finally:
+      self.ssn.ack_capacity = UNLIMITED
+      self.drain(self.ssn.receiver("test-ack-queue"))
+      self.ssn.acknowledge()
+
+  def testAcknowledgeAsyncAckCap1(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+  def testAcknowledgeAsyncAckCap5(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+  def testAcknowledgeAsyncAckCapUNLIMITED(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
   def send(self, ssn, queue, base, count=1):
     snd = ssn.sender(queue)
     contents = []



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org