You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/16 20:03:35 UTC
qpid-proton git commit: Added blocking fetch() capability to BlockingConnection
Repository: qpid-proton
Updated Branches:
refs/heads/master 0cf97bb60 -> 51c59f7d5
Added blocking fetch() capability to BlockingConnection
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/51c59f7d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/51c59f7d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/51c59f7d
Branch: refs/heads/master
Commit: 51c59f7d52f67d66c6e615aeaa78955fbf750459
Parents: 0cf97bb
Author: Gordon Sim <gs...@redhat.com>
Authored: Fri Jan 16 15:49:11 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Fri Jan 16 19:04:32 2015 +0000
----------------------------------------------------------------------
proton-c/bindings/python/proton/utils.py | 47 +++++++++++++++++++++++----
1 file changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51c59f7d/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 03c9417..4e5d4b9 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -16,10 +16,10 @@
# specific language governing permissions and limitations
# under the License.
#
-import Queue, socket, time
+import collections, Queue, socket, time
from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
-from proton.handlers import ScopedHandler
+from proton.handlers import MessagingHandler, ScopedHandler
class BlockingLink(object):
def __init__(self, connection, link):
@@ -39,14 +39,40 @@ class BlockingSender(BlockingLink):
def __init__(self, connection, sender):
super(BlockingSender, self).__init__(connection, sender)
- def send_msg(self, msg):
+ def send_msg(self, msg, timeout=False):
delivery = send_msg(self.link, msg)
- self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name)
+ self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout)
+
+class Fetcher(MessagingHandler):
+ def __init__(self, prefetch):
+ super(Fetcher, self).__init__(prefetch=prefetch)
+ self.incoming = collections.deque([])
+
+ def on_message(self, event):
+ self.incoming.append(event.message)
+
+ @property
+ def has_message(self):
+ return len(self.incoming)
+
+ def pop(self):
+ return self.incoming.popleft()
+
class BlockingReceiver(BlockingLink):
- def __init__(self, connection, receiver, credit=1):
+ def __init__(self, connection, receiver, fetcher, credit=1):
super(BlockingReceiver, self).__init__(connection, receiver)
if credit: receiver.flow(credit)
+ self.fetcher = fetcher
+
+ def fetch(self, timeout=False):
+ if not self.fetcher:
+ raise Exception("Can't call fetch on this receiver as a handler was provided")
+ if not self.link.credit:
+ self.link.flow(1)
+ self.connection.wait(lambda: self.fetcher.has_message, msg="Fetching on receiver %s" % self.link.name, timeout=timeout)
+ return self.fetcher.pop()
+
class BlockingConnection(Handler):
"""
@@ -66,9 +92,16 @@ class BlockingConnection(Handler):
def create_sender(self, address, handler=None):
return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler))
- def create_receiver(self, address, credit=1, dynamic=False, handler=None):
+ def create_receiver(self, address, credit=None, dynamic=False, handler=None):
+ prefetch = credit
+ if handler:
+ fetcher = None
+ if prefetch is None:
+ prefetch = 1
+ else:
+ fetcher = Fetcher(credit)
return BlockingReceiver(
- self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit)
+ self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler or fetcher), fetcher, credit=prefetch)
def close(self):
self.conn.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org