You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/07 22:00:27 UTC

[1/4] qpid-proton git commit: PROTON-667: allow state field to be sent along with transfer frame

Repository: qpid-proton
Updated Branches:
  refs/heads/master a439ee2de -> 8c35ce407


PROTON-667: allow state field to be sent along with transfer frame


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e052a016
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e052a016
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e052a016

Branch: refs/heads/master
Commit: e052a016adcbfd0d2b5137ac5e6be829478b2c6a
Parents: 1e96dae
Author: Gordon Sim <gs...@apache.org>
Authored: Wed Oct 22 19:12:23 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 20:23:57 2015 +0000

----------------------------------------------------------------------
 proton-c/src/transport/transport.c | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e052a016/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 6220329..1a1e969 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -741,7 +741,9 @@ int pn_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch,
                                 uint32_t message_format,
                                 bool settled,
                                 bool more,
-                                pn_sequence_t frame_limit)
+                                pn_sequence_t frame_limit,
+                                uint64_t code,
+                                pn_data_t* state)
 {
   bool more_flag = more;
   int framecount = 0;
@@ -751,10 +753,10 @@ int pn_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch,
 
  compute_performatives:
   pn_data_clear(transport->output_args);
-  int err = pn_data_fill(transport->output_args, "DL[IIzIoo]", TRANSFER,
+  int err = pn_data_fill(transport->output_args, "DL[IIzIoon?DLC]", TRANSFER,
                          handle, id, tag->size, tag->start,
                          message_format,
-                         settled, more_flag);
+                         settled, more_flag, (bool)code, code, state);
   if (err) {
     pn_transport_logf(transport,
                       "error posting transfer frame: %s: %s", pn_code(err),
@@ -1917,6 +1919,9 @@ int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery,
       pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
       size_t full_size = bytes.size;
       pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
+      pn_data_clear(transport->disp_data);
+      pni_disposition_encode(&delivery->local, transport->disp_data);
+
       int count = pn_post_amqp_transfer_frame(transport,
                                               ssn_state->local_channel,
                                               link_state->local_handle,
@@ -1924,7 +1929,8 @@ int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery,
                                               0, // message-format
                                               delivery->local.settled,
                                               !delivery->done,
-                                              ssn_state->remote_incoming_window);
+                                              ssn_state->remote_incoming_window,
+                                              delivery->local.type, transport->disp_data);
       if (count < 0) return count;
       xfr_posted = true;
       ssn_state->outgoing_transfer_count += count;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] qpid-proton git commit: Added automatable test script for examples and fixed transactional examples.

Posted by gs...@apache.org.
Added automatable test script for examples and fixed transactional examples.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a2101829
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a2101829
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a2101829

Branch: refs/heads/master
Commit: a21018297035991b78114fd7e39a872bab29cb00
Parents: e052a01
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Jan 6 13:20:01 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 20:24:19 2015 +0000

----------------------------------------------------------------------
 examples/engine/py/db_common.py             |  23 +++++-
 examples/engine/py/db_recv.py               |  35 ++++++--
 examples/engine/py/db_send.py               |  24 +++++-
 examples/engine/py/simple_recv.py           |  20 ++++-
 examples/engine/py/simple_send.py           |  11 ++-
 examples/engine/py/test_examples.py         | 100 +++++++++++++++++++++++
 examples/engine/py/tx_recv.py               |  38 ++++++---
 examples/engine/py/tx_recv_interactive.py   |   6 +-
 examples/engine/py/tx_send.py               |  28 +++++--
 examples/engine/py/tx_send_sync.py          |  76 -----------------
 proton-c/bindings/python/proton/handlers.py |  56 ++++++-------
 proton-c/bindings/python/proton/reactors.py |  16 +++-
 12 files changed, 289 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/db_common.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_common.py b/examples/engine/py/db_common.py
index 584c15a..05ad6f5 100644
--- a/examples/engine/py/db_common.py
+++ b/examples/engine/py/db_common.py
@@ -29,16 +29,23 @@ class Db(object):
         self.tasks = Queue.Queue()
         self.position = None
         self.pending_events = []
+        self.running = True
         self.thread = threading.Thread(target=self._process)
         self.thread.daemon=True
         self.thread.start()
 
+    def close(self):
+        self.tasks.put(lambda conn: self._close())
+
     def reset(self):
         self.tasks.put(lambda conn: self._reset())
 
     def load(self, records, event=None):
         self.tasks.put(lambda conn: self._load(conn, records, event))
 
+    def get_id(self, event):
+        self.tasks.put(lambda conn: self._get_id(conn, event))
+
     def insert(self, id, data, event=None):
         self.tasks.put(lambda conn: self._insert(conn, id, data, event))
 
@@ -48,6 +55,19 @@ class Db(object):
     def _reset(self, ignored=None):
         self.position = None
 
+    def _close(self, ignored=None):
+        self.running = False
+
+    def _get_id(self, conn, event):
+        cursor = conn.execute("SELECT * FROM records ORDER BY id DESC")
+        row = cursor.fetchone()
+        if event:
+            if row:
+                event.id = row['id']
+            else:
+                event.id = 0
+            self.events.trigger(event)
+
     def _load(self, conn, records, event):
         if self.position:
             cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
@@ -80,7 +100,7 @@ class Db(object):
         conn = sqlite3.connect(self.db)
         conn.row_factory = sqlite3.Row
         with conn:
-            while True:
+            while self.running:
                 f = self.tasks.get(True)
                 try:
                     while True:
@@ -91,3 +111,4 @@ class Db(object):
                 for event in self.pending_events:
                     self.events.trigger(event)
                 self.pending_events = []
+        self.events.close()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/db_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py
index 8b4490d..15170ee 100755
--- a/examples/engine/py/db_recv.py
+++ b/examples/engine/py/db_recv.py
@@ -18,36 +18,61 @@
 # under the License.
 #
 
+import optparse
 from proton.handlers import MessagingHandler
 from proton.reactors import ApplicationEvent, Container
 from db_common import Db
 
 class Recv(MessagingHandler):
-    def __init__(self, url):
+    def __init__(self, url, count):
         super(Recv, self).__init__(auto_accept=False)
         self.url = url
         self.delay = 0
         # TODO: load last tag from db
         self.last_id = None
+        self.expected = count
+        self.received = 0
+        self.accepted = 0
 
     def on_start(self, event):
         self.db = Db("dst_db", event.container.get_event_trigger())
+        e = ApplicationEvent("id_loaded")
+        e.container = event.container
+        self.db.get_id(e)
+
+    def on_id_loaded(self, event):
+        self.last_id = event.id
         event.container.create_receiver(self.url)
 
     def on_record_inserted(self, event):
         self.accept(event.delivery)
+        self.accepted += 1
+        if self.accepted == self.expected:
+            event.connection.close()
+            self.db.close()
 
     def on_message(self, event):
         id = int(event.message.id)
         if (not self.last_id) or id > self.last_id:
-            self.last_id = id
-            self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
-            print "inserted message %s" % id
+            if self.received < self.expected:
+                self.received += 1
+                self.last_id = id
+                self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+                print "inserted message %s" % id
+            else:
+                self.release(event.delivery)
         else:
             self.accept(event.delivery)
 
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=0,
+                  help="number of messages to receive; 0 receives indefinitely (default %default)")
+opts, args = parser.parse_args()
+
 try:
