You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/21 19:30:35 UTC

qpid-proton git commit: PROTON-1512: c examples use recommended PN_DELIVERY pattern

Repository: qpid-proton
Updated Branches:
  refs/heads/master 6743ab441 -> 5c9984994


PROTON-1512: c examples use recommended PN_DELIVERY pattern

Fixed the C examples to use the recommended pattern of always reading data
on a PN_DELIVERY event. Letting data accumulate in proton can cause a hang
if a message is larger than the session flow-control limit.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5c998499
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5c998499
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5c998499

Branch: refs/heads/master
Commit: 5c99849941f6c553a1080995936696ff56e3e4a9
Parents: 6743ab4
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Sep 21 14:42:37 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Sep 21 14:42:37 2017 -0400

----------------------------------------------------------------------
 examples/c/broker.c             | 27 +++++++------
 examples/c/receive.c            | 76 ++++++++++++++++++------------------
 proton-c/include/proton/link.h  |  2 +
 proton-c/include/proton/types.h |  2 +-
 4 files changed, 58 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index 866a4c9..9129c3f 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -192,6 +192,7 @@ typedef struct broker_t {
   const char *container_id;     /* AMQP container-id */
   queues_t queues;
   bool finished;
+  pn_rwbytes_t receiving;       /* Partially received message data */
 } broker_t;
 
 void broker_stop(broker_t *b) {
@@ -245,13 +246,10 @@ static void session_unsub(broker_t *b, pn_session_t *ssn) {
   }
 }
 
-static int exit_code = 0;
-
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
-    exit_code = 1;              /* Remeber there was an unexpected error */
   }
 }
 
