You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/21 19:30:35 UTC
qpid-proton git commit: PROTON-1512: c examples use recommended
PN_DELIVERY pattern
Repository: qpid-proton
Updated Branches:
refs/heads/master 6743ab441 -> 5c9984994
PROTON-1512: c examples use recommended PN_DELIVERY pattern
Fixed the C examples to use the recommended pattern of always reading data
on a PN_DELIVERY event. Letting data accumulate in proton can cause a hang
if a message is larger than the session flow-control limit.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5c998499
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5c998499
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5c998499
Branch: refs/heads/master
Commit: 5c99849941f6c553a1080995936696ff56e3e4a9
Parents: 6743ab4
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Sep 21 14:42:37 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Sep 21 14:42:37 2017 -0400
----------------------------------------------------------------------
examples/c/broker.c | 27 +++++++------
examples/c/receive.c | 76 ++++++++++++++++++------------------
proton-c/include/proton/link.h | 2 +
proton-c/include/proton/types.h | 2 +-
4 files changed, 58 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index 866a4c9..9129c3f 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -192,6 +192,7 @@ typedef struct broker_t {
const char *container_id; /* AMQP container-id */
queues_t queues;
bool finished;
+ pn_rwbytes_t receiving; /* Partially received message data */
} broker_t;
void broker_stop(broker_t *b) {
@@ -245,13 +246,10 @@ static void session_unsub(broker_t *b, pn_session_t *ssn) {
}
}
-static int exit_code = 0;
-
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
pn_condition_get_name(cond), pn_condition_get_description(cond));
- exit_code = 1; /* Remeber there was an unexpected error */
}
}
@@ -319,15 +317,22 @@ static void handle(broker_t* b, pn_event_t* e) {
case PN_DELIVERY: {
pn_delivery_t *d = pn_event_delivery(e);
pn_link_t *r = pn_delivery_link(d);
- if (pn_link_is_receiver(r) &&
- pn_delivery_readable(d) && !pn_delivery_partial(d))
- {
- size_t size = pn_delivery_pending(d);
+ if (!pn_delivery_readable(d)) break;
+ for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
+ /* Append data to the receiving buffer */
+ b->receiving.size += p;
+ b->receiving.start = (char*)realloc(b->receiving.start, b->receiving.size);
+ int recv = pn_link_recv(r, b->receiving.start + b->receiving.size - p, p);
+ if (recv < 0 && recv != PN_EOS) {
+ fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
+ break;
+ }
+ }
+ if (!pn_delivery_partial(d)) {
/* The broker does not decode the message, just forwards it. */
- pn_rwbytes_t m = { size, (char*)malloc(size) };
- pn_link_recv(r, m.start, m.size);
const char *qname = pn_terminus_get_address(pn_link_target(r));
- queue_receive(b->proactor, queues_get(&b->queues, qname), m);
+ queue_receive(b->proactor, queues_get(&b->queues, qname), b->receiving);
+ b->receiving = pn_rwbytes_null;
pn_delivery_update(d, PN_ACCEPTED);
pn_delivery_settle(d);
pn_link_flow(r, WINDOW - pn_link_credit(r));
@@ -419,5 +424,5 @@ int main(int argc, char **argv) {
}
pn_proactor_free(b.proactor);
free(threads);
- return exit_code;
+ return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
index a75de1d..1b3ac1a 100644
--- a/examples/c/receive.c
+++ b/examples/c/receive.c
@@ -40,6 +40,7 @@ typedef struct app_data_t {
pn_proactor_t *proactor;
int received;
bool finished;
+ pn_rwbytes_t receiving; /* Partially received message */
} app_data_t;
static const int BATCH = 1000; /* Batch size for unlimited receive */
@@ -55,31 +56,20 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
}
}
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
- static char buffer[MAX_SIZE];
- ssize_t len;
- // try to decode the message body
- if (pn_delivery_pending(dlv) < MAX_SIZE) {
- // read in the raw data
- len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
- if (len > 0) {
- // decode it into a proton message
- pn_message_t *m = pn_message();
- if (PN_OK == pn_message_decode(m, buffer, len)) {
- pn_string_t *s = pn_string(NULL);
- pn_inspect(pn_message_body(m), s);
- printf("%s\n", pn_string_get(s));
- pn_free(s);
- }
- pn_message_free(m);
- } else if (len < 0) {
- fprintf(stderr, "decode_message: %s\n", pn_code(len));
- exit_code = 1;
- } else {
- fprintf(stderr, "decode_message: no data\n");
- }
+static void decode_message(pn_rwbytes_t data) {
+ pn_message_t *m = pn_message();
+ int err = pn_message_decode(m, data.start, data.size);
+ if (!err) {
+ /* Print the decoded message */
+ pn_string_t *s = pn_string(NULL);
+ pn_inspect(pn_message_body(m), s);
+ printf("%s\n", pn_string_get(s));
+ pn_free(s);
+ pn_message_free(m);
+ free(data.start);
+ } else {
+ fprintf(stderr, "decode_message: %s\n", pn_code(err));
+ exit_code = 1;
}
}
@@ -102,28 +92,40 @@ static bool handle(app_data_t* app, pn_event_t* event) {
case PN_DELIVERY: {
/* A message has been received */
- pn_link_t *link = NULL;
- pn_delivery_t *dlv = pn_event_delivery(event);
- if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
- link = pn_delivery_link(dlv);
- decode_message(dlv);
+ pn_delivery_t *d = pn_event_delivery(event);
+ pn_link_t *r = pn_delivery_link(d);
+ if (!pn_delivery_readable(d)) break;
+ for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
+ /* Append data to the receiving buffer */
+ app->receiving.size += p;
+ app->receiving.start = (char*)realloc(app->receiving.start, app->receiving.size);
+ int recv = pn_link_recv(r, app->receiving.start + app->receiving.size - p, p);
+ if (recv < 0 && recv != PN_EOS) {
+ fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
+ exit_code = 1;
+ break;
+ }
+ }
+ if (!pn_delivery_partial(d)) {
+ decode_message(app->receiving);
+ app->receiving = pn_rwbytes_null;
/* Accept the delivery */
- pn_delivery_update(dlv, PN_ACCEPTED);
+ pn_delivery_update(d, PN_ACCEPTED);
/* done with the delivery, move to the next and free it */
- pn_link_advance(link);
- pn_delivery_settle(dlv); /* dlv is now freed */
+ pn_link_advance(r);
+ pn_delivery_settle(d); /* d is now freed */
if (app->message_count == 0) {
/* receive forever - see if more credit is needed */
- if (pn_link_credit(link) < BATCH/2) {
+ if (pn_link_credit(r) < BATCH/2) {
/* Grant enough credit to bring it up to BATCH: */
- pn_link_flow(link, BATCH - pn_link_credit(link));
+ pn_link_flow(r, BATCH - pn_link_credit(r));
}
} else if (++app->received >= app->message_count) {
/* done receiving, close the endpoints */
printf("%d messages received\n", app->received);
- pn_session_t *ssn = pn_link_session(link);
- pn_link_close(link);
+ pn_session_t *ssn = pn_link_session(r);
+ pn_link_close(r);
pn_session_close(ssn);
pn_connection_close(pn_session_connection(ssn));
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/proton-c/include/proton/link.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/link.h b/proton-c/include/proton/link.h
index bf5cddc..6a5a86c 100644
--- a/proton-c/include/proton/link.h
+++ b/proton-c/include/proton/link.h
@@ -632,6 +632,8 @@ PN_EXTERN void pn_link_set_drain(pn_link_t *receiver, bool drain);
* @param[in] n the buffer capacity
* @return The number of bytes received, or an error code:
* - ::PN_EOS: The message has been completely received
+ * - 0: No data available now.
+ * If pn_delivery_partial() is true, there will be further ::PN_DELIVERY events with more data.
* - ::PN_STATE_ERR: The link has no current delivery
* - ::PN_ABORTED: See pn_delivery_aborted()
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 414b865..07b3618 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -242,7 +242,7 @@ typedef struct pn_rwbytes_t {
*/
PN_EXTERN pn_rwbytes_t pn_rwbytes(size_t size, char *start);
-static const pn_bytes_t pn_rwbytes_null = { 0, NULL };
+static const pn_rwbytes_t pn_rwbytes_null = { 0, NULL };
/**
* Holds the state flags for an AMQP endpoint.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org