You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/27 18:12:05 UTC
[1/2] qpid-proton git commit: PROTON-1512: c examples: clean up
examples and fix some issues.
Repository: qpid-proton
Updated Branches:
refs/heads/master ea02b9337 -> 58dfb4aa4
PROTON-1512: c examples: clean up examples and fix some issues.
All examples that receive messages use a consistent code sequence and error
reporting, and all deal with aborted messages consistently. All receivers accumulate
messages in a local buffer, not in proton link buffers.
Fixed issues:
- broker.c: use a per-link buffer for thread safety
- direct.c: wasn't accumulating messages in a separate buffer
Added abort tests to example_test.py, using send-abort.c and verifying that broker.c
and direct.c correctly report and drop aborted messages and continue to function.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/58dfb4aa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/58dfb4aa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/58dfb4aa
Branch: refs/heads/master
Commit: 58dfb4aa4424c0116443533639a42b7ceda774e7
Parents: 5b1be87
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 27 14:01:48 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 27 14:10:59 2017 -0400
----------------------------------------------------------------------
examples/c/broker.c | 36 +++++-----
examples/c/direct.c | 156 +++++++++++++++++++++-------------------
examples/c/example_test.py | 59 +++++++++++----
examples/c/receive.c | 68 +++++++++---------
4 files changed, 175 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index c2e4c16..3a4d04a 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -327,28 +327,26 @@ static void handle(broker_t* b, pn_event_t* e) {
}
case PN_DELIVERY: { /* Incoming message data */
pn_delivery_t *d = pn_event_delivery(e);
- pn_link_t *l = pn_delivery_link(d);
- if (!pn_delivery_readable(d)) break;
- pn_rwbytes_t *m = message_buffer(l);
- for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
- /* Append data to the reeving buffer */
- m->size += p;
+ if (pn_delivery_readable(d)) {
+ pn_link_t *l = pn_delivery_link(d);
+ size_t size = pn_delivery_pending(d);
+ pn_rwbytes_t* m = message_buffer(l); /* Append data to incoming message buffer */
+ m->size += size;
m->start = (char*)realloc(m->start, m->size);
- int recv = pn_link_recv(l, m->start + m->size - p, p);
- if (recv < 0 && recv != PN_EOS) {
- fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
- break;
+ int err = pn_link_recv(l, m->start, m->size);
+ if (err < 0 && err != PN_EOS) {
+ fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+ pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
+ m->size = 0; /* forget the data we accumulated */
+ } else if (!pn_delivery_partial(d)) { /* Message is complete */
+ const char *qname = pn_terminus_get_address(pn_link_target(l));
+ queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
+ *m = pn_rwbytes_null; /* Reset the buffer for the next message*/
+ pn_delivery_update(d, PN_ACCEPTED);
+ pn_delivery_settle(d);
+ pn_link_flow(l, WINDOW - pn_link_credit(l));
}
}
- if (!pn_delivery_partial(d)) {
- /* The broker does not decode the message, just forwards it. */
- const char *qname = pn_terminus_get_address(pn_link_target(l));
- queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
- *m = pn_rwbytes_null;
- pn_delivery_update(d, PN_ACCEPTED);
- pn_delivery_settle(d);
- pn_link_flow(l, WINDOW - pn_link_credit(l));
- }
break;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
index 15550e6..f55519e 100644
--- a/examples/c/direct.c
+++ b/examples/c/direct.c
@@ -41,7 +41,7 @@ typedef struct app_data_t {
pn_proactor_t *proactor;
pn_listener_t *listener;
- pn_rwbytes_t message_buffer;
+ pn_rwbytes_t msgin, msgout; /* Buffers for incoming/outgoing messages */
/* Sender values */
int sent;
@@ -56,11 +56,20 @@ static const int BATCH = 1000; /* Batch size for unlimited receive */
static int exit_code = 0;
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+/* Close the connection and the listener so so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_connection_t *c, app_data_t *app) {
+ if (c) pn_connection_close(c);
+ if (app->listener) pn_listener_close(app->listener);
+}
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
if (pn_condition_is_set(cond)) {
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
- pn_connection_close(pn_event_connection(e));
+ close_all(pn_event_connection(e), app);
exit_code = 1;
}
}
@@ -78,18 +87,18 @@ static pn_bytes_t encode_message(app_data_t* app) {
pn_data_exit(body);
/* encode the message, expanding the encode buffer as needed */
- if (app->message_buffer.start == NULL) {
+ if (app->msgout.start == NULL) {
static const size_t initial_size = 128;
- app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+ app->msgout = pn_rwbytes(initial_size, (char*)malloc(initial_size));
}
- /* app->message_buffer is the total buffer space available. */
+ /* app->msgout is the total buffer space available. */
/* mbuf wil point at just the portion used by the encoded message */
- pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+ pn_rwbytes_t mbuf = pn_rwbytes(app->msgout.size, app->msgout.start);
int status = 0;
while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
- app->message_buffer.size *= 2;
- app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
- mbuf.size = app->message_buffer.size;
+ app->msgout.size *= 2;
+ app->msgout.start = (char*)realloc(app->msgout.start, app->msgout.size);
+ mbuf.size = app->msgout.size;
}
if (status != 0) {
fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
@@ -99,26 +108,21 @@ static pn_bytes_t encode_message(app_data_t* app) {
return pn_bytes(mbuf.size, mbuf.start);
}
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
- static char buffer[MAX_SIZE];
- ssize_t len;
- // try to decode the message body
- if (pn_delivery_pending(dlv) < MAX_SIZE) {
- // read in the raw data
- len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
- if (len > 0) {
- // decode it into a proton message
- pn_message_t *m = pn_message();
- if (PN_OK == pn_message_decode(m, buffer, len)) {
- pn_string_t *s = pn_string(NULL);
- pn_inspect(pn_message_body(m), s);
- printf("%s\n", pn_string_get(s));
- pn_free(s);
- }
- pn_message_free(m);
- }
+static void decode_message(pn_rwbytes_t data) {
+ pn_message_t *m = pn_message();
+ int err = pn_message_decode(m, data.start, data.size);
+ if (!err) {
+ /* Print the decoded message */
+ pn_string_t *s = pn_string(NULL);
+ pn_inspect(pn_message_body(m), s);
+ printf("%s\n", pn_string_get(s));
+ fflush(stdout);
+ pn_free(s);
+ pn_message_free(m);
+ free(data.start);
+ } else {
+ fprintf(stderr, "decode_message: %s\n", pn_code(err));
+ exit_code = 1;
}
}
@@ -132,36 +136,37 @@ static void handle_receive(app_data_t* app, pn_event_t* event) {
pn_link_flow(l, app->message_count ? app->message_count : BATCH);
} break;
- case PN_DELIVERY: {
- /* A message has been received */
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- link = pn_delivery_link(dlv);
- decode_message(dlv);
- /* Accept the delivery */
- pn_delivery_update(dlv, PN_ACCEPTED);
- /* done with the delivery, move to the next and free it */
- pn_link_advance(link);
- pn_delivery_settle(dlv); /* dlv is now freed */
-
- if (app->message_count == 0) {
- /* receive forever - see if more credit is needed */
- if (pn_link_credit(link) < BATCH/2) {
- /* Grant enough credit to bring it up to BATCH: */
- pn_link_flow(link, BATCH - pn_link_credit(link));
+ case PN_DELIVERY: { /* Incoming message data */
+ pn_delivery_t *d = pn_event_delivery(event);
+ if (pn_delivery_readable(d)) {
+ pn_link_t *l = pn_delivery_link(d);
+ size_t size = pn_delivery_pending(d);
+ pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
+ m->size += size;
+ m->start = (char*)realloc(m->start, m->size);
+ int err = pn_link_recv(l, m->start, m->size);
+ if (err < 0 && err != PN_EOS) {
+ fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+ pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
+ m->size = 0; /* forget the data we accumulated */
+ } else if (!pn_delivery_partial(d)) { /* Message is complete */
+ decode_message(*m);
+ *m = pn_rwbytes_null;
+ pn_delivery_update(d, PN_ACCEPTED);
+ pn_delivery_settle(d); /* settle and free d */
+ if (app->message_count == 0) {
+ /* receive forever - see if more credit is needed */
+ if (pn_link_credit(l) < BATCH/2) {
+ pn_link_flow(l, BATCH - pn_link_credit(l));
+ }
+ } else if (++app->received >= app->message_count) {
+ printf("%d messages received\n", app->received);
+ close_all(pn_event_connection(event), app);
}
- } else if (++app->received >= app->message_count) {
- /* done receiving, close the endpoints */
- printf("%d messages received\n", app->received);
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
}
}
- } break;
-
+ break;
+ }
default:
break;
}
@@ -197,8 +202,7 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
if (++app->acknowledged == app->message_count) {
printf("%d messages sent and acknowledged\n", app->acknowledged);
- pn_connection_close(pn_event_connection(event));
- /* Continue handling events till we receive TRANSPORT_CLOSED */
+ close_all(pn_event_connection(event), app);
}
}
} break;
@@ -244,24 +248,25 @@ static bool handle(app_data_t* app, pn_event_t* event) {
}
case PN_TRANSPORT_CLOSED:
- check_condition(event, pn_transport_condition(pn_event_transport(event)));
- pn_listener_close(app->listener); /* Finished */
+ check_condition(event, pn_transport_condition(pn_event_transport(event)), app);
break;
case PN_CONNECTION_REMOTE_CLOSE:
- check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
- pn_connection_close(pn_event_connection(event));
+ check_condition(event, pn_connection_remote_condition(pn_event_connection(event)), app);
+ pn_connection_close(pn_event_connection(event)); /* Return the close */
break;
case PN_SESSION_REMOTE_CLOSE:
- check_condition(event, pn_session_remote_condition(pn_event_session(event)));
- pn_connection_close(pn_event_connection(event));
+ check_condition(event, pn_session_remote_condition(pn_event_session(event)), app);
+ pn_session_close(pn_event_session(event)); /* Return the close */
+ pn_session_free(pn_event_session(event));
break;
case PN_LINK_REMOTE_CLOSE:
case PN_LINK_REMOTE_DETACH:
- check_condition(event, pn_link_remote_condition(pn_event_link(event)));
- pn_connection_close(pn_event_connection(event));
+ check_condition(event, pn_link_remote_condition(pn_event_link(event)), app);
+ pn_link_close(pn_event_link(event)); /* Return the close */
+ pn_link_free(pn_event_link(event));
break;
case PN_PROACTOR_TIMEOUT:
@@ -270,7 +275,8 @@ static bool handle(app_data_t* app, pn_event_t* event) {
break;
case PN_LISTENER_CLOSE:
- check_condition(event, pn_listener_condition(pn_event_listener(event)));
+ app->listener = NULL; /* Listener is closed */
+ check_condition(event, pn_listener_condition(pn_event_listener(event)), app);
break;
case PN_PROACTOR_INACTIVE:
@@ -306,12 +312,11 @@ void run(app_data_t *app) {
int main(int argc, char **argv) {
struct app_data_t app = {0};
- int i = 0;
- app.container_id = argv[i++]; /* Should be unique */
- app.host = (argc > 1) ? argv[i++] : "";
- app.port = (argc > 1) ? argv[i++] : "amqp";
- app.amqp_address = (argc > i) ? argv[i++] : "examples";
- app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+ app.container_id = argv[0]; /* Should be unique */
+ app.host = (argc > 1) ? argv[1] : "";
+ app.port = (argc > 2) ? argv[2] : "amqp";
+ app.amqp_address = (argc > 3) ? argv[3] : "examples";
+ app.message_count = (argc > 4) ? atoi(argv[4]) : 10;
/* Create the proactor and connect */
app.proactor = pn_proactor();
@@ -321,6 +326,7 @@ int main(int argc, char **argv) {
pn_proactor_listen(app.proactor, app.listener, addr, 16);
run(&app);
pn_proactor_free(app.proactor);
- free(app.message_buffer.start);
+ free(app.msgout.start);
+ free(app.msgin.start);
return exit_code;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
index 02bb1fd..ee3e4e4 100644
--- a/examples/c/example_test.py
+++ b/examples/c/example_test.py
@@ -26,8 +26,13 @@ def python_cmd(name):
dir = os.path.dirname(__file__)
return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
-def receive_expect(n):
- return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
+def receive_expect_messages(n=10): return ''.join(['{"sequence"=%s}\n'%i for i in xrange(1, n+1)])
+def receive_expect_total(n=10): return "%s messages received\n"%n
+def receive_expect(n=10): return receive_expect_messages(n)+receive_expect_total(n)
+
+def send_expect(n=10): return "%s messages sent and acknowledged\n" % n
+def send_abort_expect(n=10): return "%s messages started and aborted\n" % n
+
class Broker(object):
def __init__(self, test):
@@ -51,38 +56,64 @@ class Broker(object):
class CExampleTest(ProcTestCase):
+ def runex(self, name, port, *args):
+ """Run an example with standard arugments, return output"""
+ return self.proc([name, "", port, "examples"] + list(args)).wait_exit()
+
def test_send_receive(self):
"""Send first then receive"""
with Broker(self) as b:
- s = self.proc(["send", "", b.port])
- self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
- r = self.proc(["receive", "", b.port])
- self.assertEqual(receive_expect(10), r.wait_exit())
+ self.assertEqual(send_expect(), self.runex("send", b.port))
+ self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
def test_receive_send(self):
"""Start receiving first, then send."""
with Broker(self) as b:
- r = self.proc(["receive", "", b.port]);
- s = self.proc(["send", "", b.port]);
- self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
- self.assertEqual(receive_expect(10), r.wait_exit())
+ self.assertEqual(send_expect(), self.runex("send", b.port))
+ self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
def test_send_direct(self):
"""Send to direct server"""
with TestPort() as tp:
d = self.proc(["direct", "", tp.port])
d.wait_re("listening")
- self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
- self.assertIn(receive_expect(10), d.wait_exit())
+ self.assertEqual(send_expect(), self.runex("send", tp.port))
+ self.assertMultiLineEqual("listening\n"+receive_expect(), d.wait_exit())
def test_receive_direct(self):
"""Receive from direct server"""
with TestPort() as tp:
d = self.proc(["direct", "", tp.port])
d.wait_re("listening")
- self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
- self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
+ self.assertMultiLineEqual(receive_expect(), self.runex("receive", tp.port))
+ self.assertEqual("listening\n10 messages sent and acknowledged\n", d.wait_exit())
+ def test_send_abort_broker(self):
+ """Sending aborted messages to a broker"""
+ with Broker(self) as b:
+ self.assertEqual(send_expect(), self.runex("send", b.port))
+ self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port))
+ b.proc.wait_re("PN_DELIVERY error: PN_ABORTED\n"*10)
+ self.assertEqual(send_expect(), self.runex("send", b.port))
+ expect = receive_expect_messages(10)+receive_expect_messages(10)+receive_expect_total(20)
+ self.assertMultiLineEqual(expect, self.runex("receive", b.port, "20"))
+
+ def test_send_abort_direct(self):
+ """Send aborted messages to the direct server"""
+ with TestPort() as tp:
+ d = self.proc(["direct", "", tp.port, "examples", "20"])
+ expect = "listening\n"
+ d.wait_re(expect)
+ self.assertEqual(send_expect(), self.runex("send", tp.port))
+ expect += receive_expect_messages()
+ d.wait_re(expect)
+ self.assertEqual(send_abort_expect(), self.runex("send-abort", tp.port))
+ expect += "PN_DELIVERY error: PN_ABORTED\n"*10
+ d.wait_re(expect)
+ self.assertEqual(send_expect(), self.runex("send", tp.port))
+ expect += receive_expect_messages()+receive_expect_total(20)
+ self.maxDiff = None
+ self.assertMultiLineEqual(expect, d.wait_exit())
if __name__ == "__main__":
unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
index 1b3ac1a..7df955a 100644
--- a/examples/c/receive.c
+++ b/examples/c/receive.c
@@ -40,7 +40,7 @@ typedef struct app_data_t {
pn_proactor_t *proactor;
int received;
bool finished;
- pn_rwbytes_t receiving; /* Partially received message */
+ pn_rwbytes_t msgin; /* Partially received message */
} app_data_t;
static const int BATCH = 1000; /* Batch size for unlimited receive */
@@ -93,44 +93,40 @@ static bool handle(app_data_t* app, pn_event_t* event) {
case PN_DELIVERY: {
/* A message has been received */
pn_delivery_t *d = pn_event_delivery(event);
- pn_link_t *r = pn_delivery_link(d);
- if (!pn_delivery_readable(d)) break;
- for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
- /* Append data to the receving buffer */
- app->receiving.size += p;
- app->receiving.start = (char*)realloc(app->receiving.start, app->receiving.size);
- int recv = pn_link_recv(r, app->receiving.start + app->receiving.size - p, p);
- if (recv < 0 && recv != PN_EOS) {
- fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
- exit_code = 1;
- break;
- }
- }
- if (!pn_delivery_partial(d)) {
- decode_message(app->receiving);
- app->receiving = pn_rwbytes_null;
- /* Accept the delivery */
- pn_delivery_update(d, PN_ACCEPTED);
- /* done with the delivery, move to the next and free it */
- pn_link_advance(r);
- pn_delivery_settle(d); /* d is now freed */
-
- if (app->message_count == 0) {
- /* receive forever - see if more credit is needed */
- if (pn_link_credit(r) < BATCH/2) {
- /* Grant enough credit to bring it up to BATCH: */
- pn_link_flow(r, BATCH - pn_link_credit(r));
+ if (pn_delivery_readable(d)) {
+ pn_link_t *l = pn_delivery_link(d);
+ size_t size = pn_delivery_pending(d);
+ pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
+ m->size += size;
+ m->start = (char*)realloc(m->start, m->size);
+ int err = pn_link_recv(l, m->start, m->size);
+ if (err < 0 && err != PN_EOS) {
+ fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+ pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
+ m->size = 0; /* forget the data we accumulated */
+ } else if (!pn_delivery_partial(d)) { /* Message is complete */
+ decode_message(*m);
+ *m = pn_rwbytes_null; /* Reset the buffer for the next message*/
+ /* Accept the delivery */
+ pn_delivery_update(d, PN_ACCEPTED);
+ pn_delivery_settle(d); /* settle and free d */
+ if (app->message_count == 0) {
+ /* receive forever - see if more credit is needed */
+ if (pn_link_credit(l) < BATCH/2) {
+ /* Grant enough credit to bring it up to BATCH: */
+ pn_link_flow(l, BATCH - pn_link_credit(l));
+ }
+ } else if (++app->received >= app->message_count) {
+ printf("%d messages received\n", app->received);
+ pn_session_t *ssn = pn_link_session(l);
+ pn_link_close(l);
+ pn_session_close(ssn);
+ pn_connection_close(pn_session_connection(ssn));
}
- } else if (++app->received >= app->message_count) {
- /* done receiving, close the endpoints */
- printf("%d messages received\n", app->received);
- pn_session_t *ssn = pn_link_session(r);
- pn_link_close(r);
- pn_session_close(ssn);
- pn_connection_close(pn_session_connection(ssn));
}
}
- } break;
+ break;
+ }
case PN_TRANSPORT_CLOSED:
check_condition(event, pn_transport_condition(pn_event_transport(event)));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1512: c examples: fix thread
safety issue in broker
Posted by ac...@apache.org.
PROTON-1512: c examples: fix thread safety issue in broker
Replace global receiving buffer with buffer-per-link to accumulate messages correctly.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5b1be877
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5b1be877
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5b1be877
Branch: refs/heads/master
Commit: 5b1be8771faeafa7401d5f96705d38d266c61206
Parents: ea02b93
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 27 10:17:28 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 27 14:10:59 2017 -0400
----------------------------------------------------------------------
examples/c/broker.c | 54 +++++++++++++++++++++++++++++-------------------
1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5b1be877/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index 9129c3f..c2e4c16 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -119,20 +119,24 @@ static void queue_send(queue_t *q, pn_link_t *s) {
}
}
-/* Data associated with each broker connection */
-typedef struct broker_data_t {
- bool check_queues; /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* Use the context pointer as a boolean flag to indicate we need to check queues */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+/* Use the connection context pointer as a boolean flag to indicate we need to check queues */
+void set_check_queues(pn_connection_t *c, bool check) {
pn_connection_set_context(c, (void*)check);
}
-bool pn_connection_get_check_queues(pn_connection_t *c) {
+bool get_check_queues(pn_connection_t *c) {
return (bool)pn_connection_get_context(c);
}
+/* Use a buffer per link to accumulate message data - message can arrive in multiple deliveries,
+ and the broker can receive messages on many concurrently. */
+pn_rwbytes_t *message_buffer(pn_link_t *l) {
+ if (!pn_link_get_context(l)) {
+ pn_link_set_context(l, calloc(1, sizeof(pn_rwbytes_t)));
+ }
+ return (pn_rwbytes_t*)pn_link_get_context(l);
+}
+
/* Put a message on the queue, called in receiver dispatch loop.
If the queue was previously empty, notify waiting senders.
*/
@@ -142,7 +146,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
if (q->messages.len == 1) { /* Was empty, notify waiting connections */
for (size_t i = 0; i < q->waiting.len; ++i) {
pn_connection_t *c = q->waiting.data[i];
- pn_connection_set_check_queues(c, true);
+ set_check_queues(c, true);
pn_connection_wake(c); /* Wake the connection */
}
q->waiting.len = 0;
@@ -192,7 +196,6 @@ typedef struct broker_t {
const char *container_id; /* AMQP container-id */
queues_t queues;
bool finished;
- pn_rwbytes_t receiving; /* Partially received message data */
} broker_t;
void broker_stop(broker_t *b) {
@@ -285,8 +288,8 @@ static void handle(broker_t* b, pn_event_t* e) {
break;
}
case PN_CONNECTION_WAKE: {
- if (pn_connection_get_check_queues(c)) {
- pn_connection_set_check_queues(c, false);
+ if (get_check_queues(c)) {
+ set_check_queues(c, false);
int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
link_send(b, l);
@@ -314,15 +317,24 @@ static void handle(broker_t* b, pn_event_t* e) {
link_send(b, pn_event_link(e));
break;
}
- case PN_DELIVERY: {
+ case PN_LINK_FINAL: {
+ pn_rwbytes_t *buf = (pn_rwbytes_t*)pn_link_get_context(pn_event_link(e));
+ if (buf) {
+ free(buf->start);
+ free(buf);
+ }
+ break;
+ }
+ case PN_DELIVERY: { /* Incoming message data */
pn_delivery_t *d = pn_event_delivery(e);
- pn_link_t *r = pn_delivery_link(d);
+ pn_link_t *l = pn_delivery_link(d);
if (!pn_delivery_readable(d)) break;
+ pn_rwbytes_t *m = message_buffer(l);
for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
/* Append data to the reeving buffer */
- b->receiving.size += p;
- b->receiving.start = (char*)realloc(b->receiving.start, b->receiving.size);
- int recv = pn_link_recv(r, b->receiving.start + b->receiving.size - p, p);
+ m->size += p;
+ m->start = (char*)realloc(m->start, m->size);
+ int recv = pn_link_recv(l, m->start + m->size - p, p);
if (recv < 0 && recv != PN_EOS) {
fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
break;
@@ -330,12 +342,12 @@ static void handle(broker_t* b, pn_event_t* e) {
}
if (!pn_delivery_partial(d)) {
/* The broker does not decode the message, just forwards it. */
- const char *qname = pn_terminus_get_address(pn_link_target(r));
- queue_receive(b->proactor, queues_get(&b->queues, qname), b->receiving);
- b->receiving = pn_rwbytes_null;
+ const char *qname = pn_terminus_get_address(pn_link_target(l));
+ queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
+ *m = pn_rwbytes_null;
pn_delivery_update(d, PN_ACCEPTED);
pn_delivery_settle(d);
- pn_link_flow(r, WINDOW - pn_link_credit(r));
+ pn_link_flow(l, WINDOW - pn_link_credit(l));
}
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org