@@ -319,15 +317,22 @@ static void handle(broker_t* b, pn_event_t* e) {
    case PN_DELIVERY: {
      pn_delivery_t *d = pn_event_delivery(e);
      pn_link_t *r = pn_delivery_link(d);
-     if (pn_link_is_receiver(r) &&
-         pn_delivery_readable(d) && !pn_delivery_partial(d))
-     {
-       size_t size = pn_delivery_pending(d);
+     if (!pn_delivery_readable(d)) break;
+     for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
+       /* Append data to the reeving buffer */
+       b->receiving.size += p;
+       b->receiving.start = (char*)realloc(b->receiving.start, b->receiving.size);
+       int recv = pn_link_recv(r, b->receiving.start + b->receiving.size - p, p);
+       if (recv < 0 && recv != PN_EOS) {
+         fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
+         break;
+       }
+     }
+     if (!pn_delivery_partial(d)) {
        /* The broker does not decode the message, just forwards it. */
-       pn_rwbytes_t m = { size, (char*)malloc(size) };
-       pn_link_recv(r, m.start, m.size);
        const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
+       queue_receive(b->proactor, queues_get(&b->queues, qname), b->receiving);
+       b->receiving = pn_rwbytes_null;
        pn_delivery_update(d, PN_ACCEPTED);
        pn_delivery_settle(d);
        pn_link_flow(r, WINDOW - pn_link_credit(r));
@@ -419,5 +424,5 @@ int main(int argc, char **argv) {
   }
   pn_proactor_free(b.proactor);
   free(threads);
-  return exit_code;
+  return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
index a75de1d..1b3ac1a 100644
--- a/examples/c/receive.c
+++ b/examples/c/receive.c
@@ -40,6 +40,7 @@ typedef struct app_data_t {
   pn_proactor_t *proactor;
   int received;
   bool finished;
+  pn_rwbytes_t receiving;       /* Partially received message */
 } app_data_t;
 
 static const int BATCH = 1000; /* Batch size for unlimited receive */
@@ -55,31 +56,20 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   }
 }
 
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  ssize_t len;
-  // try to decode the message body
-  if (pn_delivery_pending(dlv) < MAX_SIZE) {
-    // read in the raw data
-    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-    if (len > 0) {
-      // decode it into a proton message
-      pn_message_t *m = pn_message();
-      if (PN_OK == pn_message_decode(m, buffer, len)) {
-        pn_string_t *s = pn_string(NULL);
-        pn_inspect(pn_message_body(m), s);
-        printf("%s\n", pn_string_get(s));
-        pn_free(s);
-      }
-      pn_message_free(m);
-    } else if (len < 0) {
-      fprintf(stderr, "decode_message: %s\n", pn_code(len));
-      exit_code = 1;
-    } else {
-      fprintf(stderr, "decode_message: no data\n");
-    }
+static void decode_message(pn_rwbytes_t data) {
+  pn_message_t *m = pn_message();
+  int err = pn_message_decode(m, data.start, data.size);
+  if (!err) {
+    /* Print the decoded message */
+    pn_string_t *s = pn_string(NULL);
+    pn_inspect(pn_message_body(m), s);
+    printf("%s\n", pn_string_get(s));
+    pn_free(s);
+    pn_message_free(m);
+    free(data.start);
+  } else {
+    fprintf(stderr, "decode_message: %s\n", pn_code(err));
+    exit_code = 1;
   }
 }
 
@@ -102,28 +92,40 @@ static bool handle(app_data_t* app, pn_event_t* event) {
 
    case PN_DELIVERY: {
      /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
+     pn_delivery_t *d = pn_event_delivery(event);
+     pn_link_t *r = pn_delivery_link(d);
+     if (!pn_delivery_readable(d)) break;
+     for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
+       /* Append data to the receving buffer */
+       app->receiving.size += p;
+       app->receiving.start = (char*)realloc(app->receiving.start, app->receiving.size);
+       int recv = pn_link_recv(r, app->receiving.start + app->receiving.size - p, p);
+       if (recv < 0 && recv != PN_EOS) {
+         fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
+         exit_code = 1;
+         break;
+       }
+     }
+     if (!pn_delivery_partial(d)) {
+       decode_message(app->receiving);
+       app->receiving = pn_rwbytes_null;
        /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
+       pn_delivery_update(d, PN_ACCEPTED);
        /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
+       pn_link_advance(r);
+       pn_delivery_settle(d);  /* d is now freed */
 
        if (app->message_count == 0) {
          /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
+         if (pn_link_credit(r) < BATCH/2) {
            /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
+           pn_link_flow(r, BATCH - pn_link_credit(r));
          }
        } else if (++app->received >= app->message_count) {
          /* done receiving, close the endpoints */
          printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
+         pn_session_t *ssn = pn_link_session(r);
+         pn_link_close(r);
          pn_session_close(ssn);
          pn_connection_close(pn_session_connection(ssn));
        }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/proton-c/include/proton/link.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/link.h b/proton-c/include/proton/link.h
index bf5cddc..6a5a86c 100644
--- a/proton-c/include/proton/link.h
+++ b/proton-c/include/proton/link.h
@@ -632,6 +632,8 @@ PN_EXTERN void pn_link_set_drain(pn_link_t *receiver, bool drain);
  * @param[in] n the buffer capacity
  * @return The number of bytes received, or an error code:
  *   - ::PN_EOS: The message has been completely received
+ *   - 0: No data available now.
+ *     If pn_delivery_partial() there will be further ::PN_DELIVERY events with more data.
  *   - ::PN_STATE_ERR: The link has no current delivery
  *   - ::PN_ABORTED: See pn_delivery_aborted()
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5c998499/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 414b865..07b3618 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -242,7 +242,7 @@ typedef struct pn_rwbytes_t {
  */
 PN_EXTERN pn_rwbytes_t pn_rwbytes(size_t size, char *start);
 
-static const pn_bytes_t pn_rwbytes_null = { 0, NULL };
+static const pn_rwbytes_t pn_rwbytes_null = { 0, NULL };
 
 /**
  * Holds the state flags for an AMQP endpoint.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org