-    Container(Recv("localhost:5672/examples")).run()
+    Container(Recv(opts.address, opts.messages)).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index ce3ce79..b298c11 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -18,6 +18,7 @@
 # under the License.
 #
 
+import optparse
 import Queue
 import time
 from proton import Message
@@ -26,13 +27,18 @@ from proton.reactors import ApplicationEvent, Container
 from db_common import Db
 
 class Send(MessagingHandler):
-    def __init__(self, url):
+    def __init__(self, url, count):
         super(Send, self).__init__()
         self.url = url
         self.delay = 0
         self.sent = 0
+        self.confirmed = 0
         self.load_count = 0
         self.records = Queue.Queue(maxsize=50)
+        self.target = count
+
+    def keep_sending(self):
+        return self.target == 0 or self.sent < self.target
 
     def on_start(self, event):
         self.container = event.container
@@ -59,6 +65,7 @@ class Send(MessagingHandler):
 
     def send(self):
         while self.sender.credit and not self.records.empty():
+            if not self.keep_sending(): return
             record = self.records.get(False)
             id = record['id']
             self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
@@ -70,16 +77,29 @@ class Send(MessagingHandler):
         id = int(event.delivery.tag)
         self.db.delete(id)
         print "settled message %s" % id
+        self.confirmed += 1
+        if self.confirmed == self.target:
+            event.connection.close()
+            self.db.close()
 
     def on_disconnected(self, event):
         self.db.reset()
