You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/26 20:58:12 UTC
qpid-proton git commit: support for explicit acknowledgements when using sync fetch
Repository: qpid-proton
Updated Branches:
refs/heads/master ce21ce6e4 -> 2daa1c06c
support for explicit acknowledgements when using sync fetch
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2daa1c06
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2daa1c06
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2daa1c06
Branch: refs/heads/master
Commit: 2daa1c06c98870835cf253b6242872b8c816b131
Parents: ce21ce6
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Jan 26 19:57:10 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Jan 26 19:57:10 2015 +0000
----------------------------------------------------------------------
proton-c/bindings/python/proton/handlers.py | 5 ++-
proton-c/bindings/python/proton/utils.py | 43 +++++++++++++++++++-----
2 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2daa1c06/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index b2548fb..188f194 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -71,9 +71,8 @@ class ScopedHandler(Handler):
objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
- handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)]
- for h in handlers:
- h(event)
+ for t in targets:
+ event.dispatch(t)
class OutgoingMessageHandler(Handler):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2daa1c06/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 6910075..9e6fb81 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -17,9 +17,9 @@
# under the License.
#
import collections, Queue, socket, time, threading
-from proton import ConnectionException, Endpoint, Handler, LinkException, Message, Timeout, Url
+from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message, Timeout, Url
from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
-from proton.handlers import MessagingHandler, ScopedHandler, IncomingMessageHandler
+from proton.handlers import Acking, MessagingHandler, ScopedHandler, IncomingMessageHandler
class BlockingLink(object):
def __init__(self, connection, link):
@@ -52,11 +52,12 @@ class BlockingSender(BlockingLink):
class Fetcher(MessagingHandler):
def __init__(self, prefetch):
- super(Fetcher, self).__init__(prefetch=prefetch)
+ super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
self.incoming = collections.deque([])
+ self.unsettled = collections.deque([])
def on_message(self, event):
- self.incoming.append(event.message)
+ self.incoming.append((event.message, event.delivery))
def on_link_error(self, event):
# This will be handled by BlockingConnection
@@ -67,7 +68,16 @@ class Fetcher(MessagingHandler):
return len(self.incoming)
def pop(self):
- return self.incoming.popleft()
+ message, delivery = self.incoming.popleft()
+ if not delivery.settled:
+ self.unsettled.append(delivery)
+ return message
+
+ def settle(self, state=None):
+ delivery = self.unsettled.popleft()
+ if state:
+ delivery.update(state)
+ delivery.settle()
class BlockingReceiver(BlockingLink):
@@ -75,7 +85,7 @@ class BlockingReceiver(BlockingLink):
super(BlockingReceiver, self).__init__(connection, receiver)
if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
self.link.close()
- raise LinkException("Failed to open receiver %s, source does not match" % link.name)
+ raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
if credit: receiver.flow(credit)
self.fetcher = fetcher
@@ -87,6 +97,23 @@ class BlockingReceiver(BlockingLink):
self.connection.wait(lambda: self.fetcher.has_message, msg="Fetching on receiver %s" % self.link.name, timeout=timeout)
return self.fetcher.pop()
+ def accept(self):
+ self.settle(Delivery.ACCEPTED)
+
+ def reject(self):
+ self.settle(Delivery.REJECTED)
+
+ def release(self, delivered=True):
+ if delivered:
+ self.settle(Delivery.MODIFIED)
+ else:
+ self.settle(Delivery.RELEASED)
+
+ def settle(self, state=None):
+ if not self.fetcher:
+ raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
+ self.fetcher.settle(state)
+
class BlockingConnection(Handler):
"""
@@ -100,10 +127,10 @@ class BlockingConnection(Handler):
self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
msg="Opening connection")
- def create_sender(self, address, handler=None, name=None):
+ def create_sender(self, address, handler=None, name=None, options=None):
return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler))
- def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None):
+ def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
prefetch = credit
if handler:
fetcher = None
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org