You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2009/08/31 21:32:35 UTC
svn commit: r809707 - in /qpid/trunk/qpid/python: Makefile qpid/messaging.py qpid/tests/messaging.py
Author: rhs
Date: Mon Aug 31 19:32:35 2009
New Revision: 809707
URL: http://svn.apache.org/viewvc?rev=809707&view=rev
Log:
added async send to the API; minor doc updates
Modified:
qpid/trunk/qpid/python/Makefile
qpid/trunk/qpid/python/qpid/messaging.py
qpid/trunk/qpid/python/qpid/tests/messaging.py
Modified: qpid/trunk/qpid/python/Makefile
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/Makefile?rev=809707&r1=809706&r2=809707&view=diff
==============================================================================
--- qpid/trunk/qpid/python/Makefile (original)
+++ qpid/trunk/qpid/python/Makefile Mon Aug 31 19:32:35 2009
@@ -50,7 +50,7 @@
doc:
@mkdir -p $(BUILD)
- epydoc qpid/messaging.py -o $(BUILD)/doc --no-private --no-sourcecode --include-log
+ epydoc qpid.messaging -o $(BUILD)/doc --no-private --no-sourcecode --include-log
install: build
install -d $(PYTHON_LIB)
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=809707&r1=809706&r2=809707&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Mon Aug 31 19:32:35 2009
@@ -559,6 +559,9 @@
class SendError(SessionError):
pass
+class InsufficientCapacity(SendError):
+ pass
+
class Sender(Lockable):
"""
@@ -569,6 +572,9 @@
self.session = session
self.index = index
self.target = target
+ self.capacity = UNLIMITED
+ self.queued = Serial(0)
+ self.acked = Serial(0)
self.closed = False
self._lock = self.session._lock
self._condition = self.session._condition
@@ -586,7 +592,16 @@
return self.session.ewait(predicate, timeout, exc)
@synchronized
- def send(self, object):
+ def pending(self):
+ """
+ Returns the number of messages awaiting acknowledgment.
+ @rtype: int
+ @return: the number of unacknowledged messages
+ """
+ return self.queued - self.acked
+
+ @synchronized
+ def send(self, object, sync=True):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
@@ -595,6 +610,9 @@
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
+
+ @type sync: boolean
+ @param sync: if true then block until the message is sent
"""
if not self.session.connection._connected or self.session.closing:
@@ -605,12 +623,22 @@
else:
message = Message(object)
+ if self.capacity is not UNLIMITED:
+ if self.capacity <= 0:
+ raise InsufficientCapacity("capacity = %s" % self.capacity)
+ self.ewait(lambda: self.pending() < self.capacity)
+
# XXX: what if we send the same message to multiple senders?
message._sender = self
self.session.outgoing.append(message)
+ self.queued += 1
+ mno = self.queued
self.wakeup()
- self.ewait(lambda: message not in self.session.outgoing)
+
+ if sync:
+ self.ewait(lambda: self.acked >= mno)
+ assert message not in self.session.outgoing
@synchronized
def close(self):
@@ -675,6 +703,13 @@
@synchronized
def pending(self):
+ """
+ Returns the number of messages available to be fetched by the
+ application.
+
+ @rtype: int
+ @return: the number of available messages
+ """
return self.received - self.returned
def _capacity(self):
@@ -1203,6 +1238,7 @@
# XXX: really need to make this async so that we don't give up the lock
_ssn.sync()
# XXX: should we log the ack somehow too?
+ snd.acked += 1
@synchronized
def _message_transfer(self, ssn, cmd):
@@ -1241,5 +1277,5 @@
msg._transfer_id = message.id
return msg
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
- "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "ConnectionError", "ConnectError", "Pattern", "Session", "Sender", "Receiver", "Message",
+ "ReceiveError", "Empty", "SendError", "InsufficientCapacity", "timestamp", "uuid4"]
Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=809707&r1=809706&r2=809707&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Mon Aug 31 19:32:35 2009
@@ -23,7 +23,8 @@
import time
from qpid.tests import Test
from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
+from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
+ InsufficientCapacity, Message, UNLIMITED, uuid4
from Queue import Queue, Empty as QueueEmpty
class Base(Test):
@@ -71,11 +72,11 @@
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None):
+ def drain(self, rcv, limit=None, timeout=0):
contents = []
try:
while limit is None or len(contents) < limit:
- contents.append(rcv.fetch(0).content)
+ contents.append(rcv.fetch(timeout=timeout).content)
except Empty:
pass
return contents
@@ -543,6 +544,32 @@
def testSendMap(self):
self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
+ def asyncTest(self, capacity):
+ self.snd.capacity = capacity
+ msgs = [self.content("asyncTest", i) for i in range(15)]
+ for m in msgs:
+ self.snd.send(m, sync=False)
+ drained = self.drain(self.rcv, timeout=self.delay())
+ assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+ self.ssn.acknowledge()
+
+ def testSendAsyncCapacity0(self):
+ try:
+ self.asyncTest(0)
+ assert False, "send shouldn't succeed with zero capacity"
+ except InsufficientCapacity:
+ # this is expected
+ pass
+
+ def testSendAsyncCapacity1(self):
+ self.asyncTest(1)
+
+ def testSendAsyncCapacity5(self):
+ self.asyncTest(5)
+
+ def testSendAsyncCapacityUNLIMITED(self):
+ self.asyncTest(UNLIMITED)
+
class MessageTests(Base):
def testCreateString(self):
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org