You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/03/11 01:03:25 UTC

svn commit: r921638 - in /qpid/trunk/qpid/python/qpid: messaging/constants.py messaging/driver.py messaging/endpoints.py messaging/message.py tests/messaging/__init__.py tests/messaging/endpoints.py

Author: rhs
Date: Thu Mar 11 00:03:25 2010
New Revision: 921638

URL: http://svn.apache.org/viewvc?rev=921638&view=rev
Log:
added support for reject/release

Modified:
    qpid/trunk/qpid/python/qpid/messaging/constants.py
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/messaging/endpoints.py
    qpid/trunk/qpid/python/qpid/messaging/message.py
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py

Modified: qpid/trunk/qpid/python/qpid/messaging/constants.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/constants.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/constants.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/constants.py Thu Mar 11 00:03:25 2010
@@ -17,11 +17,16 @@
 # under the License.
 #
 
+__SELF__ = object()
+
 class Constant:
 
-  def __init__(self, name, value=None):
+  def __init__(self, name, value=__SELF__):
     self.name = name
-    self.value = value
+    if value is __SELF__:
+      self.value = self
+    else:
+      self.value = value
 
   def __repr__(self):
     return self.name
@@ -30,3 +35,6 @@ AMQP_PORT = 5672
 AMQPS_PORT = 5671
 
 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
+REJECTED = Constant("REJECTED")
+RELEASED = Constant("RELEASED")

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Thu Mar 11 00:03:25 2010
@@ -18,7 +18,7 @@
 #
 
 import socket, struct, sys, time
-from logging import getLogger
+from logging import getLogger, DEBUG
 from qpid import compat
 from qpid import sasl
 from qpid.concurrency import synchronized
@@ -27,9 +27,9 @@ from qpid.exceptions import Timeout, Ver
 from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
     FrameDecoder, SegmentDecoder, OpDecoder
 from qpid.messaging import address
-from qpid.messaging.constants import UNLIMITED
+from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
 from qpid.messaging.exceptions import ConnectError
-from qpid.messaging.message import get_codec, Message
+from qpid.messaging.message import get_codec, Disposition, Message
 from qpid.ops import *
 from qpid.selector import Selector
 from qpid.util import connect
@@ -435,6 +435,8 @@ class Driver:
       self._host = (self._host + 1) % len(self._hosts)
       self.close_engine(e)
 
+DEFAULT_DISPOSITION = Disposition(None)
+
 class Engine:
 
   def __init__(self, connection):
@@ -915,19 +917,49 @@ class Engine:
     if ssn.acked:
       messages = [m for m in ssn.acked if m not in sst.acked]
       if messages:
-        # XXX: we're ignoring acks that get lost when disconnected,
-        # could we deal this via some message-id based purge?
-        ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+        ids = RangedSet()
+
+        disposed = [(DEFAULT_DISPOSITION, [])]
+        for m in messages:
+          # XXX: we're ignoring acks that get lost when disconnected,
+          # could we deal with this via some message-id based purge?
+          if m._transfer_id is None:
+            continue
+          ids.add(m._transfer_id)
+          disp = m._disposition or DEFAULT_DISPOSITION
+          last, msgs = disposed[-1]
+          if disp.type is last.type and disp.options == last.options:
+            msgs.append(m)
+          else:
+            disposed.append((disp, [m]))
+
         for range in ids:
           sst.executed.add_range(range)
         sst.write_op(SessionCompleted(sst.executed))
-        def ack_ack():
-          for m in messages:
-            ssn.acked.remove(m)
-            if not ssn.transactional:
-              sst.acked.remove(m)
-        sst.write_cmd(MessageAccept(ids), ack_ack)
-        log.debug("SACK[%s]: %s", ssn.log_id, m)
+
+        def ack_acker(msgs):
+          def ack_ack():
+            for m in msgs:
+              ssn.acked.remove(m)
+              if not ssn.transactional:
+                sst.acked.remove(m)
+          return ack_ack
+
+        for disp, msgs in disposed:
+          if not msgs: continue
+          if disp.type is None:
+            op = MessageAccept
+          elif disp.type is RELEASED:
+            op = MessageRelease
+          elif disp.type is REJECTED:
+            op = MessageReject
+          sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
+                           **disp.options),
+                        ack_acker(msgs))
+          if log.isEnabledFor(DEBUG):
+            for m in msgs:
+              log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
+
         sst.acked.extend(messages)
 
     if ssn.committing and not sst.committing:
@@ -948,7 +980,7 @@ class Engine:
         for range in ids:
           sst.executed.add_range(range)
         sst.write_op(SessionCompleted(sst.executed))
-        sst.write_cmd(MessageRelease(ids))
+        sst.write_cmd(MessageRelease(ids, True))
         sst.write_cmd(TxRollback(), do_rb_ok)
 
       def do_rb_ok():
@@ -1055,8 +1087,11 @@ class Engine:
       if mp.application_headers is None:
         mp.application_headers = {}
       mp.application_headers[TO] = msg.to