+        self.sent = self.confirmed
 
     def on_timer(self, event):
         if event.subject == "data":
             print "Rechecking for data..."
             self.request_records()
 
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+                               description="Send messages to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=0,
+                  help="number of messages to send; 0 sends indefinitely (default %default)")
+opts, args = parser.parse_args()
+
 try:
-    Container(Send("localhost:5672/examples")).run()
+    Container(Send(opts.address, opts.messages)).run()
 except KeyboardInterrupt: pass
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/simple_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py
index 6825c86..04d6f34 100755
--- a/examples/engine/py/simple_recv.py
+++ b/examples/engine/py/simple_recv.py
@@ -18,22 +18,36 @@
 # under the License.
 #
 
+import optparse
 from proton.handlers import MessagingHandler
 from proton.reactors import Container
 
 class Recv(MessagingHandler):
-    def __init__(self, url):
+    def __init__(self, url, count):
         super(Recv, self).__init__()
         self.url = url
+        self.expected = count
+        self.received = 0
 
     def on_start(self, event):
         event.container.create_receiver(self.url)
 
     def on_message(self, event):
-        print event.message.body
+        if self.expected == 0 or self.received < self.expected:
+            print event.message.body
+            self.received += 1
+            if self.received == self.expected:
+                event.connection.close()
+
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+                  help="number of messages to receive; 0 receives indefinitely (default %default)")
+opts, args = parser.parse_args()
 
 try:
-    Container(Recv("localhost:5672/examples")).run()
+    Container(Recv(opts.address, opts.messages)).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
index 21530ef..7f92163 100755
--- a/examples/engine/py/simple_send.py
+++ b/examples/engine/py/simple_send.py
@@ -18,6 +18,7 @@
 # under the License.
 #
 
+import optparse
 from proton import Message
 from proton.handlers import MessagingHandler
 from proton.reactors import Container
@@ -48,6 +49,14 @@ class Send(MessagingHandler):
     def on_disconnected(self, event):
         self.sent = self.confirmed
 
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+                               description="Send messages to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+                  help="number of messages to send (default %default)")
+opts, args = parser.parse_args()
+
 try:
