You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/26 20:58:12 UTC

qpid-proton git commit: support for explicit acknowledgements when using sync fetch

Repository: qpid-proton
Updated Branches:
  refs/heads/master ce21ce6e4 -> 2daa1c06c


support for explicit acknowledgements when using sync fetch


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2daa1c06
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2daa1c06
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2daa1c06

Branch: refs/heads/master
Commit: 2daa1c06c98870835cf253b6242872b8c816b131
Parents: ce21ce6
Author: Gordon Sim <gs...@redhat.com>
Authored: Mon Jan 26 19:57:10 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Mon Jan 26 19:57:10 2015 +0000

----------------------------------------------------------------------
 proton-c/bindings/python/proton/handlers.py |  5 ++-
 proton-c/bindings/python/proton/utils.py    | 43 +++++++++++++++++++-----
 2 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2daa1c06/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index b2548fb..188f194 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -71,9 +71,8 @@ class ScopedHandler(Handler):
 
         objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
         targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
-        handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)]
-        for h in handlers:
-            h(event)
+        for t in targets:
+            event.dispatch(t)
 
 
 class OutgoingMessageHandler(Handler):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2daa1c06/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 6910075..9e6fb81 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -17,9 +17,9 @@
 # under the License.
 #
 import collections, Queue, socket, time, threading
-from proton import ConnectionException, Endpoint, Handler, LinkException, Message, Timeout, Url
+from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message, Timeout, Url
 from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
-from proton.handlers import MessagingHandler, ScopedHandler, IncomingMessageHandler
+from proton.handlers import Acking, MessagingHandler, ScopedHandler, IncomingMessageHandler
 
 class BlockingLink(object):
     def __init__(self, connection, link):
@@ -52,11 +52,12 @@ class BlockingSender(BlockingLink):
 
 class Fetcher(MessagingHandler):
     def __init__(self, prefetch):
-        super(Fetcher, self).__init__(prefetch=prefetch)
+        super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
         self.incoming = collections.deque([])
+        self.unsettled = collections.deque([])
 
     def on_message(self, event):
-        self.incoming.append(event.message)
+        self.incoming.append((event.message, event.delivery))
 
     def on_link_error(self, event):
         # This will be handled by BlockingConnection
@@ -67,7 +68,16 @@ class Fetcher(MessagingHandler):
         return len(self.incoming)
 
     def pop(self):
-        return self.incoming.popleft()
+        message, delivery = self.incoming.popleft()
+        if not delivery.settled:
+            self.unsettled.append(delivery)
+        return message
+
+    def settle(self, state=None):
+        delivery = self.unsettled.popleft()
+        if state:
+            delivery.update(state)
+        delivery.settle()
 
 
 class BlockingReceiver(BlockingLink):
@@ -75,7 +85,7 @@ class BlockingReceiver(BlockingLink):
         super(BlockingReceiver, self).__init__(connection, receiver)
         if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
             self.link.close()
-            raise LinkException("Failed to open receiver %s, source does not match" % link.name)
+            raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
         if credit: receiver.flow(credit)
         self.fetcher = fetcher
 
@@ -87,6 +97,23 @@ class BlockingReceiver(BlockingLink):
         self.connection.wait(lambda: self.fetcher.has_message, msg="Fetching on receiver %s" % self.link.name, timeout=timeout)
         return self.fetcher.pop()
 
+    def accept(self):
+        self.settle(Delivery.ACCEPTED)
+
+    def reject(self):
+        self.settle(Delivery.REJECTED)
+
+    def release(self, delivered=True):
+        if delivered:
+            self.settle(Delivery.MODIFIED)
+        else:
+            self.settle(Delivery.RELEASED)
+
+    def settle(self, state=None):
+        if not self.fetcher:
+            raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
+        self.fetcher.settle(state)
+
 
 class BlockingConnection(Handler):
     """
@@ -100,10 +127,10 @@ class BlockingConnection(Handler):
         self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                   msg="Opening connection")
 
-    def create_sender(self, address, handler=None, name=None):
+    def create_sender(self, address, handler=None, name=None, options=None):
         return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler))
 
-    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None):
+    def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
         prefetch = credit
         if handler:
             fetcher = None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org