You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/28 23:05:20 UTC

[3/3] qpid-proton git commit: Improved send for simple case

Improved send for simple case


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c1424402
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c1424402
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c1424402

Branch: refs/heads/master
Commit: c14244020a46ba0b20edf3e5c8599896751553d0
Parents: 1b0e7db
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Jan 27 14:16:05 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 28 21:57:01 2015 +0000

----------------------------------------------------------------------
 examples/engine/py/client.py                    |  2 +-
 examples/engine/py/client_http.py               |  2 +-
 examples/engine/py/db_send.py                   |  2 +-
 examples/engine/py/helloworld.py                |  2 +-
 examples/engine/py/helloworld_blocking.py       |  2 +-
 examples/engine/py/helloworld_direct.py         |  2 +-
 examples/engine/py/helloworld_direct_tornado.py |  2 +-
 examples/engine/py/helloworld_tornado.py        |  2 +-
 examples/engine/py/server.py                    |  2 +-
 examples/engine/py/server_direct.py             |  6 +--
 examples/engine/py/server_tx.py                 |  2 +-
 examples/engine/py/simple_send.py               |  2 +-
 examples/engine/py/sync_client.py               |  2 +-
 examples/engine/py/tx_send.py                   |  2 +-
 proton-c/bindings/python/proton/__init__.py     | 39 +++++++++++++++++++-
 proton-c/bindings/python/proton/reactors.py     | 33 +++++------------
 proton-c/bindings/python/proton/utils.py        |  8 ++--
 tests/python/proton_tests/utils.py              |  5 +--
 18 files changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py
index f39c14a..0bc6bec 100755
--- a/examples/engine/py/client.py
+++ b/examples/engine/py/client.py
@@ -36,7 +36,7 @@ class Client(MessagingHandler):
     def next_request(self):
         if self.receiver.remote_source.address:
             req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0])
-            self.sender.send_msg(req)
+            self.sender.send(req)
 
     def on_link_opened(self, event):
         if event.receiver == self.receiver:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/client_http.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py
index 6aa438d..aa46ed9 100755
--- a/examples/engine/py/client_http.py
+++ b/examples/engine/py/client_http.py
@@ -60,7 +60,7 @@ class Client(MessagingHandler):
             request, handler = self.pending.pop(0)
             self.sent.append((request, handler))
             req = Message(reply_to=self.reply_address, body=request)
-            self.sender.send_msg(req)
+            self.sender.send(req)
 
     def request(self, body, handler):
         self.pending.append((body, handler))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index d3b321a..1e1ad3f 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -68,7 +68,7 @@ class Send(MessagingHandler):
             if not self.keep_sending(): return
             record = self.records.get(False)
             id = record['id']
-            self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
+            self.sender.send(Message(id=id, durable=True, body=record['description']), tag=str(id))
             self.sent += 1
             print "sent message %s" % id
         self.request_records()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
index 71f2c32..4ec53ca 100755
--- a/examples/engine/py/helloworld.py
+++ b/examples/engine/py/helloworld.py
@@ -34,7 +34,7 @@ class HelloWorld(MessagingHandler):
         event.container.create_sender(conn, self.address)
 
     def on_sendable(self, event):
