You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/28 23:05:20 UTC
[3/3] qpid-proton git commit: Improved send for simple case
Improved send for simple case
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c1424402
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c1424402
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c1424402
Branch: refs/heads/master
Commit: c14244020a46ba0b20edf3e5c8599896751553d0
Parents: 1b0e7db
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Jan 27 14:16:05 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 28 21:57:01 2015 +0000
----------------------------------------------------------------------
examples/engine/py/client.py | 2 +-
examples/engine/py/client_http.py | 2 +-
examples/engine/py/db_send.py | 2 +-
examples/engine/py/helloworld.py | 2 +-
examples/engine/py/helloworld_blocking.py | 2 +-
examples/engine/py/helloworld_direct.py | 2 +-
examples/engine/py/helloworld_direct_tornado.py | 2 +-
examples/engine/py/helloworld_tornado.py | 2 +-
examples/engine/py/server.py | 2 +-
examples/engine/py/server_direct.py | 6 +--
examples/engine/py/server_tx.py | 2 +-
examples/engine/py/simple_send.py | 2 +-
examples/engine/py/sync_client.py | 2 +-
examples/engine/py/tx_send.py | 2 +-
proton-c/bindings/python/proton/__init__.py | 39 +++++++++++++++++++-
proton-c/bindings/python/proton/reactors.py | 33 +++++------------
proton-c/bindings/python/proton/utils.py | 8 ++--
tests/python/proton_tests/utils.py | 5 +--
18 files changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py
index f39c14a..0bc6bec 100755
--- a/examples/engine/py/client.py
+++ b/examples/engine/py/client.py
@@ -36,7 +36,7 @@ class Client(MessagingHandler):
def next_request(self):
if self.receiver.remote_source.address:
req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
- self.sender.send_msg(req)
+ self.sender.send(req)
def on_link_opened(self, event):
if event.receiver == self.receiver:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/client_http.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py
index 6aa438d..aa46ed9 100755
--- a/examples/engine/py/client_http.py
+++ b/examples/engine/py/client_http.py
@@ -60,7 +60,7 @@ class Client(MessagingHandler):
request, handler = self.pending.pop(0)
self.sent.append((request, handler))
req = Message(reply_to=self.reply_address, body=request)
- self.sender.send_msg(req)
+ self.sender.send(req)
def request(self, body, handler):
self.pending.append((body, handler))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index d3b321a..1e1ad3f 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -68,7 +68,7 @@ class Send(MessagingHandler):
if not self.keep_sending(): return
record = self.records.get(False)
id = record['id']
- self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
+ self.sender.send(Message(id=id, durable=True, body=record['description']), tag=str(id))
self.sent += 1
print "sent message %s" % id
self.request_records()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
index 71f2c32..4ec53ca 100755
--- a/examples/engine/py/helloworld.py
+++ b/examples/engine/py/helloworld.py
@@ -34,7 +34,7 @@ class HelloWorld(MessagingHandler):
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
- event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.send(Message(body=u"Hello World!"))
event.sender.close()
def on_message(self, event):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_blocking.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_blocking.py b/examples/engine/py/helloworld_blocking.py
index 9c5e062..d9a24a9 100755
--- a/examples/engine/py/helloworld_blocking.py
+++ b/examples/engine/py/helloworld_blocking.py
@@ -30,6 +30,6 @@ class HelloWorldReceiver(IncomingMessageHandler):
conn = BlockingConnection("localhost:5672")
conn.create_receiver("examples", handler=HelloWorldReceiver())
sender = conn.create_sender("examples")
-sender.send_msg(Message(body=u"Hello World!"));
+sender.send(Message(body=u"Hello World!"));
conn.run()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py
index 1d6e475..6b5cb2e 100755
--- a/examples/engine/py/helloworld_direct.py
+++ b/examples/engine/py/helloworld_direct.py
@@ -32,7 +32,7 @@ class HelloWorld(MessagingHandler):
event.container.create_sender(self.url)
def on_sendable(self, event):
- event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.send(Message(body=u"Hello World!"))
event.sender.close()
def on_message(self, event):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
index c3bcd80..e798dae 100755
--- a/examples/engine/py/helloworld_direct_tornado.py
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -35,7 +35,7 @@ class HelloWorld(MessagingHandler):
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
- event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.send(Message(body=u"Hello World!"))
event.sender.close()
def on_message(self, event):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
index d0523e8..c56ca8d 100755
--- a/examples/engine/py/helloworld_tornado.py
+++ b/examples/engine/py/helloworld_tornado.py
@@ -35,7 +35,7 @@ class HelloWorld(MessagingHandler):
event.container.create_sender(conn, self.address)
def on_sendable(self, event):
- event.sender.send_msg(Message(body=u"Hello World!"))
+ event.sender.send(Message(body=u"Hello World!"))
event.sender.close()
def on_message(self, event):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py
index 3e6aad4..5326fa0 100755
--- a/examples/engine/py/server.py
+++ b/examples/engine/py/server.py
@@ -46,7 +46,7 @@ class Server(MessagingHandler):
if not sender:
sender = self.container.create_sender(self.conn, event.message.reply_to)
self.senders[event.message.reply_to] = sender
- sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
+ sender.send(Message(address=event.message.reply_to, body=event.message.body.upper()))
try:
Container(Server("localhost:5672", "examples")).run()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/server_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_direct.py b/examples/engine/py/server_direct.py
index 658abd4..6e1b830 100755
--- a/examples/engine/py/server_direct.py
+++ b/examples/engine/py/server_direct.py
@@ -20,7 +20,7 @@
from proton import generate_uuid, Message
from proton.handlers import MessagingHandler
-from proton.reactors import Container, delivery_tags, send_msg
+from proton.reactors import Container
class Server(MessagingHandler):
def __init__(self, url):
@@ -37,11 +37,9 @@ class Server(MessagingHandler):
if event.link.remote_source and event.link.remote_source.dynamic:
event.link.source.address = str(generate_uuid())
self.senders[event.link.source.address] = event.link
- event.link.tags = delivery_tags()
elif event.link.remote_target and event.link.remote_target.address:
event.link.target.address = event.link.remote_target.address
self.senders[event.link.remote_target.address] = event.link
- event.link.tags = delivery_tags()
elif event.link.remote_source:
event.link.source.address = event.link.remote_source.address
elif event.link.remote_target:
@@ -50,7 +48,7 @@ class Server(MessagingHandler):
def on_message(self, event):
sender = self.senders.get(event.message.reply_to)
if sender:
- send_msg(sender, Message(address=event.message.reply_to, body=event.message.body.upper()))
+ sender.send(Message(address=event.message.reply_to, body=event.message.body.upper()))
try:
Container(Server("localhost:8888")).run()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/server_tx.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py
index ce43973..02929fa 100755
--- a/examples/engine/py/server_tx.py
+++ b/examples/engine/py/server_tx.py
@@ -30,7 +30,7 @@ class TxRequest(TransactionHandler):
self.request_delivery = request_delivery
def on_transaction_declared(self, event):
- self.sender.send_msg(self.response, transaction=event.transaction)
+ event.transaction.send(self.sender, self.response)
event.transaction.accept(self.request_delivery)
event.transaction.commit()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
index 9ce9c58..40adc8d 100755
--- a/examples/engine/py/simple_send.py
+++ b/examples/engine/py/simple_send.py
@@ -37,7 +37,7 @@ class Send(MessagingHandler):
def on_sendable(self, event):
while event.sender.credit and self.sent < self.total:
msg = Message(body={'sequence':(self.sent+1)})
- event.sender.send_msg(msg)
+ event.sender.send(msg)
self.sent += 1
def on_accepted(self, event):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/sync_client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/sync_client.py b/examples/engine/py/sync_client.py
index 362385a..86cd7c2 100755
--- a/examples/engine/py/sync_client.py
+++ b/examples/engine/py/sync_client.py
@@ -51,7 +51,7 @@ class SyncRequestClient(IncomingMessageHandler):
def invoke(self, request):
"""Send a request, wait for and return the response"""
request.reply_to = self.reply_to
- self.sender.send_msg(request)
+ self.sender.send(request)
self.connection.wait(lambda: self.response, msg="Waiting for response")
response = self.response
self.response = None # Ready for next response.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
index 46242ee..f2eb246 100755
--- a/examples/engine/py/tx_send.py
+++ b/examples/engine/py/tx_send.py
@@ -50,7 +50,7 @@ class TxSend(MessagingHandler, TransactionHandler):
def send(self):
while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total:
msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
- self.sender.send_msg(msg, transaction=self.transaction)
+ self.transaction.send(self.sender, msg)
self.current_batch += 1
if self.current_batch == self.batch_size:
self.transaction.commit()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 241d4dd..d01721d 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -1119,6 +1119,15 @@ The format of the message.
self._check(err)
return data
+ def send(self, sender, tag=None):
+ dlv = sender.delivery(tag or sender.delivery_tag())
+ encoded = self.encode()
+ sender.stream(encoded)
+ sender.advance()
+ if sender.snd_settle_mode == Link.SND_SETTLED:
+ dlv.settle()
+ return dlv
+
def __repr2__(self):
props = []
for attr in ("inferred", "address", "reply_to", "durable", "ttl",
@@ -2719,9 +2728,37 @@ class Sender(Link):
def offered(self, n):
pn_link_offered(self._impl, n)
- def send(self, bytes):
+ def stream(self, bytes):
+ """
+ Send specified bytes as part of the current delivery
+ """
return self._check(pn_link_send(self._impl, bytes))
+ def send(self, obj, tag=None):
+ """
+ Send specified object over this sender; the object is expected to
+ have a send() method on it that takes the sender and an optional
+ tag as arguments.
+
+ Where the object is a Message, this will send the message over
+ this link, creating a new delivery for the purpose.
+ """
+ if hasattr(obj, 'send'):
+ return obj.send(self, tag=tag)
+ else:
+ # treat object as bytes
+ return self.stream(obj)
+
+ def delivery_tag(self):
+ if not hasattr(self, 'tag_generator'):
+ def simple_tags():
+ count = 1
+ while True:
+ yield str(count)
+ count += 1
+ self.tag_generator = simple_tags()
+ return next(self.tag_generator)
+
class Receiver(Link):
def flow(self, n):
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index f692eed..ecf293d 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -453,26 +453,6 @@ class SelectLoop(object):
else:
return False
-def delivery_tags():
- count = 1
- while True:
- yield str(count)
- count += 1
-
-def send_msg(sender, msg, tag=None, handler=None, transaction=None):
- dlv = sender.delivery(tag or next(sender.tags))
- if transaction:
- dlv.local.data = [transaction.id]
- dlv.update(0x34)
- if handler:
- dlv.context = handler
- sender.send(msg.encode())
- sender.advance()
- return dlv
-
-def _send_msg(self, msg, tag=None, handler=None, transaction=None):
- return send_msg(self, msg, tag, handler, transaction)
-
class Transaction(object):
"""
@@ -512,10 +492,17 @@ class Transaction(object):
self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
def _send_ctrl(self, descriptor, value):
- delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.internal_handler)
+ delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
+ delivery.context=self.internal_handler
delivery.transaction = self
return delivery
+ def send(self, sender, msg, tag=None):
+ dlv = sender.send(msg, tag=tag)
+ dlv.local.data = [self.id]
+ dlv.update(0x34)
+ return dlv
+
def accept(self, delivery):
self.update(delivery, PN_ACCEPTED)
if self.settle_before_discharge:
@@ -779,8 +766,8 @@ class Container(object):
snd.target.address = target
if handler:
snd.context = handler
- snd.tags = tags or delivery_tags()
- snd.send_msg = types.MethodType(_send_msg, snd)
+ if tags:
+ snd.tag_generator = tags
_apply_link_options(options, snd)
snd.open()
return snd
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index fbdd70c..4c9d509 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -19,7 +19,7 @@
import collections, Queue, socket, time, threading
from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message
from proton import ProtonException, Timeout, Url
-from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
+from proton.reactors import AmqpSocket, Container, Events, SelectLoop
from proton.handlers import Acking, MessagingHandler, ScopedHandler, IncomingMessageHandler
def utf8(s):
@@ -60,8 +60,8 @@ class BlockingSender(BlockingLink):
self.link.close()
raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
- def send_msg(self, msg, timeout=False, error_states=None):
- delivery = send_msg(self.link, msg)
+ def send(self, msg, timeout=False, error_states=None):
+ delivery = self.link.send(msg)
self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout)
bad = error_states
if bad is None:
@@ -265,7 +265,7 @@ class SyncRequestResponse(IncomingMessageHandler):
raise ValueError("Request message has no address: %s" % request)
request.reply_to = self.reply_to
request.correlation_id = correlation_id = self.correlation_id.next()
- self.sender.send_msg(request)
+ self.sender.send(request)
def wakeup():
return self.response and (self.response.correlation_id == correlation_id)
self.connection.wait(wakeup, msg="Waiting for response")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/tests/python/proton_tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/utils.py b/tests/python/proton_tests/utils.py
index 5cd119c..68ec007 100644
--- a/tests/python/proton_tests/utils.py
+++ b/tests/python/proton_tests/utils.py
@@ -24,7 +24,7 @@ from proton_tests.common import Test, free_tcp_port
from copy import copy
from proton import Message, Url, generate_uuid
from proton.handlers import MessagingHandler
-from proton.reactors import Container, send_msg, delivery_tags
+from proton.reactors import Container
from proton.utils import SyncRequestResponse, BlockingConnection
@@ -54,14 +54,13 @@ class EchoServer(MessagingHandler, Thread):
if event.link.remote_source and event.link.remote_source.dynamic:
event.link.source.address = str(generate_uuid())
self.senders[event.link.source.address] = event.link
- event.link.tags = delivery_tags()
def on_message(self, event):
m = event.message
sender = self.senders.get(m.reply_to)
if sender:
reply = Message(address=m.reply_to, body=m.body, correlation_id=m.correlation_id)
- send_msg(sender, reply)
+ sender.send(reply)
def on_connection_closing(self, event):
self.acceptor.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org