You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/11 01:03:25 UTC
svn commit: r921638 - in /qpid/trunk/qpid/python/qpid:
messaging/constants.py messaging/driver.py messaging/endpoints.py
messaging/message.py tests/messaging/__init__.py tests/messaging/endpoints.py
Author: rhs
Date: Thu Mar 11 00:03:25 2010
New Revision: 921638
URL: http://svn.apache.org/viewvc?rev=921638&view=rev
Log:
added support for reject/release
Modified:
qpid/trunk/qpid/python/qpid/messaging/constants.py
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/messaging/endpoints.py
qpid/trunk/qpid/python/qpid/messaging/message.py
qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
Modified: qpid/trunk/qpid/python/qpid/messaging/constants.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/constants.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/constants.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/constants.py Thu Mar 11 00:03:25 2010
@@ -17,11 +17,16 @@
# under the License.
#
+__SELF__ = object()
+
class Constant:
- def __init__(self, name, value=None):
+ def __init__(self, name, value=__SELF__):
self.name = name
- self.value = value
+ if value is __SELF__:
+ self.value = self
+ else:
+ self.value = value
def __repr__(self):
return self.name
@@ -30,3 +35,6 @@ AMQP_PORT = 5672
AMQPS_PORT = 5671
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Mar 11 00:03:25 2010
@@ -18,7 +18,7 @@
#
import socket, struct, sys, time
-from logging import getLogger
+from logging import getLogger, DEBUG
from qpid import compat
from qpid import sasl
from qpid.concurrency import synchronized
@@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, Ver
from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
FrameDecoder, SegmentDecoder, OpDecoder
from qpid.messaging import address
-from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
from qpid.messaging.exceptions import ConnectError
-from qpid.messaging.message import get_codec, Message
+from qpid.messaging.message import get_codec, Disposition, Message
from qpid.ops import *
from qpid.selector import Selector
from qpid.util import connect
@@ -435,6 +435,8 @@ class Driver:
self._host = (self._host + 1) % len(self._hosts)
self.close_engine(e)
+DEFAULT_DISPOSITION = Disposition(None)
+
class Engine:
def __init__(self, connection):
@@ -915,19 +917,49 @@ class Engine:
if ssn.acked:
messages = [m for m in ssn.acked if m not in sst.acked]
if messages:
- # XXX: we're ignoring acks that get lost when disconnected,
- # could we deal this via some message-id based purge?
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ ids = RangedSet()
+
+ disposed = [(DEFAULT_DISPOSITION, [])]
+ for m in messages:
+ # XXX: we're ignoring acks that get lost when disconnected,
+ # could we deal this via some message-id based purge?
+ if m._transfer_id is None:
+ continue
+ ids.add(m._transfer_id)
+ disp = m._disposition or DEFAULT_DISPOSITION
+ last, msgs = disposed[-1]
+ if disp.type is last.type and disp.options == last.options:
+ msgs.append(m)
+ else:
+ disposed.append((disp, [m]))
+
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- def ack_ack():
- for m in messages:
- ssn.acked.remove(m)
- if not ssn.transactional:
- sst.acked.remove(m)
- sst.write_cmd(MessageAccept(ids), ack_ack)
- log.debug("SACK[%s]: %s", ssn.log_id, m)
+
+ def ack_acker(msgs):
+ def ack_ack():
+ for m in msgs:
+ ssn.acked.remove(m)
+ if not ssn.transactional:
+ sst.acked.remove(m)
+ return ack_ack
+
+ for disp, msgs in disposed:
+ if not msgs: continue
+ if disp.type is None:
+ op = MessageAccept
+ elif disp.type is RELEASED:
+ op = MessageRelease
+ elif disp.type is REJECTED:
+ op = MessageReject
+ sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
+ **disp.options),
+ ack_acker(msgs))
+ if log.isEnabledFor(DEBUG):
+ for m in msgs:
+ log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+
sst.acked.extend(messages)
if ssn.committing and not sst.committing:
@@ -948,7 +980,7 @@ class Engine:
for range in ids:
sst.executed.add_range(range)
sst.write_op(SessionCompleted(sst.executed))
- sst.write_cmd(MessageRelease(ids))
+ sst.write_cmd(MessageRelease(ids, True))
sst.write_cmd(TxRollback(), do_rb_ok)
def do_rb_ok():
@@ -1055,8 +1087,11 @@ class Engine:
if mp.application_headers is None:
mp.application_headers = {}
mp.application_headers[TO] = msg.to
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
+ if msg.durable is not None:
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ else:
+ dp.delivery_mode = delivery_mode.non_persistent
if msg.priority is not None:
dp.priority = msg.priority
if msg.ttl is not None:
@@ -1109,7 +1144,8 @@ class Engine:
if mp.reply_to is not None:
msg.reply_to = reply_to2addr(mp.reply_to)
msg.correlation_id = mp.correlation_id
- msg.durable = dp.delivery_mode == delivery_mode.persistent
+ if dp.delivery_mode is not None:
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
msg.priority = dp.priority
msg.ttl = dp.ttl
msg.redelivered = dp.redelivered
Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Thu Mar 11 00:03:25 2010
@@ -508,7 +508,7 @@ class Session:
raise Empty
@synchronized
- def acknowledge(self, message=None, sync=True):
+ def acknowledge(self, message=None, disposition=None, sync=True):
"""
Acknowledge the given L{Message}. If message is None, then all
unacknowledged messages on the session are acknowledged.
@@ -530,6 +530,7 @@ class Session:
raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
self._wakeup()
self._ewait(lambda: len(self.acked) < self.ack_capacity)
+ m._disposition = disposition
self.unacked.remove(m)
self.acked.append(m)
Modified: qpid/trunk/qpid/python/qpid/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/message.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/message.py Thu Mar 11 00:03:25 2010
@@ -129,7 +129,7 @@ class Message:
"correlation_id", "priority", "ttl"]:
value = self.__dict__[name]
if value is not None: args.append("%s=%r" % (name, value))
- for name in ["durable", "properties"]:
+ for name in ["durable", "redelivered", "properties"]:
value = self.__dict__[name]
if value: args.append("%s=%r" % (name, value))
if self.content_type != get_type(self.content):
@@ -141,4 +141,15 @@ class Message:
args.append(repr(self.content))
return "Message(%s)" % ", ".join(args)
-__all__ = ["Message"]
+class Disposition:
+
+ def __init__(self, type, **options):
+ self.type = type
+ self.options = options
+
+ def __repr__(self):
+ args = [str(self.type)] + \
+ ["%s=%r" % (k, v) for k, v in self.options.items()]
+ return "Disposition(%s)" % ", ".join(args)
+
+__all__ = ["Message", "Disposition"]
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Thu Mar 11 00:03:25 2010
@@ -59,6 +59,9 @@ class Base(Test):
else:
return "%s[%s, %s]" % (base, count, self.test_id)
+ def message(self, base, count = None):
+ return Message(self.content(base, count))
+
def ping(self, ssn):
PING_Q = 'ping-queue; {create: always, delete: always}'
# send a message
@@ -70,7 +73,7 @@ class Base(Test):
ssn.acknowledge()
assert msg.content == content, "expected %r, got %r" % (content, msg.content)
- def drain(self, rcv, limit=None, timeout=0, expected=None):
+ def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
messages = []
try:
while limit is None or len(messages) < limit:
@@ -78,21 +81,22 @@ class Base(Test):
except Empty:
pass
if expected is not None:
- self.assertEchos(expected, messages)
+ self.assertEchos(expected, messages, redelivered)
return messages
def diff(self, m1, m2):
result = {}
for attr in ("id", "subject", "user_id", "to", "reply_to",
"correlation_id", "durable", "priority", "ttl",
- "properties", "content_type", "content"):
+ "redelivered", "properties", "content_type",
+ "content"):
a1 = getattr(m1, attr)
a2 = getattr(m2, attr)
if a1 != a2:
result[attr] = (a1, a2)
return result
- def assertEcho(self, msg, echo):
+ def assertEcho(self, msg, echo, redelivered=False):
if not isinstance(msg, Message) or not isinstance(echo, Message):
if isinstance(msg, Message):
msg = msg.content
@@ -102,14 +106,19 @@ class Base(Test):
else:
delta = self.diff(msg, echo)
mttl, ettl = delta.pop("ttl", (0, 0))
+ if redelivered:
+ assert echo.redelivered, \
+ "expected %s to be redelivered: %s" % (msg, echo)
+ if delta.has_key("redelivered"):
+ del delta["redelivered"]
assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
assert mttl >= ettl, "%s, %s" % (mttl, ettl)
assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
- def assertEchos(self, msgs, echoes):
+ def assertEchos(self, msgs, echoes, redelivered=False):
assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
for m, e in zip(msgs, echoes):
- self.assertEcho(m, e)
+ self.assertEcho(m, e, redelivered)
def assertEmpty(self, rcv):
contents = self.drain(rcv)
Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Thu Mar 11 00:03:25 2010
@@ -227,21 +227,60 @@ class SessionTests(Base):
def testAcknowledgeAsyncAckCapUNLIMITED(self):
self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
- def send(self, ssn, queue, base, count=1):
- snd = ssn.sender(queue, durable=self.durable())
- contents = []
+ def testRelease(self):
+ msgs = [self.message("testRelease", i) for i in range(3)]
+ snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+ self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+ self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+ self.drain(rcv, expected=msgs[2:3])
+ self.ssn.acknowledge()
+
+ def testReject(self):
+ msgs = [self.message("testReject", i) for i in range(3)]
+ snd = self.ssn.sender("""
+ test-reject-queue; {
+ create: always,
+ delete: always,
+ node-properties: {
+ x-properties: {
+ alternate_exchange: 'amq.topic'
+ }
+ }
+ }
+""")
+ for m in msgs:
+ snd.send(m)
+ rcv = self.ssn.receiver(snd.target)
+ rej = self.ssn.receiver("amq.topic")
+ echos = self.drain(rcv, expected=msgs)
+ self.ssn.acknowledge(echos[0])
+ self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+ self.ssn.acknowledge(echos[2],
+ Disposition(REJECTED, code=3, text="test-reject"))
+ self.drain(rej, expected=msgs[1:])
+ self.ssn.acknowledge()
+
+ def send(self, ssn, target, base, count=1):
+ snd = ssn.sender(target, durable=self.durable())
+ messages = []
for i in range(count):
- c = self.content(base, i)
+ c = self.message(base, i)
snd.send(c)
- contents.append(c)
+ messages.append(c)
snd.close()
- return contents
+ return messages
def txTest(self, commit):
TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
txssn = self.conn.session(transactional=True)
- contents = self.send(self.ssn, TX_Q, "txTest", 3)
+ messages = self.send(self.ssn, TX_Q, "txTest", 3)
txrcv = txssn.receiver(TX_Q)
txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
if commit:
txssn.commit()
self.assertEmpty(rcv)
- self.drain(copy_rcv, expected=contents)
+ self.drain(copy_rcv, expected=messages)
else:
txssn.rollback()
- self.drain(rcv, expected=contents)
+ self.drain(rcv, expected=messages, redelivered=True)
self.assertEmpty(copy_rcv)
self.ssn.acknowledge()
@@ -271,13 +310,13 @@ class SessionTests(Base):
def txTestSend(self, commit):
TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
txssn = self.conn.session(transactional=True)
- contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+ messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
rcv = self.ssn.receiver(TX_SEND_Q)
self.assertEmpty(rcv)
if commit:
txssn.commit()
- self.drain(rcv, expected=contents)
+ self.drain(rcv, expected=messages)
self.ssn.acknowledge()
else:
txssn.rollback()
@@ -297,17 +336,17 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
self.assertEmpty(txrcv)
- contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
- self.drain(txrcv, expected=contents)
+ messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+ self.drain(txrcv, expected=messages)
if commit:
txssn.acknowledge()
else:
txssn.rollback()
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.rollback()
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.commit() # commit without ack
self.assertEmpty(txrcv)
@@ -315,7 +354,7 @@ class SessionTests(Base):
txssn = self.conn.session(transactional=True)
txrcv = txssn.receiver(TX_ACK_QC)
- self.drain(txrcv, expected=contents)
+ self.drain(txrcv, expected=messages, redelivered=True)
txssn.acknowledge()
txssn.commit()
rcv = self.ssn.receiver(TX_ACK_QD)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org