-    Container(Send("localhost:5672/examples", 10000)).run()
+    Container(Send(opts.address, opts.messages)).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/test_examples.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/test_examples.py b/examples/engine/py/test_examples.py
new file mode 100644
index 0000000..4ea09cf
--- /dev/null
+++ b/examples/engine/py/test_examples.py
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import subprocess
+import unittest
+
+class ExamplesTest(unittest.TestCase):
+    def test_helloworld(self, example="helloworld.py"):
+        p = subprocess.Popen([example], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        p.wait()
+        output = [l.strip() for l in p.stdout]
+        self.assertEqual(output, ['Hello World!'])
+
+    def test_helloworld_direct(self):
+        self.test_helloworld('helloworld_direct.py')
+
+    def test_helloworld_blocking(self):
+        self.test_helloworld('helloworld_blocking.py')
+
+    def test_helloworld_tornado(self):
+        self.test_helloworld('helloworld_tornado.py')
+
+    def test_helloworld_direct_tornado(self):
+        self.test_helloworld('helloworld_direct_tornado.py')
+
+    def test_simple_send_recv(self, recv='simple_recv.py', send='simple_send.py'):
+        r = subprocess.Popen([recv], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        s = subprocess.Popen([send], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        s.wait()
+        r.wait()
+        actual = [l.strip() for l in r.stdout]
+        expected = ["{'sequence': %iL}" % (i+1) for i in range(100)]
+        self.assertEqual(actual, expected)
+
+    def test_client_server(self, client='client.py', server='server.py'):
+        s = subprocess.Popen(['server.py'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        c = subprocess.Popen(['client.py'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        c.wait()
+        actual = [l.strip() for l in c.stdout]
+        inputs = ["Twas brillig, and the slithy toves",
+                    "Did gire and gymble in the wabe.",
+                    "All mimsy were the borogroves,",
+                    "And the mome raths outgrabe."]
+        expected = ["%s => %s" % (l, l.upper()) for l in inputs]
+        self.assertEqual(actual, expected)
+        s.terminate()
+
+    def test_sync_client_server(self):
+        self.test_client_server(client='sync_client.py')
+
+    def test_client_tx_server(self):
+        self.test_client_server(server='tx_server.py')
+
+    def test_sync_client_tx_server(self):
+        self.test_client_server(client='sync_client.py', server='tx_server.py')
+
+    def test_db_send_recv(self):
+        self.maxDiff = None
+        # setup databases
+        subprocess.check_call(['db_ctrl.py', 'init', './src_db'])
+        subprocess.check_call(['db_ctrl.py', 'init', './dst_db'])
+        fill = subprocess.Popen(['db_ctrl.py', 'insert', './src_db'], stdin=subprocess.PIPE, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        for i in range(100):
+            fill.stdin.write("Message-%i\n" % (i+1))
+        fill.stdin.close()
+        fill.wait()
+        # run send and recv
+        r = subprocess.Popen(['db_recv.py', '-m', '100'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        s = subprocess.Popen(['db_send.py', '-m', '100'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        s.wait()
+        r.wait()
+        # verify output of receive
+        actual = [l.strip() for l in r.stdout]
+        expected = ["inserted message %i" % (i+1) for i in range(100)]
+        self.assertEqual(actual, expected)
+        # verify state of databases
+        v = subprocess.Popen(['db_ctrl.py', 'list', './dst_db'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+        v.wait()
+        expected = ["(%i, u'Message-%i')" % ((i+1), (i+1)) for i in range(100)]
+        actual = [l.strip() for l in v.stdout]
+        self.assertEqual(actual, expected)
+
+    def test_tx_send_tx_recv(self):
+        self.test_simple_send_recv(recv='tx_recv.py', send='tx_send.py')

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py
index fc4bb8a..e034592 100755
--- a/examples/engine/py/tx_recv.py
+++ b/examples/engine/py/tx_recv.py
@@ -18,25 +18,30 @@
 # under the License.
 #
 
+import optparse
+from proton import Url
 from proton.reactors import Container
-from proton.handlers import TransactionalClientHandler
+from proton.handlers import MessagingHandler, TransactionHandler
 
-class TxRecv(TransactionalClientHandler):
-    def __init__(self, batch_size):
-        super(TxRecv, self).__init__(prefetch=0)
-        self.current_batch = 0
+class TxRecv(MessagingHandler, TransactionHandler):
+    def __init__(self, url, messages, batch_size):
+        super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
+        self.url = Url(url)
+        self.expected = messages
         self.batch_size = batch_size
+        self.current_batch = 0
+        self.committed = 0
 
     def on_start(self, event):
         self.container = event.container
-        self.conn = self.container.connect("localhost:5672")
-        self.receiver = self.container.create_receiver(self.conn, "examples")
+        self.conn = self.container.connect(self.url)
+        self.receiver = self.container.create_receiver(self.conn, self.url.path)
         self.container.declare_transaction(self.conn, handler=self)
         self.transaction = None
 
     def on_message(self, event):
         print event.message.body
-        self.accept(event.delivery, self.transaction)
+        self.transaction.accept(event.delivery)
         self.current_batch += 1
         if self.current_batch == self.batch_size:
             self.transaction.commit()
@@ -47,14 +52,27 @@ class TxRecv(TransactionalClientHandler):
         self.transaction = event.transaction
 
     def on_transaction_committed(self, event):
+        self.committed += self.current_batch
         self.current_batch = 0
-        self.container.declare_transaction(self.conn, handler=self)
+        if self.expected == 0 or self.committed < self.expected:
+            self.container.declare_transaction(self.conn, handler=self)
+        else:
+            event.connection.close()
 
     def on_disconnected(self, event):
         self.current_batch = 0
 
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+                  help="number of messages to receive; 0 receives indefinitely (default %default)")
+parser.add_option("-b", "--batch-size", type="int", default=10,
+                  help="number of messages in each transaction (default %default)")
+opts, args = parser.parse_args()
+
 try:
-    Container(TxRecv(10)).run()
+    Container(TxRecv(opts.address, opts.messages, opts.batch_size)).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py
index 6eb320e..aa85c4e 100755
--- a/examples/engine/py/tx_recv_interactive.py
+++ b/examples/engine/py/tx_recv_interactive.py
@@ -21,11 +21,11 @@
 import sys
 import threading
 from proton.reactors import ApplicationEvent, Container
-from proton.handlers import TransactionalClientHandler
+from proton.handlers import MessagingHandler, TransactionHandler
 
-class TxRecv(TransactionalClientHandler):
+class TxRecv(MessagingHandler, TransactionHandler):
     def __init__(self):
-        super(TxRecv, self).__init__(prefetch=0)
+        super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
 
     def on_start(self, event):
         self.container = event.container

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
index 5b11280..d234da3 100755
--- a/examples/engine/py/tx_send.py
+++ b/examples/engine/py/tx_send.py
@@ -18,13 +18,15 @@
 # under the License.
 #
 
-from proton import Message
+import optparse
+from proton import Message, Url
 from proton.reactors import Container
-from proton.handlers import TransactionalClientHandler
+from proton.handlers import MessagingHandler, TransactionHandler
 
-class TxSend(TransactionalClientHandler):
-    def __init__(self, messages, batch_size):
+class TxSend(MessagingHandler, TransactionHandler):
+    def __init__(self, url, messages, batch_size):
         super(TxSend, self).__init__()
+        self.url = Url(url)
         self.current_batch = 0
         self.committed = 0
         self.confirmed = 0
@@ -33,8 +35,8 @@ class TxSend(TransactionalClientHandler):
 
     def on_start(self, event):
         self.container = event.container
-        self.conn = self.container.connect("localhost:5672", handler=self)
-        self.sender = self.container.create_sender(self.conn, "examples")
+        self.conn = self.container.connect(self.url)
+        self.sender = self.container.create_sender(self.conn, self.url.path)
         self.container.declare_transaction(self.conn, handler=self)
         self.transaction = None
 
@@ -46,7 +48,7 @@ class TxSend(TransactionalClientHandler):
         self.send()
 
     def send(self):
-        while self.transaction and self.sender.credit and self.committed < self.total:
+        while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total:
             msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
             self.sender.send_msg(msg, transaction=self.transaction)
             self.current_batch += 1
@@ -70,6 +72,16 @@ class TxSend(TransactionalClientHandler):
     def on_disconnected(self, event):
         self.current_batch = 0
 
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+                               description="Send messages transactionally to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+                  help="number of messages to send (default %default)")
+parser.add_option("-b", "--batch-size", type="int", default=10,
+                  help="number of messages in each transaction (default %default)")
+opts, args = parser.parse_args()
+
 try:
-    Container(TxSend(10000, 10)).run()
+    Container(TxSend(opts.address, opts.messages, opts.batch_size)).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_send_sync.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send_sync.py b/examples/engine/py/tx_send_sync.py
deleted file mode 100755
index c051408..0000000
--- a/examples/engine/py/tx_send_sync.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from proton import Message
-from proton.reactors import Container
-from proton.handlers import TransactionalClientHandler
-
-class TxSend(TransactionalClientHandler):
-    def __init__(self, messages, batch_size):
-        super(TxSend, self).__init__()
-        self.current_batch = 0
-        self.committed = 0
-        self.confirmed = 0
-        self.total = messages
-        self.batch_size = batch_size
-
-    def on_start(self, event):
-        self.container = event.container
-        self.conn = self.container.connect("localhost:5672", handler=self)
-        self.sender = self.container.create_sender(self.conn, "examples")
-        self.container.declare_transaction(self.conn, handler=self)
-        self.transaction = None
-
-    def on_transaction_declared(self, event):
-        self.transaction = event.transaction
-        self.send()
-
-    def on_credit(self, event):
-        self.send()
-
-    def send(self):
-        while self.transaction and self.current_batch < self.batch_size and self.sender.credit and self.committed < self.total:
-            msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
-            self.sender.send_msg(msg, transaction=self.transaction)
-            self.current_batch += 1
-
-    def on_accepted(self, event):
-        if event.sender == self.sender:
-            self.confirmed += 1
-            if self.confirmed == self.batch_size:
-                self.transaction.commit()
-                self.transaction = None
-                self.confirmed = 0
-
-    def on_transaction_committed(self, event):
-        self.committed += self.current_batch
-        if self.committed == self.total:
-            print "all messages committed"
-            event.connection.close()
-        else:
-            self.current_batch = 0
-            self.container.declare_transaction(self.conn, handler=self)
-
-    def on_disconnected(self, event):
-        self.current_batch = 0
-
-try:
-    Container(TxSend(10000, 10)).run()
-except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 0181d0e..5a29d14 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -384,7 +384,7 @@ class EndpointStateHandler(Handler):
 class MessagingHandler(Handler, Acking):
     """
     A general purpose handler that makes the proton-c events somewhat
-    simpler to deal with and.or avoids repetitive tasks for common use
+    simpler to deal with and/or avoids repetitive tasks for common use
     cases.
     """
     def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
@@ -406,45 +406,37 @@ class MessagingHandler(Handler, Acking):
         EndpointStateHandler.print_error(event.link, "link")
         event.connection.close()
 
-class TransactionalAcking(object):
-    def accept(self, delivery, transaction):
-        transaction.accept(delivery)
-
-class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
-    def __init__(self, auto_settle=True, delegate=None):
-        super(TransactionHandler, self).__init__(auto_settle, delegate)
-
-    def on_settled(self, event):
-        if hasattr(event.delivery, "transaction"):
-            event.transaction = event.delivery.transaction
-            event.delivery.transaction.handle_outcome(event)
-
+class TransactionHandler(object):
+    """
+    The interface for transaction handlers, i.e. objects that want to
+    be notified of state changes related to a transaction.
+    """
     def on_transaction_declared(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_declared', event)
+        pass
 
     def on_transaction_committed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_committed', event)
+        pass
 
     def on_transaction_aborted(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_aborted', event)
+        pass
 
     def on_transaction_declare_failed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_declare_failed', event)
+        pass
 
     def on_transaction_commit_failed(self, event):
-        if self.delegate:
-            dispatch(self.delegate, 'on_transaction_commit_failed', event)
+        pass
+
+class TransactionalClientHandler(MessagingHandler, TransactionHandler):
+    """
+    An extension to the MessagingHandler for applications using
+    transactions.
+    """
 
-class TransactionalClientHandler(Handler, TransactionalAcking):
     def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
-        super(TransactionalClientHandler, self).__init__()
-        self.handlers = []
-        if prefetch:
-            self.handlers.append(FlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
-        self.handlers.append(IncomingMessageHandler(auto_accept, self))
-        self.handlers.append(TransactionHandler(auto_settle, self))
+        super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
+
+    def accept(self, delivery, transaction=None):
+        if transaction:
+            transaction.accept(delivery)
+        else:
+            super(TransactionalClientHandler, self).accept(delivery)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index dcb7cae..0b5bd7e 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -23,7 +23,7 @@ from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler
 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
 from select import select
-from proton.handlers import nested_handlers, ScopedHandler
+from proton.handlers import nested_handlers, OutgoingMessageHandler, ScopedHandler
 
 class AmqpSocket(object):
     """
@@ -224,12 +224,13 @@ class EventInjector(object):
 
     def close(self):
         self._closed = True
+        os.write(self.pipe[1], "!")
 
     def fileno(self):
         return self.pipe[0]
 
     def reading(self):
-        return True
+        return not self.closed()
 
     def writing(self):
         return False
@@ -481,6 +482,15 @@ class Transaction(object):
         self.failed = False
         self._pending = []
         self.settle_before_discharge = settle_before_discharge
+        class InternalTransactionHandler(OutgoingMessageHandler):
+            def __init__(self):
+                super(InternalTransactionHandler, self).__init__(auto_settle=True)
+
+            def on_settled(self, event):
+                if hasattr(event.delivery, "transaction"):
+                    event.transaction = event.delivery.transaction
+                    event.delivery.transaction.handle_outcome(event)
+        self.internal_handler = InternalTransactionHandler()
         self.declare()
 
     def commit(self):
@@ -497,7 +507,7 @@ class Transaction(object):
         self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
 
     def _send_ctrl(self, descriptor, value):
-        delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler)
+        delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.internal_handler)
         delivery.transaction = self
         return delivery
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] qpid-proton git commit: initialise values to keep (some) compilers happy

Posted by gs...@apache.org.
initialise values to keep (some) compilers happy


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8c35ce40
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8c35ce40
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8c35ce40

Branch: refs/heads/master
Commit: 8c35ce407c73608171ab995cd4a4023fbe8f93f9
Parents: a210182
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Jan 7 21:00:49 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 21:00:49 2015 +0000

----------------------------------------------------------------------
 proton-c/src/tests/object.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8c35ce40/proton-c/src/tests/object.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/object.c b/proton-c/src/tests/object.c
index 2596bec..5a32f60 100644
--- a/proton-c/src/tests/object.c
+++ b/proton-c/src/tests/object.c
@@ -867,8 +867,8 @@ void test_heap(int seed, int size)
   srand(seed);
   pn_list_t *list = pn_list(PN_VOID, 0);
 
-  intptr_t min;
-  intptr_t max;
+  intptr_t min = 0;
+  intptr_t max = 0;
 
   for (int i = 0; i < size; i++) {
     intptr_t r = rand();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] qpid-proton git commit: PROTON-730: allow transactional state to be read on incoming transfers

Posted by gs...@apache.org.
PROTON-730: allow transactional state to be read on incoming transfers


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1e96dae8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1e96dae8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1e96dae8

Branch: refs/heads/master
Commit: 1e96dae8039e560b9de09d3c97343d301667fb22
Parents: a439ee2
Author: Gordon Sim <gs...@apache.org>
Authored: Mon Oct 27 15:25:52 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 20:23:57 2015 +0000

----------------------------------------------------------------------
 proton-c/src/transport/transport.c | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e96dae8/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index a434b48..6220329 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -1181,8 +1181,11 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
   pn_sequence_t id;
   bool settled;
   bool more;
-  int err = pn_data_scan(args, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag,
-                         &settled, &more);
+  bool has_type;
+  uint64_t type;
+  pn_data_clear(transport->disp_data);
+  int err = pn_data_scan(args, "D.[I?Iz.oo.D?LC]", &handle, &id_present, &id, &tag,
+                         &settled, &more, &has_type, &type, transport->disp_data);
   if (err) return err;
   pn_session_t *ssn = pn_channel_state(transport, channel);
 
@@ -1212,6 +1215,10 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
                          "sequencing error, expected delivery-id %u, got %u",
                          state->id, id);
     }
+    if (has_type) {
+      delivery->remote.type = type;
+      pn_data_copy(delivery->remote.data, transport->disp_data);
+    }
 
     link->state.delivery_count++;
     link->state.link_credit--;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org