-        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.send(Message(body=u"Hello World!"))
         event.sender.close()
 
     def on_message(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_blocking.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_blocking.py b/examples/engine/py/helloworld_blocking.py
index 9c5e062..d9a24a9 100755
--- a/examples/engine/py/helloworld_blocking.py
+++ b/examples/engine/py/helloworld_blocking.py
@@ -30,6 +30,6 @@ class HelloWorldReceiver(IncomingMessageHandler):
 conn = BlockingConnection("localhost:5672")
 conn.create_receiver("examples", handler=HelloWorldReceiver())
 sender = conn.create_sender("examples")
-sender.send_msg(Message(body=u"Hello World!"));
+sender.send(Message(body=u"Hello World!"));
 conn.run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py
index 1d6e475..6b5cb2e 100755
--- a/examples/engine/py/helloworld_direct.py
+++ b/examples/engine/py/helloworld_direct.py
@@ -32,7 +32,7 @@ class HelloWorld(MessagingHandler):
         event.container.create_sender(self.url)
 
     def on_sendable(self, event):
-        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.send(Message(body=u"Hello World!"))
         event.sender.close()
 
     def on_message(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
index c3bcd80..e798dae 100755
--- a/examples/engine/py/helloworld_direct_tornado.py
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -35,7 +35,7 @@ class HelloWorld(MessagingHandler):
         event.container.create_sender(conn, self.address)
 
     def on_sendable(self, event):
-        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.send(Message(body=u"Hello World!"))
         event.sender.close()
 
     def on_message(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
index d0523e8..c56ca8d 100755
--- a/examples/engine/py/helloworld_tornado.py
+++ b/examples/engine/py/helloworld_tornado.py
@@ -35,7 +35,7 @@ class HelloWorld(MessagingHandler):
         event.container.create_sender(conn, self.address)
 
     def on_sendable(self, event):
-        event.sender.send_msg(Message(body=u"Hello World!"))
+        event.sender.send(Message(body=u"Hello World!"))
         event.sender.close()
 
     def on_message(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py
index 3e6aad4..5326fa0 100755
--- a/examples/engine/py/server.py
+++ b/examples/engine/py/server.py
@@ -46,7 +46,7 @@ class Server(MessagingHandler):
         if not sender:
             sender = self.container.create_sender(self.conn, event.message.reply_to)
             self.senders[event.message.reply_to] = sender
-        sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
+        sender.send(Message(address=event.message.reply_to, body=event.message.body.upper()))
 
 try:
     Container(Server("localhost:5672", "examples")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/server_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_direct.py b/examples/engine/py/server_direct.py
index 658abd4..6e1b830 100755
--- a/examples/engine/py/server_direct.py
+++ b/examples/engine/py/server_direct.py
@@ -20,7 +20,7 @@
 
 from proton import generate_uuid, Message
 from proton.handlers import MessagingHandler
-from proton.reactors import Container, delivery_tags, send_msg
+from proton.reactors import Container
 
 class Server(MessagingHandler):
     def __init__(self, url):
@@ -37,11 +37,9 @@ class Server(MessagingHandler):
             if event.link.remote_source and event.link.remote_source.dynamic:
                 event.link.source.address = str(generate_uuid())
                 self.senders[event.link.source.address] = event.link
-                event.link.tags = delivery_tags()
             elif event.link.remote_target and event.link.remote_target.address:
                 event.link.target.address = event.link.remote_target.address
                 self.senders[event.link.remote_target.address] = event.link
-                event.link.tags = delivery_tags()
             elif event.link.remote_source:
                 event.link.source.address = event.link.remote_source.address
         elif event.link.remote_target:
@@ -50,7 +48,7 @@ class Server(MessagingHandler):
     def on_message(self, event):
         sender = self.senders.get(event.message.reply_to)
         if sender:
-            send_msg(sender, Message(address=event.message.reply_to, body=event.message.body.upper()))
+            sender.send(Message(address=event.message.reply_to, body=event.message.body.upper()))
 
 try:
     Container(Server("localhost:8888")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/server_tx.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py
index ce43973..02929fa 100755
--- a/examples/engine/py/server_tx.py
+++ b/examples/engine/py/server_tx.py
@@ -30,7 +30,7 @@ class TxRequest(TransactionHandler):
         self.request_delivery = request_delivery
 
     def on_transaction_declared(self, event):
-        self.sender.send_msg(self.response, transaction=event.transaction)
+        event.transaction.send(self.sender, self.response)
         event.transaction.accept(self.request_delivery)
         event.transaction.commit()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
index 9ce9c58..40adc8d 100755
--- a/examples/engine/py/simple_send.py
+++ b/examples/engine/py/simple_send.py
@@ -37,7 +37,7 @@ class Send(MessagingHandler):
     def on_sendable(self, event):
         while event.sender.credit and self.sent < self.total:
             msg = Message(body={'sequence':(self.sent+1)})
-            event.sender.send_msg(msg)
+            event.sender.send(msg)
             self.sent += 1
 
     def on_accepted(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/sync_client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/sync_client.py b/examples/engine/py/sync_client.py
index 362385a..86cd7c2 100755
--- a/examples/engine/py/sync_client.py
+++ b/examples/engine/py/sync_client.py
@@ -51,7 +51,7 @@ class SyncRequestClient(IncomingMessageHandler):
     def invoke(self, request):
         """Send a request, wait for and return the response"""
         request.reply_to = self.reply_to
-        self.sender.send_msg(request)
+        self.sender.send(request)
         self.connection.wait(lambda: self.response, msg="Waiting for response")
         response = self.response
         self.response = None    # Ready for next response.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
index 46242ee..f2eb246 100755
--- a/examples/engine/py/tx_send.py
+++ b/examples/engine/py/tx_send.py
@@ -50,7 +50,7 @@ class TxSend(MessagingHandler, TransactionHandler):
     def send(self):
         while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total:
             msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
-            self.sender.send_msg(msg, transaction=self.transaction)
+            self.transaction.send(self.sender, msg)
             self.current_batch += 1
             if self.current_batch == self.batch_size:
                 self.transaction.commit()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 241d4dd..d01721d 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -1119,6 +1119,15 @@ The format of the message.
         self._check(err)
         return data
 
+  def send(self, sender, tag=None):
+    dlv = sender.delivery(tag or sender.delivery_tag())
+    encoded = self.encode()
+    sender.stream(encoded)
+    sender.advance()
+    if sender.snd_settle_mode == Link.SND_SETTLED:
+      dlv.settle()
+    return dlv
+
   def __repr2__(self):
     props = []
     for attr in ("inferred", "address", "reply_to", "durable", "ttl",
@@ -2719,9 +2728,37 @@ class Sender(Link):
   def offered(self, n):
     pn_link_offered(self._impl, n)
 
-  def send(self, bytes):
+  def stream(self, bytes):
+    """
+    Send specified bytes as part of the current delivery
+    """
     return self._check(pn_link_send(self._impl, bytes))
 
+  def send(self, obj, tag=None):
+    """
+    Send specified object over this sender; the object is expected to
+    have a send() method on it that takes the sender and an optional
+    tag as arguments.
+
+    Where the object is a Message, this will send the message over
+    this link, creating a new delivery for the purpose.
+    """
+    if hasattr(obj, 'send'):
+      return obj.send(self, tag=tag)
+    else:
+      # treat object as bytes
+      return self.stream(obj)
+
+  def delivery_tag(self):
+    if not hasattr(self, 'tag_generator'):
+      def simple_tags():
+        count = 1
+        while True:
+          yield str(count)
+          count += 1
+      self.tag_generator = simple_tags()
+    return next(self.tag_generator)
+
 class Receiver(Link):
 
   def flow(self, n):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index f692eed..ecf293d 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -453,26 +453,6 @@ class SelectLoop(object):
         else:
             return False
 
-def delivery_tags():
-    count = 1
-    while True:
-        yield str(count)
-        count += 1
-
-def send_msg(sender, msg, tag=None, handler=None, transaction=None):
-    dlv = sender.delivery(tag or next(sender.tags))
-    if transaction:
-        dlv.local.data = [transaction.id]
-        dlv.update(0x34)
-    if handler:
-        dlv.context = handler
-    sender.send(msg.encode())
-    sender.advance()
-    return dlv
-
-def _send_msg(self, msg, tag=None, handler=None, transaction=None):
-    return send_msg(self, msg, tag, handler, transaction)
-
 
 class Transaction(object):
     """
@@ -512,10 +492,17 @@ class Transaction(object):
         self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
 
     def _send_ctrl(self, descriptor, value):
-        delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.internal_handler)
+        delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
+        delivery.context=self.internal_handler
         delivery.transaction = self
         return delivery
 
+    def send(self, sender, msg, tag=None):
+        dlv = sender.send(msg, tag=tag)
+        dlv.local.data = [self.id]
+        dlv.update(0x34)
+        return dlv
+
     def accept(self, delivery):
         self.update(delivery, PN_ACCEPTED)
         if self.settle_before_discharge:
@@ -779,8 +766,8 @@ class Container(object):
             snd.target.address = target
         if handler:
             snd.context = handler
-        snd.tags = tags or delivery_tags()
-        snd.send_msg = types.MethodType(_send_msg, snd)
+        if tags:
+            snd.tag_generator = tags
         _apply_link_options(options, snd)
         snd.open()
         return snd

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index fbdd70c..4c9d509 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -19,7 +19,7 @@
 import collections, Queue, socket, time, threading
 from proton import ConnectionException, Delivery, Endpoint, Handler, LinkException, Message
 from proton import ProtonException, Timeout, Url
-from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
+from proton.reactors import AmqpSocket, Container, Events, SelectLoop
 from proton.handlers import Acking, MessagingHandler, ScopedHandler, IncomingMessageHandler
 
 def utf8(s):
@@ -60,8 +60,8 @@ class BlockingSender(BlockingLink):
             self.link.close()
             raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
 
-    def send_msg(self, msg, timeout=False, error_states=None):
-        delivery = send_msg(self.link, msg)
+    def send(self, msg, timeout=False, error_states=None):
+        delivery = self.link.send(msg)
         self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name, timeout=timeout)
         bad = error_states
         if bad is None:
@@ -265,7 +265,7 @@ class SyncRequestResponse(IncomingMessageHandler):
             raise ValueError("Request message has no address: %s" % request)
         request.reply_to = self.reply_to
         request.correlation_id = correlation_id = self.correlation_id.next()
-        self.sender.send_msg(request)
+        self.sender.send(request)
         def wakeup():
             return self.response and (self.response.correlation_id == correlation_id)
         self.connection.wait(wakeup, msg="Waiting for response")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c1424402/tests/python/proton_tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/utils.py b/tests/python/proton_tests/utils.py
index 5cd119c..68ec007 100644
--- a/tests/python/proton_tests/utils.py
+++ b/tests/python/proton_tests/utils.py
@@ -24,7 +24,7 @@ from proton_tests.common import Test, free_tcp_port
 from copy import copy
 from proton import Message, Url, generate_uuid
 from proton.handlers import MessagingHandler
-from proton.reactors import Container, send_msg, delivery_tags
+from proton.reactors import Container
 from proton.utils import SyncRequestResponse, BlockingConnection
 
 
@@ -54,14 +54,13 @@ class EchoServer(MessagingHandler, Thread):
             if event.link.remote_source and event.link.remote_source.dynamic:
                 event.link.source.address = str(generate_uuid())
                 self.senders[event.link.source.address] = event.link
-                event.link.tags = delivery_tags()
 
     def on_message(self, event):
         m = event.message
         sender = self.senders.get(m.reply_to)
         if sender:
             reply = Message(address=m.reply_to, body=m.body, correlation_id=m.correlation_id)
-            send_msg(sender, reply)
+            sender.send(reply)
 
     def on_connection_closing(self, event):
         self.acceptor.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org