You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by pm...@apache.org on 2013/09/27 15:52:02 UTC

svn commit: r1526901 - /qpid/trunk/qpid/python/qpid/messaging/driver.py

Author: pmoravec
Date: Fri Sep 27 13:52:01 2013
New Revision: 1526901

URL: http://svn.apache.org/r1526901
Log:
QPID-5183 Python client does not release acquired messages on consumer close when session persists

Modified:
    qpid/trunk/qpid/python/qpid/messaging/driver.py

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1526901&r1=1526900&r2=1526901&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Fri Sep 27 13:52:01 2013
@@ -259,6 +259,15 @@ class LinkIn:
     reliability = link_opts.get("reliability")
     cmds = [MessageCancel(_rcv.destination)]
     cmds.extend(_rcv.on_unlink)
+    msgs = [] #release back messages for the closing receiver
+    msg = rcv.session._pop(rcv)
+    while msg is not None:
+      msgs.append(msg)
+      msg = rcv.session._pop(rcv)
+    if len(msgs) > 0:
+      ids = RangedSet(*[m._transfer_id for m in msgs])
+      log.debug("releasing back messages: %s, as receiver is closing", ids)
+      cmds.append(MessageRelease(ids, True))
     sst.write_cmds(cmds, action)
 
   def del_link(self, sst, rcv, _rcv):
@@ -1283,6 +1292,11 @@ class Engine:
     msg = self._decode(xfr)
     rcv = sst.destinations[xfr.destination].target
     msg._receiver = rcv
+    if rcv.closing or rcv.closed: # release message to a closing receiver
+      ids = RangedSet(*[msg._transfer_id])
+      log.debug("releasing back %s message: %s, as receiver is closing", ids, msg)
+      sst.write_cmd(MessageRelease(ids, True))
+      return
     if rcv.impending is not UNLIMITED:
       assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
     rcv.received += 1



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org