You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/16 20:03:35 UTC

qpid-proton git commit: Added blocking fetch() capability to BlockingConnection

Repository: qpid-proton
Updated Branches:
  refs/heads/master 0cf97bb60 -> 51c59f7d5


Added blocking fetch() capability to BlockingConnection


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/51c59f7d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/51c59f7d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/51c59f7d

Branch: refs/heads/master
Commit: 51c59f7d52f67d66c6e615aeaa78955fbf750459
Parents: 0cf97bb
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Jan 16 15:49:11 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Fri Jan 16 19:04:32 2015 +0000

----------------------------------------------------------------------
 proton-c/bindings/python/proton/utils.py | 47 +++++++++++++++++++++++----
 1 file changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51c59f7d/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 03c9417..4e5d4b9 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -16,10 +16,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import Queue, socket, time
+import collections, Queue, socket, time
 from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
 from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
-from proton.handlers import ScopedHandler
+from proton.handlers import MessagingHandler, ScopedHandler
 
 class BlockingLink(object):
     def __init__(self, connection, link):
@@ -39,14 +39,40 @@ class BlockingSender(BlockingLink):
     def __init__(self, connection, sender):
         super(BlockingSender, self).__init__(connection, sender)
 
-    def send_msg(self, msg):
+    def send_msg(self, msg, timeout=False):
         delivery = send_msg(self.link, msg)
-        self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name)
+        self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout)
+
+class Fetcher(MessagingHandler):
+    def __init__(self, prefetch):
+        super(Fetcher, self).__init__(prefetch=prefetch)
+        self.incoming = collections.deque([])
+
+    def on_message(self, event):
+        self.incoming.append(event.message)
+
+    @property
+    def has_message(self):
+        return len(self.incoming)
+
+    def pop(self):
+        return self.incoming.popleft()
+
 
 class BlockingReceiver(BlockingLink):
-    def __init__(self, connection, receiver, credit=1):
+    def __init__(self, connection, receiver, fetcher, credit=1):
         super(BlockingReceiver, self).__init__(connection, receiver)
         if credit: receiver.flow(credit)
+        self.fetcher = fetcher
+
+    def fetch(self, timeout=False):
+        if not self.fetcher:
+            raise Exception("Can't call fetch on this receiver as a handler was provided")
+        if not self.link.credit:
+            self.link.flow(1)
+        self.connection.wait(lambda: self.fetcher.has_message, msg="Fetching on receiver %s" % self.link.name, timeout=timeout)
+        return self.fetcher.pop()
+
 
 class BlockingConnection(Handler):
     """
@@ -66,9 +92,16 @@ class BlockingConnection(Handler):
     def create_sender(self, address, handler=None):
         return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler))
 
-    def create_receiver(self, address, credit=1, dynamic=False, handler=None):
+    def create_receiver(self, address, credit=None, dynamic=False, handler=None):
+        prefetch = credit
+        if handler:
+            fetcher = None
+            if prefetch is None:
+                prefetch = 1
+        else:
+            fetcher = Fetcher(credit)
         return BlockingReceiver(
-            self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit)
+            self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler or fetcher), fetcher, credit=prefetch)
 
     def close(self):
         self.conn.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org