-    if msg.durable:
-      dp.delivery_mode = delivery_mode.persistent
+    if msg.durable is not None:
+      if msg.durable:
+        dp.delivery_mode = delivery_mode.persistent
+      else:
+        dp.delivery_mode = delivery_mode.non_persistent
     if msg.priority is not None:
       dp.priority = msg.priority
     if msg.ttl is not None:
@@ -1109,7 +1144,8 @@ class Engine:
     if mp.reply_to is not None:
       msg.reply_to = reply_to2addr(mp.reply_to)
     msg.correlation_id = mp.correlation_id
-    msg.durable = dp.delivery_mode == delivery_mode.persistent
+    if dp.delivery_mode is not None:
+      msg.durable = dp.delivery_mode == delivery_mode.persistent
     msg.priority = dp.priority
     msg.ttl = dp.ttl
     msg.redelivered = dp.redelivered

Modified: qpid/trunk/qpid/python/qpid/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/endpoints.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/endpoints.py Thu Mar 11 00:03:25 2010
@@ -508,7 +508,7 @@ class Session:
       raise Empty
 
   @synchronized
-  def acknowledge(self, message=None, sync=True):
+  def acknowledge(self, message=None, disposition=None, sync=True):
     """
     Acknowledge the given L{Message}. If message is None, then all
     unacknowledged messages on the session are acknowledged.
@@ -530,6 +530,7 @@ class Session:
           raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
         self._wakeup()
         self._ewait(lambda: len(self.acked) < self.ack_capacity)
+      m._disposition = disposition
       self.unacked.remove(m)
       self.acked.append(m)
 

Modified: qpid/trunk/qpid/python/qpid/messaging/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/message.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/message.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/message.py Thu Mar 11 00:03:25 2010
@@ -129,7 +129,7 @@ class Message:
                  "correlation_id", "priority", "ttl"]:
       value = self.__dict__[name]
       if value is not None: args.append("%s=%r" % (name, value))
-    for name in ["durable", "properties"]:
+    for name in ["durable", "redelivered", "properties"]:
       value = self.__dict__[name]
       if value: args.append("%s=%r" % (name, value))
     if self.content_type != get_type(self.content):
@@ -141,4 +141,15 @@ class Message:
         args.append(repr(self.content))
     return "Message(%s)" % ", ".join(args)
 
-__all__ = ["Message"]
+class Disposition:
+
+  def __init__(self, type, **options):
+    self.type = type
+    self.options = options
+
+  def __repr__(self):
+    args = [str(self.type)] + \
+        ["%s=%r" % (k, v) for k, v in self.options.items()]
+    return "Disposition(%s)" % ", ".join(args)
+
+__all__ = ["Message", "Disposition"]

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Thu Mar 11 00:03:25 2010
@@ -59,6 +59,9 @@ class Base(Test):
     else:
       return "%s[%s, %s]" % (base, count, self.test_id)
 
+  def message(self, base, count = None):
+    return Message(self.content(base, count))
+
   def ping(self, ssn):
     PING_Q = 'ping-queue; {create: always, delete: always}'
     # send a message
@@ -70,7 +73,7 @@ class Base(Test):
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
 
-  def drain(self, rcv, limit=None, timeout=0, expected=None):
+  def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
     messages = []
     try:
       while limit is None or len(messages) < limit:
@@ -78,21 +81,22 @@ class Base(Test):
     except Empty:
       pass
     if expected is not None:
-      self.assertEchos(expected, messages)
+      self.assertEchos(expected, messages, redelivered)
     return messages
 
   def diff(self, m1, m2):
     result = {}
     for attr in ("id", "subject", "user_id", "to", "reply_to",
                  "correlation_id", "durable", "priority", "ttl",
-                 "properties", "content_type", "content"):
+                 "redelivered", "properties", "content_type",
+                 "content"):
       a1 = getattr(m1, attr)
       a2 = getattr(m2, attr)
       if a1 != a2:
         result[attr] = (a1, a2)
     return result
 
-  def assertEcho(self, msg, echo):
+  def assertEcho(self, msg, echo, redelivered=False):
     if not isinstance(msg, Message) or not isinstance(echo, Message):
       if isinstance(msg, Message):
         msg = msg.content
@@ -102,14 +106,19 @@ class Base(Test):
     else:
       delta = self.diff(msg, echo)
       mttl, ettl = delta.pop("ttl", (0, 0))
+      if redelivered:
+        assert echo.redelivered, \
+            "expected %s to be redelivered: %s" % (msg, echo)
+        if delta.has_key("redelivered"):
+          del delta["redelivered"]
       assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
       assert mttl >= ettl, "%s, %s" % (mttl, ettl)
       assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)
 
-  def assertEchos(self, msgs, echoes):
+  def assertEchos(self, msgs, echoes, redelivered=False):
     assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
     for m, e in zip(msgs, echoes):
-      self.assertEcho(m, e)
+      self.assertEcho(m, e, redelivered)
 
   def assertEmpty(self, rcv):
     contents = self.drain(rcv)

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py?rev=921638&r1=921637&r2=921638&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/endpoints.py Thu Mar 11 00:03:25 2010
@@ -227,21 +227,60 @@ class SessionTests(Base):
   def testAcknowledgeAsyncAckCapUNLIMITED(self):
     self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
 
-  def send(self, ssn, queue, base, count=1):
-    snd = ssn.sender(queue, durable=self.durable())
-    contents = []
+  def testRelease(self):
+    msgs = [self.message("testRelease", i) for i in range(3)]
+    snd = self.ssn.sender("test-release-queue; {create: always, delete: always}")
+    for m in msgs:
+      snd.send(m)
+    rcv = self.ssn.receiver(snd.target)
+    echos = self.drain(rcv, expected=msgs)
+    self.ssn.acknowledge(echos[0])
+    self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True))
+    self.ssn.acknowledge(echos[2], Disposition(RELEASED))
+    self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True)
+    self.drain(rcv, expected=msgs[2:3])
+    self.ssn.acknowledge()
+
+  def testReject(self):
+    msgs = [self.message("testReject", i) for i in range(3)]
+    snd = self.ssn.sender("""
+      test-reject-queue; {
+        create: always,
+        delete: always,
+        node-properties: {
+          x-properties: {
+            alternate_exchange: 'amq.topic'
+          }
+        }
+      }
+""")
+    for m in msgs:
+      snd.send(m)
+    rcv = self.ssn.receiver(snd.target)
+    rej = self.ssn.receiver("amq.topic")
+    echos = self.drain(rcv, expected=msgs)
+    self.ssn.acknowledge(echos[0])
+    self.ssn.acknowledge(echos[1], Disposition(REJECTED))
+    self.ssn.acknowledge(echos[2],
+                         Disposition(REJECTED, code=3, text="test-reject"))
+    self.drain(rej, expected=msgs[1:])
+    self.ssn.acknowledge()
+
+  def send(self, ssn, target, base, count=1):
+    snd = ssn.sender(target, durable=self.durable())
+    messages = []
     for i in range(count):
-      c = self.content(base, i)
+      c = self.message(base, i)
       snd.send(c)
-      contents.append(c)
+      messages.append(c)
     snd.close()
-    return contents
+    return messages
 
   def txTest(self, commit):
     TX_Q = 'test-tx-queue; {create: sender, delete: receiver}'
     TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(self.ssn, TX_Q, "txTest", 3)
+    messages = self.send(self.ssn, TX_Q, "txTest", 3)
     txrcv = txssn.receiver(TX_Q)
     txsnd = txssn.sender(TX_Q_COPY, durable=self.durable())
     rcv = self.ssn.receiver(txrcv.source)
@@ -255,10 +294,10 @@ class SessionTests(Base):
     if commit:
       txssn.commit()
       self.assertEmpty(rcv)
-      self.drain(copy_rcv, expected=contents)
+      self.drain(copy_rcv, expected=messages)
     else:
       txssn.rollback()
-      self.drain(rcv, expected=contents)
+      self.drain(rcv, expected=messages, redelivered=True)
       self.assertEmpty(copy_rcv)
     self.ssn.acknowledge()
 
@@ -271,13 +310,13 @@ class SessionTests(Base):
   def txTestSend(self, commit):
     TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}'
     txssn = self.conn.session(transactional=True)
-    contents = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
+    messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3)
     rcv = self.ssn.receiver(TX_SEND_Q)
     self.assertEmpty(rcv)
 
     if commit:
       txssn.commit()
-      self.drain(rcv, expected=contents)
+      self.drain(rcv, expected=messages)
       self.ssn.acknowledge()
     else:
       txssn.rollback()
@@ -297,17 +336,17 @@ class SessionTests(Base):
     txssn = self.conn.session(transactional=True)
     txrcv = txssn.receiver(TX_ACK_QC)
     self.assertEmpty(txrcv)
-    contents = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
-    self.drain(txrcv, expected=contents)
+    messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3)
+    self.drain(txrcv, expected=messages)
 
     if commit:
       txssn.acknowledge()
     else:
       txssn.rollback()
-      self.drain(txrcv, expected=contents)
+      self.drain(txrcv, expected=messages, redelivered=True)
       txssn.acknowledge()
       txssn.rollback()
-      self.drain(txrcv, expected=contents)
+      self.drain(txrcv, expected=messages, redelivered=True)
       txssn.commit() # commit without ack
       self.assertEmpty(txrcv)
 
@@ -315,7 +354,7 @@ class SessionTests(Base):
 
     txssn = self.conn.session(transactional=True)
     txrcv = txssn.receiver(TX_ACK_QC)
-    self.drain(txrcv, expected=contents)
+    self.drain(txrcv, expected=messages, redelivered=True)
     txssn.acknowledge()
     txssn.commit()
     rcv = self.ssn.receiver(TX_ACK_QD)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org