You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/09/02 14:58:05 UTC
svn commit: r810495 - in /qpid/trunk/qpid/python/qpid: messaging.py
tests/messaging.py
Author: rhs
Date: Wed Sep 2 12:58:05 2009
New Revision: 810495
URL: http://svn.apache.org/viewvc?rev=810495&view=rev
Log:
added sync flag to acknowledge and ack_capcity to Session
Modified:
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=810495&r1=810494&r2=810495&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Wed Sep 2 12:58:05 2009
@@ -339,6 +339,8 @@
self.incoming = []
self.unacked = []
self.acked = []
+ # XXX: I hate this name.
+ self.ack_capacity = UNLIMITED
self.closing = False
self.closed = False
@@ -437,13 +439,15 @@
return None
@synchronized
- def acknowledge(self, message=None):
+ def acknowledge(self, message=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@type message: Message
@param message: the message to acknowledge or None
+ @type sync: boolean
+ @param sync: if true then block until the message(s) are acknowledged
"""
if message is None:
messages = self.unacked[:]
@@ -451,12 +455,18 @@
messages = [message]
for m in messages:
+ if self.ack_capacity is not UNLIMITED:
+ if self.ack_capacity <= 0:
+ # XXX: this is currently a SendError, maybe it should be a SessionError?
+ raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+ self.wakeup()
+ self.ewait(lambda: len(self.acked) < self.ack_capacity)
self.unacked.remove(m)
self.acked.append(m)
self.wakeup()
- self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked])
- self.check_error()
+ if sync:
+ self.ewait(lambda: not [m for m in messages if m in self.acked])
@synchronized
def commit(self):
@@ -539,6 +549,8 @@
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
+ # XXX: should be able to express this condition through API calls
+ self.ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
def parse_addr(address):
@@ -1116,6 +1128,7 @@
# XXX: really need to make this async so that we don't give up the lock
_ssn.sync()
+ # XXX: we're ignoring acks that get lost when disconnected
for m in messages:
ssn.acked.remove(m)
if ssn.transactional:
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=810495&r1=810494&r2=810495&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Wed Sep 2 12:58:05 2009
@@ -72,13 +72,15 @@
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None, timeout=0):
+ def drain(self, rcv, limit=None, timeout=0, expected=None):
contents = []
try:
while limit is None or len(contents) < limit:
contents.append(rcv.fetch(timeout=timeout).content)
except Empty:
pass
+ if expected is not None:
+ assert expected == contents, "expected %s, got %s" % (expected, contents)
return contents
def assertEmpty(self, rcv):
@@ -225,27 +227,27 @@
# XXX, we need a convenient way to assert that required queues are
# empty on setup, and possibly also to drain queues on teardown
- def testAcknowledge(self):
+ def ackTest(self, acker, ack_capacity=None):
# send a bunch of messages
snd = self.ssn.sender("test-ack-queue")
- tid = "a"
- contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+ contents = [self.content("ackTest", i) for i in range(15)]
for c in contents:
snd.send(c)
# drain the queue, verify the messages are there and then close
# without acking
rcv = self.ssn.receiver(snd.target)
- assert contents == self.drain(rcv)
+ self.drain(rcv, expected=contents)
self.ssn.close()
# drain the queue again, verify that they are all the messages
# were requeued, and ack this time before closing
self.ssn = self.conn.session()
+ if ack_capacity is not None:
+ self.ssn.ack_capacity = ack_capacity
rcv = self.ssn.receiver("test-ack-queue")
- drained = self.drain(rcv)
- assert contents == drained, "expected %s, got %s" % (contents, drained)
- self.ssn.acknowledge()
+ self.drain(rcv, expected=contents)
+ acker(self.ssn)
self.ssn.close()
# drain the queue a final time and verify that the messages were
@@ -254,6 +256,32 @@
rcv = self.ssn.receiver("test-ack-queue")
self.assertEmpty(rcv)
+ def testAcknowledge(self):
+ self.ackTest(lambda ssn: ssn.acknowledge())
+
+ def testAcknowledgeAsync(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+ def testAcknowledgeAsyncAckCap0(self):
+ try:
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+ assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+ except InsufficientCapacity:
+ pass
+ finally:
+ self.ssn.ack_capacity = UNLIMITED
+ self.drain(self.ssn.receiver("test-ack-queue"))
+ self.ssn.acknowledge()
+
+ def testAcknowledgeAsyncAckCap1(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+ def testAcknowledgeAsyncAckCap5(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+ def testAcknowledgeAsyncAckCapUNLIMITED(self):
+ self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
def send(self, ssn, queue, base, count=1):
snd = ssn.sender(queue)
contents = []
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org