You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2015/01/07 22:00:27 UTC
[1/4] qpid-proton git commit: PROTON-667: allow state field to be sent along with transfer frame
Repository: qpid-proton
Updated Branches:
refs/heads/master a439ee2de -> 8c35ce407
PROTON-667: allow state field to be sent along with transfer frame
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e052a016
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e052a016
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e052a016
Branch: refs/heads/master
Commit: e052a016adcbfd0d2b5137ac5e6be829478b2c6a
Parents: 1e96dae
Author: Gordon Sim <gs...@apache.org>
Authored: Wed Oct 22 19:12:23 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 20:23:57 2015 +0000
----------------------------------------------------------------------
proton-c/src/transport/transport.c | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e052a016/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 6220329..1a1e969 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -741,7 +741,9 @@ int pn_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch,
uint32_t message_format,
bool settled,
bool more,
- pn_sequence_t frame_limit)
+ pn_sequence_t frame_limit,
+ uint64_t code,
+ pn_data_t* state)
{
bool more_flag = more;
int framecount = 0;
@@ -751,10 +753,10 @@ int pn_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch,
compute_performatives:
pn_data_clear(transport->output_args);
- int err = pn_data_fill(transport->output_args, "DL[IIzIoo]", TRANSFER,
+ int err = pn_data_fill(transport->output_args, "DL[IIzIoon?DLC]", TRANSFER,
handle, id, tag->size, tag->start,
message_format,
- settled, more_flag);
+ settled, more_flag, (bool)code, code, state);
if (err) {
pn_transport_logf(transport,
"error posting transfer frame: %s: %s", pn_code(err),
@@ -1917,6 +1919,9 @@ int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery,
pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
size_t full_size = bytes.size;
pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
+ pn_data_clear(transport->disp_data);
+ pni_disposition_encode(&delivery->local, transport->disp_data);
+
int count = pn_post_amqp_transfer_frame(transport,
ssn_state->local_channel,
link_state->local_handle,
@@ -1924,7 +1929,8 @@ int pn_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery,
0, // message-format
delivery->local.settled,
!delivery->done,
- ssn_state->remote_incoming_window);
+ ssn_state->remote_incoming_window,
+ delivery->local.type, transport->disp_data);
if (count < 0) return count;
xfr_posted = true;
ssn_state->outgoing_transfer_count += count;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] qpid-proton git commit: Added automatable test script for examples and fixed transactional examples.
Posted by gs...@apache.org.
Added automatable test script for examples and fixed transactional examples.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a2101829
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a2101829
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a2101829
Branch: refs/heads/master
Commit: a21018297035991b78114fd7e39a872bab29cb00
Parents: e052a01
Author: Gordon Sim <gs...@redhat.com>
Authored: Tue Jan 6 13:20:01 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 20:24:19 2015 +0000
----------------------------------------------------------------------
examples/engine/py/db_common.py | 23 +++++-
examples/engine/py/db_recv.py | 35 ++++++--
examples/engine/py/db_send.py | 24 +++++-
examples/engine/py/simple_recv.py | 20 ++++-
examples/engine/py/simple_send.py | 11 ++-
examples/engine/py/test_examples.py | 100 +++++++++++++++++++++++
examples/engine/py/tx_recv.py | 38 ++++++---
examples/engine/py/tx_recv_interactive.py | 6 +-
examples/engine/py/tx_send.py | 28 +++++--
examples/engine/py/tx_send_sync.py | 76 -----------------
proton-c/bindings/python/proton/handlers.py | 56 ++++++-------
proton-c/bindings/python/proton/reactors.py | 16 +++-
12 files changed, 289 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/db_common.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_common.py b/examples/engine/py/db_common.py
index 584c15a..05ad6f5 100644
--- a/examples/engine/py/db_common.py
+++ b/examples/engine/py/db_common.py
@@ -29,16 +29,23 @@ class Db(object):
self.tasks = Queue.Queue()
self.position = None
self.pending_events = []
+ self.running = True
self.thread = threading.Thread(target=self._process)
self.thread.daemon=True
self.thread.start()
+ def close(self):
+ self.tasks.put(lambda conn: self._close())
+
def reset(self):
self.tasks.put(lambda conn: self._reset())
def load(self, records, event=None):
self.tasks.put(lambda conn: self._load(conn, records, event))
+ def get_id(self, event):
+ self.tasks.put(lambda conn: self._get_id(conn, event))
+
def insert(self, id, data, event=None):
self.tasks.put(lambda conn: self._insert(conn, id, data, event))
@@ -48,6 +55,19 @@ class Db(object):
def _reset(self, ignored=None):
self.position = None
+ def _close(self, ignored=None):
+ self.running = False
+
+ def _get_id(self, conn, event):
+ cursor = conn.execute("SELECT * FROM records ORDER BY id DESC")
+ row = cursor.fetchone()
+ if event:
+ if row:
+ event.id = row['id']
+ else:
+ event.id = 0
+ self.events.trigger(event)
+
def _load(self, conn, records, event):
if self.position:
cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
@@ -80,7 +100,7 @@ class Db(object):
conn = sqlite3.connect(self.db)
conn.row_factory = sqlite3.Row
with conn:
- while True:
+ while self.running:
f = self.tasks.get(True)
try:
while True:
@@ -91,3 +111,4 @@ class Db(object):
for event in self.pending_events:
self.events.trigger(event)
self.pending_events = []
+ self.events.close()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/db_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py
index 8b4490d..15170ee 100755
--- a/examples/engine/py/db_recv.py
+++ b/examples/engine/py/db_recv.py
@@ -18,36 +18,61 @@
# under the License.
#
+import optparse
from proton.handlers import MessagingHandler
from proton.reactors import ApplicationEvent, Container
from db_common import Db
class Recv(MessagingHandler):
- def __init__(self, url):
+ def __init__(self, url, count):
super(Recv, self).__init__(auto_accept=False)
self.url = url
self.delay = 0
# TODO: load last tag from db
self.last_id = None
+ self.expected = count
+ self.received = 0
+ self.accepted = 0
def on_start(self, event):
self.db = Db("dst_db", event.container.get_event_trigger())
+ e = ApplicationEvent("id_loaded")
+ e.container = event.container
+ self.db.get_id(e)
+
+ def on_id_loaded(self, event):
+ self.last_id = event.id
event.container.create_receiver(self.url)
def on_record_inserted(self, event):
self.accept(event.delivery)
+ self.accepted += 1
+ if self.accepted == self.expected:
+ event.connection.close()
+ self.db.close()
def on_message(self, event):
id = int(event.message.id)
if (not self.last_id) or id > self.last_id:
- self.last_id = id
- self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
- print "inserted message %s" % id
+ if self.received < self.expected:
+ self.received += 1
+ self.last_id = id
+ self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+ print "inserted message %s" % id
+ else:
+ self.release(event.delivery)
else:
self.accept(event.delivery)
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+ help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=0,
+ help="number of messages to receive; 0 receives indefinitely (default %default)")
+opts, args = parser.parse_args()
+
try:
- Container(Recv("localhost:5672/examples")).run()
+ Container(Recv(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index ce3ce79..b298c11 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -18,6 +18,7 @@
# under the License.
#
+import optparse
import Queue
import time
from proton import Message
@@ -26,13 +27,18 @@ from proton.reactors import ApplicationEvent, Container
from db_common import Db
class Send(MessagingHandler):
- def __init__(self, url):
+ def __init__(self, url, count):
super(Send, self).__init__()
self.url = url
self.delay = 0
self.sent = 0
+ self.confirmed = 0
self.load_count = 0
self.records = Queue.Queue(maxsize=50)
+ self.target = count
+
+ def keep_sending(self):
+ return self.target == 0 or self.sent < self.target
def on_start(self, event):
self.container = event.container
@@ -59,6 +65,7 @@ class Send(MessagingHandler):
def send(self):
while self.sender.credit and not self.records.empty():
+ if not self.keep_sending(): return
record = self.records.get(False)
id = record['id']
self.sender.send_msg(Message(id=id, durable=True, body=record['description']), tag=str(id))
@@ -70,16 +77,29 @@ class Send(MessagingHandler):
id = int(event.delivery.tag)
self.db.delete(id)
print "settled message %s" % id
+ self.confirmed += 1
+ if self.confirmed == self.target:
+ event.connection.close()
+ self.db.close()
def on_disconnected(self, event):
self.db.reset()
+ self.sent = self.confirmed
def on_timer(self, event):
if event.subject == "data":
print "Rechecking for data..."
self.request_records()
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="Send messages to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+ help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=0,
+ help="number of messages to send; 0 sends indefinitely (default %default)")
+opts, args = parser.parse_args()
+
try:
- Container(Send("localhost:5672/examples")).run()
+ Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/simple_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py
index 6825c86..04d6f34 100755
--- a/examples/engine/py/simple_recv.py
+++ b/examples/engine/py/simple_recv.py
@@ -18,22 +18,36 @@
# under the License.
#
+import optparse
from proton.handlers import MessagingHandler
from proton.reactors import Container
class Recv(MessagingHandler):
- def __init__(self, url):
+ def __init__(self, url, count):
super(Recv, self).__init__()
self.url = url
+ self.expected = count
+ self.received = 0
def on_start(self, event):
event.container.create_receiver(self.url)
def on_message(self, event):
- print event.message.body
+ if self.expected == 0 or self.received < self.expected:
+ print event.message.body
+ self.received += 1
+ if self.received == self.expected:
+ event.connection.close()
+
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+ help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+ help="number of messages to receive; 0 receives indefinitely (default %default)")
+opts, args = parser.parse_args()
try:
- Container(Recv("localhost:5672/examples")).run()
+ Container(Recv(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
index 21530ef..7f92163 100755
--- a/examples/engine/py/simple_send.py
+++ b/examples/engine/py/simple_send.py
@@ -18,6 +18,7 @@
# under the License.
#
+import optparse
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactors import Container
@@ -48,6 +49,14 @@ class Send(MessagingHandler):
def on_disconnected(self, event):
self.sent = self.confirmed
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="Send messages to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+ help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+ help="number of messages to send (default %default)")
+opts, args = parser.parse_args()
+
try:
- Container(Send("localhost:5672/examples", 10000)).run()
+ Container(Send(opts.address, opts.messages)).run()
except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/test_examples.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/test_examples.py b/examples/engine/py/test_examples.py
new file mode 100644
index 0000000..4ea09cf
--- /dev/null
+++ b/examples/engine/py/test_examples.py
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import subprocess
+import unittest
+
+class ExamplesTest(unittest.TestCase):
+ def test_helloworld(self, example="helloworld.py"):
+ p = subprocess.Popen([example], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ p.wait()
+ output = [l.strip() for l in p.stdout]
+ self.assertEqual(output, ['Hello World!'])
+
+ def test_helloworld_direct(self):
+ self.test_helloworld('helloworld_direct.py')
+
+ def test_helloworld_blocking(self):
+ self.test_helloworld('helloworld_blocking.py')
+
+ def test_helloworld_tornado(self):
+ self.test_helloworld('helloworld_tornado.py')
+
+ def test_helloworld_direct_tornado(self):
+ self.test_helloworld('helloworld_direct_tornado.py')
+
+ def test_simple_send_recv(self, recv='simple_recv.py', send='simple_send.py'):
+ r = subprocess.Popen([recv], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ s = subprocess.Popen([send], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ s.wait()
+ r.wait()
+ actual = [l.strip() for l in r.stdout]
+ expected = ["{'sequence': %iL}" % (i+1) for i in range(100)]
+ self.assertEqual(actual, expected)
+
+ def test_client_server(self, client='client.py', server='server.py'):
+ s = subprocess.Popen(['server.py'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ c = subprocess.Popen(['client.py'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ c.wait()
+ actual = [l.strip() for l in c.stdout]
+ inputs = ["Twas brillig, and the slithy toves",
+ "Did gire and gymble in the wabe.",
+ "All mimsy were the borogroves,",
+ "And the mome raths outgrabe."]
+ expected = ["%s => %s" % (l, l.upper()) for l in inputs]
+ self.assertEqual(actual, expected)
+ s.terminate()
+
+ def test_sync_client_server(self):
+ self.test_client_server(client='sync_client.py')
+
+ def test_client_tx_server(self):
+ self.test_client_server(server='tx_server.py')
+
+ def test_sync_client_tx_server(self):
+ self.test_client_server(client='sync_client.py', server='tx_server.py')
+
+ def test_db_send_recv(self):
+ self.maxDiff = None
+ # setup databases
+ subprocess.check_call(['db_ctrl.py', 'init', './src_db'])
+ subprocess.check_call(['db_ctrl.py', 'init', './dst_db'])
+ fill = subprocess.Popen(['db_ctrl.py', 'insert', './src_db'], stdin=subprocess.PIPE, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ for i in range(100):
+ fill.stdin.write("Message-%i\n" % (i+1))
+ fill.stdin.close()
+ fill.wait()
+ # run send and recv
+ r = subprocess.Popen(['db_recv.py', '-m', '100'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ s = subprocess.Popen(['db_send.py', '-m', '100'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ s.wait()
+ r.wait()
+ # verify output of receive
+ actual = [l.strip() for l in r.stdout]
+ expected = ["inserted message %i" % (i+1) for i in range(100)]
+ self.assertEqual(actual, expected)
+ # verify state of databases
+ v = subprocess.Popen(['db_ctrl.py', 'list', './dst_db'], stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
+ v.wait()
+ expected = ["(%i, u'Message-%i')" % ((i+1), (i+1)) for i in range(100)]
+ actual = [l.strip() for l in v.stdout]
+ self.assertEqual(actual, expected)
+
+ def test_tx_send_tx_recv(self):
+ self.test_simple_send_recv(recv='tx_recv.py', send='tx_send.py')
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py
index fc4bb8a..e034592 100755
--- a/examples/engine/py/tx_recv.py
+++ b/examples/engine/py/tx_recv.py
@@ -18,25 +18,30 @@
# under the License.
#
+import optparse
+from proton import Url
from proton.reactors import Container
-from proton.handlers import TransactionalClientHandler
+from proton.handlers import MessagingHandler, TransactionHandler
-class TxRecv(TransactionalClientHandler):
- def __init__(self, batch_size):
- super(TxRecv, self).__init__(prefetch=0)
- self.current_batch = 0
+class TxRecv(MessagingHandler, TransactionHandler):
+ def __init__(self, url, messages, batch_size):
+ super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
+ self.url = Url(url)
+ self.expected = messages
self.batch_size = batch_size
+ self.current_batch = 0
+ self.committed = 0
def on_start(self, event):
self.container = event.container
- self.conn = self.container.connect("localhost:5672")
- self.receiver = self.container.create_receiver(self.conn, "examples")
+ self.conn = self.container.connect(self.url)
+ self.receiver = self.container.create_receiver(self.conn, self.url.path)
self.container.declare_transaction(self.conn, handler=self)
self.transaction = None
def on_message(self, event):
print event.message.body
- self.accept(event.delivery, self.transaction)
+ self.transaction.accept(event.delivery)
self.current_batch += 1
if self.current_batch == self.batch_size:
self.transaction.commit()
@@ -47,14 +52,27 @@ class TxRecv(TransactionalClientHandler):
self.transaction = event.transaction
def on_transaction_committed(self, event):
+ self.committed += self.current_batch
self.current_batch = 0
- self.container.declare_transaction(self.conn, handler=self)
+ if self.expected == 0 or self.committed < self.expected:
+ self.container.declare_transaction(self.conn, handler=self)
+ else:
+ event.connection.close()
def on_disconnected(self, event):
self.current_batch = 0
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+ help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+ help="number of messages to receive; 0 receives indefinitely (default %default)")
+parser.add_option("-b", "--batch-size", type="int", default=10,
+ help="number of messages in each transaction (default %default)")
+opts, args = parser.parse_args()
+
try:
- Container(TxRecv(10)).run()
+ Container(TxRecv(opts.address, opts.messages, opts.batch_size)).run()
except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py
index 6eb320e..aa85c4e 100755
--- a/examples/engine/py/tx_recv_interactive.py
+++ b/examples/engine/py/tx_recv_interactive.py
@@ -21,11 +21,11 @@
import sys
import threading
from proton.reactors import ApplicationEvent, Container
-from proton.handlers import TransactionalClientHandler
+from proton.handlers import MessagingHandler, TransactionHandler
-class TxRecv(TransactionalClientHandler):
+class TxRecv(MessagingHandler, TransactionHandler):
def __init__(self):
- super(TxRecv, self).__init__(prefetch=0)
+ super(TxRecv, self).__init__(prefetch=0, auto_accept=False)
def on_start(self, event):
self.container = event.container
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
index 5b11280..d234da3 100755
--- a/examples/engine/py/tx_send.py
+++ b/examples/engine/py/tx_send.py
@@ -18,13 +18,15 @@
# under the License.
#
-from proton import Message
+import optparse
+from proton import Message, Url
from proton.reactors import Container
-from proton.handlers import TransactionalClientHandler
+from proton.handlers import MessagingHandler, TransactionHandler
-class TxSend(TransactionalClientHandler):
- def __init__(self, messages, batch_size):
+class TxSend(MessagingHandler, TransactionHandler):
+ def __init__(self, url, messages, batch_size):
super(TxSend, self).__init__()
+ self.url = Url(url)
self.current_batch = 0
self.committed = 0
self.confirmed = 0
@@ -33,8 +35,8 @@ class TxSend(TransactionalClientHandler):
def on_start(self, event):
self.container = event.container
- self.conn = self.container.connect("localhost:5672", handler=self)
- self.sender = self.container.create_sender(self.conn, "examples")
+ self.conn = self.container.connect(self.url)
+ self.sender = self.container.create_sender(self.conn, self.url.path)
self.container.declare_transaction(self.conn, handler=self)
self.transaction = None
@@ -46,7 +48,7 @@ class TxSend(TransactionalClientHandler):
self.send()
def send(self):
- while self.transaction and self.sender.credit and self.committed < self.total:
+ while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total:
msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
self.sender.send_msg(msg, transaction=self.transaction)
self.current_batch += 1
@@ -70,6 +72,16 @@ class TxSend(TransactionalClientHandler):
def on_disconnected(self, event):
self.current_batch = 0
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="Send messages transactionally to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+ help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=100,
+ help="number of messages to send (default %default)")
+parser.add_option("-b", "--batch-size", type="int", default=10,
+ help="number of messages in each transaction (default %default)")
+opts, args = parser.parse_args()
+
try:
- Container(TxSend(10000, 10)).run()
+ Container(TxSend(opts.address, opts.messages, opts.batch_size)).run()
except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/examples/engine/py/tx_send_sync.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send_sync.py b/examples/engine/py/tx_send_sync.py
deleted file mode 100755
index c051408..0000000
--- a/examples/engine/py/tx_send_sync.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from proton import Message
-from proton.reactors import Container
-from proton.handlers import TransactionalClientHandler
-
-class TxSend(TransactionalClientHandler):
- def __init__(self, messages, batch_size):
- super(TxSend, self).__init__()
- self.current_batch = 0
- self.committed = 0
- self.confirmed = 0
- self.total = messages
- self.batch_size = batch_size
-
- def on_start(self, event):
- self.container = event.container
- self.conn = self.container.connect("localhost:5672", handler=self)
- self.sender = self.container.create_sender(self.conn, "examples")
- self.container.declare_transaction(self.conn, handler=self)
- self.transaction = None
-
- def on_transaction_declared(self, event):
- self.transaction = event.transaction
- self.send()
-
- def on_credit(self, event):
- self.send()
-
- def send(self):
- while self.transaction and self.current_batch < self.batch_size and self.sender.credit and self.committed < self.total:
- msg = Message(body={'sequence':(self.committed+self.current_batch+1)})
- self.sender.send_msg(msg, transaction=self.transaction)
- self.current_batch += 1
-
- def on_accepted(self, event):
- if event.sender == self.sender:
- self.confirmed += 1
- if self.confirmed == self.batch_size:
- self.transaction.commit()
- self.transaction = None
- self.confirmed = 0
-
- def on_transaction_committed(self, event):
- self.committed += self.current_batch
- if self.committed == self.total:
- print "all messages committed"
- event.connection.close()
- else:
- self.current_batch = 0
- self.container.declare_transaction(self.conn, handler=self)
-
- def on_disconnected(self, event):
- self.current_batch = 0
-
-try:
- Container(TxSend(10000, 10)).run()
-except KeyboardInterrupt: pass
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 0181d0e..5a29d14 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -384,7 +384,7 @@ class EndpointStateHandler(Handler):
class MessagingHandler(Handler, Acking):
"""
A general purpose handler that makes the proton-c events somewhat
- simpler to deal with and.or avoids repetitive tasks for common use
+ simpler to deal with and/or avoids repetitive tasks for common use
cases.
"""
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
@@ -406,45 +406,37 @@ class MessagingHandler(Handler, Acking):
EndpointStateHandler.print_error(event.link, "link")
event.connection.close()
-class TransactionalAcking(object):
- def accept(self, delivery, transaction):
- transaction.accept(delivery)
-
-class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
- def __init__(self, auto_settle=True, delegate=None):
- super(TransactionHandler, self).__init__(auto_settle, delegate)
-
- def on_settled(self, event):
- if hasattr(event.delivery, "transaction"):
- event.transaction = event.delivery.transaction
- event.delivery.transaction.handle_outcome(event)
-
+class TransactionHandler(object):
+ """
+ The interface for transaction handlers, i.e. objects that want to
+ be notified of state changes related to a transaction.
+ """
def on_transaction_declared(self, event):
- if self.delegate:
- dispatch(self.delegate, 'on_transaction_declared', event)
+ pass
def on_transaction_committed(self, event):
- if self.delegate:
- dispatch(self.delegate, 'on_transaction_committed', event)
+ pass
def on_transaction_aborted(self, event):
- if self.delegate:
- dispatch(self.delegate, 'on_transaction_aborted', event)
+ pass
def on_transaction_declare_failed(self, event):
- if self.delegate:
- dispatch(self.delegate, 'on_transaction_declare_failed', event)
+ pass
def on_transaction_commit_failed(self, event):
- if self.delegate:
- dispatch(self.delegate, 'on_transaction_commit_failed', event)
+ pass
+
+class TransactionalClientHandler(MessagingHandler, TransactionHandler):
+ """
+ An extension to the MessagingHandler for applications using
+ transactions.
+ """
-class TransactionalClientHandler(Handler, TransactionalAcking):
def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
- super(TransactionalClientHandler, self).__init__()
- self.handlers = []
- if prefetch:
- self.handlers.append(FlowController(prefetch))
- self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
- self.handlers.append(IncomingMessageHandler(auto_accept, self))
- self.handlers.append(TransactionHandler(auto_settle, self))
+ super(TransactionalClientHandler, self).__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)
+
+ def accept(self, delivery, transaction=None):
+ if transaction:
+ transaction.accept(delivery)
+ else:
+ super(TransactionalClientHandler, self).accept(delivery)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a2101829/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index dcb7cae..0b5bd7e 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -23,7 +23,7 @@ from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler
from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
from select import select
-from proton.handlers import nested_handlers, ScopedHandler
+from proton.handlers import nested_handlers, OutgoingMessageHandler, ScopedHandler
class AmqpSocket(object):
"""
@@ -224,12 +224,13 @@ class EventInjector(object):
def close(self):
self._closed = True
+ os.write(self.pipe[1], "!")
def fileno(self):
return self.pipe[0]
def reading(self):
- return True
+ return not self.closed()
def writing(self):
return False
@@ -481,6 +482,15 @@ class Transaction(object):
self.failed = False
self._pending = []
self.settle_before_discharge = settle_before_discharge
+ class InternalTransactionHandler(OutgoingMessageHandler):
+ def __init__(self):
+ super(InternalTransactionHandler, self).__init__(auto_settle=True)
+
+ def on_settled(self, event):
+ if hasattr(event.delivery, "transaction"):
+ event.transaction = event.delivery.transaction
+ event.delivery.transaction.handle_outcome(event)
+ self.internal_handler = InternalTransactionHandler()
self.declare()
def commit(self):
@@ -497,7 +507,7 @@ class Transaction(object):
self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
def _send_ctrl(self, descriptor, value):
- delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler)
+ delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.internal_handler)
delivery.transaction = self
return delivery
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] qpid-proton git commit: initialise values to keep (some)
compilers happy
Posted by gs...@apache.org.
initialise values to keep (some) compilers happy
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8c35ce40
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8c35ce40
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8c35ce40
Branch: refs/heads/master
Commit: 8c35ce407c73608171ab995cd4a4023fbe8f93f9
Parents: a210182
Author: Gordon Sim <gs...@redhat.com>
Authored: Wed Jan 7 21:00:49 2015 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 21:00:49 2015 +0000
----------------------------------------------------------------------
proton-c/src/tests/object.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8c35ce40/proton-c/src/tests/object.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/object.c b/proton-c/src/tests/object.c
index 2596bec..5a32f60 100644
--- a/proton-c/src/tests/object.c
+++ b/proton-c/src/tests/object.c
@@ -867,8 +867,8 @@ void test_heap(int seed, int size)
srand(seed);
pn_list_t *list = pn_list(PN_VOID, 0);
- intptr_t min;
- intptr_t max;
+ intptr_t min = 0;
+ intptr_t max = 0;
for (int i = 0; i < size; i++) {
intptr_t r = rand();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] qpid-proton git commit: PROTON-730: allow transactional state
to be read on incoming transfers
Posted by gs...@apache.org.
PROTON-730: allow transactional state to be read on incoming transfers
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1e96dae8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1e96dae8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1e96dae8
Branch: refs/heads/master
Commit: 1e96dae8039e560b9de09d3c97343d301667fb22
Parents: a439ee2
Author: Gordon Sim <gs...@apache.org>
Authored: Mon Oct 27 15:25:52 2014 +0000
Committer: Gordon Sim <gs...@redhat.com>
Committed: Wed Jan 7 20:23:57 2015 +0000
----------------------------------------------------------------------
proton-c/src/transport/transport.c | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e96dae8/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index a434b48..6220329 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -1181,8 +1181,11 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
pn_sequence_t id;
bool settled;
bool more;
- int err = pn_data_scan(args, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag,
- &settled, &more);
+ bool has_type;
+ uint64_t type;
+ pn_data_clear(transport->disp_data);
+ int err = pn_data_scan(args, "D.[I?Iz.oo.D?LC]", &handle, &id_present, &id, &tag,
+ &settled, &more, &has_type, &type, transport->disp_data);
if (err) return err;
pn_session_t *ssn = pn_channel_state(transport, channel);
@@ -1212,6 +1215,10 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann
"sequencing error, expected delivery-id %u, got %u",
state->id, id);
}
+ if (has_type) {
+ delivery->remote.type = type;
+ pn_data_copy(delivery->remote.data, transport->disp_data);
+ }
link->state.delivery_count++;
link->state.link_credit--;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org