You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/09/27 18:12:05 UTC

[1/2] qpid-proton git commit: PROTON-1512: c examples: clean up examples and fix some issues.

Repository: qpid-proton
Updated Branches:
  refs/heads/master ea02b9337 -> 58dfb4aa4


PROTON-1512: c examples: clean up examples and fix some issues.

All examples that receive messages now use a consistent code sequence and
consistent error reporting, and all handle aborted messages the same way. All
receivers accumulate messages in a local buffer, not in proton link buffers.

Fixed issues:
- broker.c: use a per-link buffer for thread safety
- direct.c: wasn't accumulating messages in a separate buffer

Added abort tests to example_test.py, using send-abort.c and verifying that broker.c
and direct.c correctly report and drop aborted messages and then continue to function.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/58dfb4aa
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/58dfb4aa
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/58dfb4aa

Branch: refs/heads/master
Commit: 58dfb4aa4424c0116443533639a42b7ceda774e7
Parents: 5b1be87
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 27 14:01:48 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 27 14:10:59 2017 -0400

----------------------------------------------------------------------
 examples/c/broker.c        |  36 +++++-----
 examples/c/direct.c        | 156 +++++++++++++++++++++-------------------
 examples/c/example_test.py |  59 +++++++++++----
 examples/c/receive.c       |  68 +++++++++---------
 4 files changed, 175 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index c2e4c16..3a4d04a 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -327,28 +327,26 @@ static void handle(broker_t* b, pn_event_t* e) {
    }
    case PN_DELIVERY: {          /* Incoming message data */
      pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *l = pn_delivery_link(d);
-     if (!pn_delivery_readable(d)) break;
-     pn_rwbytes_t *m = message_buffer(l);
-     for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
-       /* Append data to the reeving buffer */
-       m->size += p;
+     if (pn_delivery_readable(d)) {
+       pn_link_t *l = pn_delivery_link(d);
+       size_t size = pn_delivery_pending(d);
+       pn_rwbytes_t* m = message_buffer(l); /* Append data to incoming message buffer */
+       m->size += size;
        m->start = (char*)realloc(m->start, m->size);
-       int recv = pn_link_recv(l, m->start + m->size - p, p);
-       if (recv < 0 && recv != PN_EOS) {
-         fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
-         break;
+       int err = pn_link_recv(l, m->start, m->size);
+       if (err < 0 && err != PN_EOS) {
+         fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+         pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
+         m->size = 0;           /* forget the data we accumulated */
+       } else if (!pn_delivery_partial(d)) { /* Message is complete */
+         const char *qname = pn_terminus_get_address(pn_link_target(l));
+         queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
+         *m = pn_rwbytes_null;  /* Reset the buffer for the next message*/
+         pn_delivery_update(d, PN_ACCEPTED);
+         pn_delivery_settle(d);
+         pn_link_flow(l, WINDOW - pn_link_credit(l));
        }
      }
-     if (!pn_delivery_partial(d)) {
-       /* The broker does not decode the message, just forwards it. */
-       const char *qname = pn_terminus_get_address(pn_link_target(l));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
-       *m = pn_rwbytes_null;
-       pn_delivery_update(d, PN_ACCEPTED);
-       pn_delivery_settle(d);
-       pn_link_flow(l, WINDOW - pn_link_credit(l));
-     }
      break;
    }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
index 15550e6..f55519e 100644
--- a/examples/c/direct.c
+++ b/examples/c/direct.c
@@ -41,7 +41,7 @@ typedef struct app_data_t {
 
   pn_proactor_t *proactor;
   pn_listener_t *listener;
-  pn_rwbytes_t message_buffer;
+  pn_rwbytes_t msgin, msgout;   /* Buffers for incoming/outgoing messages */
 
   /* Sender values */
   int sent;
@@ -56,11 +56,20 @@ static const int BATCH = 1000; /* Batch size for unlimited receive */
 
 static int exit_code = 0;
 
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+/* Close the connection and the listener so so we will get a
+ * PN_PROACTOR_INACTIVE event and exit, once all outstanding events
+ * are processed.
+ */
+static void close_all(pn_connection_t *c, app_data_t *app) {
+  if (c) pn_connection_close(c);
+  if (app->listener) pn_listener_close(app->listener);
+}
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
   if (pn_condition_is_set(cond)) {
     fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
             pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_close(pn_event_connection(e));
+    close_all(pn_event_connection(e), app);
     exit_code = 1;
   }
 }
@@ -78,18 +87,18 @@ static pn_bytes_t encode_message(app_data_t* app) {
   pn_data_exit(body);
 
   /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
+  if (app->msgout.start == NULL) {
     static const size_t initial_size = 128;
-    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+    app->msgout = pn_rwbytes(initial_size, (char*)malloc(initial_size));
   }
-  /* app->message_buffer is the total buffer space available. */
+  /* app->msgout is the total buffer space available. */
   /* mbuf wil point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  pn_rwbytes_t mbuf = pn_rwbytes(app->msgout.size, app->msgout.start);
   int status = 0;
   while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    app->message_buffer.size *= 2;
-    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
-    mbuf.size = app->message_buffer.size;
+    app->msgout.size *= 2;
+    app->msgout.start = (char*)realloc(app->msgout.start, app->msgout.size);
+    mbuf.size = app->msgout.size;
   }
   if (status != 0) {
     fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
@@ -99,26 +108,21 @@ static pn_bytes_t encode_message(app_data_t* app) {
   return pn_bytes(mbuf.size, mbuf.start);
 }
 
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  ssize_t len;
-  // try to decode the message body
-  if (pn_delivery_pending(dlv) < MAX_SIZE) {
-    // read in the raw data
-    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-    if (len > 0) {
-      // decode it into a proton message
-      pn_message_t *m = pn_message();
-      if (PN_OK == pn_message_decode(m, buffer, len)) {
-        pn_string_t *s = pn_string(NULL);
-        pn_inspect(pn_message_body(m), s);
-        printf("%s\n", pn_string_get(s));
-        pn_free(s);
-      }
-      pn_message_free(m);
-    }
+static void decode_message(pn_rwbytes_t data) {
+  pn_message_t *m = pn_message();
+  int err = pn_message_decode(m, data.start, data.size);
+  if (!err) {
+    /* Print the decoded message */
+    pn_string_t *s = pn_string(NULL);
+    pn_inspect(pn_message_body(m), s);
+    printf("%s\n", pn_string_get(s));
+    fflush(stdout);
+    pn_free(s);
+    pn_message_free(m);
+    free(data.start);
+  } else {
+    fprintf(stderr, "decode_message: %s\n", pn_code(err));
+    exit_code = 1;
   }
 }
 
@@ -132,36 +136,37 @@ static void handle_receive(app_data_t* app, pn_event_t* event) {
      pn_link_flow(l, app->message_count ? app->message_count : BATCH);
    } break;
 
-   case PN_DELIVERY: {
-     /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
-       /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
+   case PN_DELIVERY: {          /* Incoming message data */
+     pn_delivery_t *d = pn_event_delivery(event);
+     if (pn_delivery_readable(d)) {
+       pn_link_t *l = pn_delivery_link(d);
+       size_t size = pn_delivery_pending(d);
+       pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
+       m->size += size;
+       m->start = (char*)realloc(m->start, m->size);
+       int err = pn_link_recv(l, m->start, m->size);
+       if (err < 0 && err != PN_EOS) {
+         fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+         pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
+         m->size = 0;           /* forget the data we accumulated */
+       } else if (!pn_delivery_partial(d)) { /* Message is complete */
+         decode_message(*m);
+         *m = pn_rwbytes_null;
+         pn_delivery_update(d, PN_ACCEPTED);
+         pn_delivery_settle(d);  /* settle and free d */
+         if (app->message_count == 0) {
+           /* receive forever - see if more credit is needed */
+           if (pn_link_credit(l) < BATCH/2) {
+             pn_link_flow(l, BATCH - pn_link_credit(l));
+           }
+         } else if (++app->received >= app->message_count) {
+           printf("%d messages received\n", app->received);
+           close_all(pn_event_connection(event), app);
          }
-       } else if (++app->received >= app->message_count) {
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
        }
      }
-   } break;
-
+     break;
+   }
    default:
     break;
   }
@@ -197,8 +202,7 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
      if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
        if (++app->acknowledged == app->message_count) {
          printf("%d messages sent and acknowledged\n", app->acknowledged);
-         pn_connection_close(pn_event_connection(event));
-         /* Continue handling events till we receive TRANSPORT_CLOSED */
+         close_all(pn_event_connection(event), app);
        }
      }
    } break;
@@ -244,24 +248,25 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    }
 
    case PN_TRANSPORT_CLOSED:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    pn_listener_close(app->listener); /* Finished */
+    check_condition(event, pn_transport_condition(pn_event_transport(event)), app);
     break;
 
    case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)), app);
+    pn_connection_close(pn_event_connection(event)); /* Return the close */
     break;
 
    case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)), app);
+    pn_session_close(pn_event_session(event)); /* Return the close */
+    pn_session_free(pn_event_session(event));
     break;
 
    case PN_LINK_REMOTE_CLOSE:
    case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)), app);
+    pn_link_close(pn_event_link(event)); /* Return the close */
+    pn_link_free(pn_event_link(event));
     break;
 
    case PN_PROACTOR_TIMEOUT:
@@ -270,7 +275,8 @@ static bool handle(app_data_t* app, pn_event_t* event) {
     break;
 
    case PN_LISTENER_CLOSE:
-    check_condition(event, pn_listener_condition(pn_event_listener(event)));
+    app->listener = NULL;        /* Listener is closed */
+    check_condition(event, pn_listener_condition(pn_event_listener(event)), app);
     break;
 
    case PN_PROACTOR_INACTIVE:
@@ -306,12 +312,11 @@ void run(app_data_t *app) {
 
 int main(int argc, char **argv) {
   struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  app.host = (argc > 1) ? argv[i++] : "";
-  app.port = (argc > 1) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+  app.container_id = argv[0];   /* Should be unique */
+  app.host = (argc > 1) ? argv[1] : "";
+  app.port = (argc > 2) ? argv[2] : "amqp";
+  app.amqp_address = (argc > 3) ? argv[3] : "examples";
+  app.message_count = (argc > 4) ? atoi(argv[4]) : 10;
 
   /* Create the proactor and connect */
   app.proactor = pn_proactor();
@@ -321,6 +326,7 @@ int main(int argc, char **argv) {
   pn_proactor_listen(app.proactor, app.listener, addr, 16);
   run(&app);
   pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
+  free(app.msgout.start);
+  free(app.msgin.start);
   return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
index 02bb1fd..ee3e4e4 100644
--- a/examples/c/example_test.py
+++ b/examples/c/example_test.py
@@ -26,8 +26,13 @@ def python_cmd(name):
     dir = os.path.dirname(__file__)
     return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
 
-def receive_expect(n):
-    return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
+def receive_expect_messages(n=10): return ''.join(['{"sequence"=%s}\n'%i for i in xrange(1, n+1)])
+def receive_expect_total(n=10): return "%s messages received\n"%n
+def receive_expect(n=10): return receive_expect_messages(n)+receive_expect_total(n)
+
+def send_expect(n=10): return "%s messages sent and acknowledged\n" % n
+def send_abort_expect(n=10): return "%s messages started and aborted\n" % n
+
 
 class Broker(object):
     def __init__(self, test):
@@ -51,38 +56,64 @@ class Broker(object):
 
 class CExampleTest(ProcTestCase):
 
+    def runex(self, name, port, *args):
+        """Run an example with standard arugments, return output"""
+        return self.proc([name, "", port, "examples"] + list(args)).wait_exit()
+
     def test_send_receive(self):
         """Send first then receive"""
         with Broker(self) as b:
-            s = self.proc(["send", "", b.port])
-            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
-            r = self.proc(["receive", "", b.port])
-            self.assertEqual(receive_expect(10), r.wait_exit())
+            self.assertEqual(send_expect(), self.runex("send", b.port))
+            self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
 
     def test_receive_send(self):
         """Start receiving  first, then send."""
         with Broker(self) as b:
-            r = self.proc(["receive", "", b.port]);
-            s = self.proc(["send", "", b.port]);
-            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
-            self.assertEqual(receive_expect(10), r.wait_exit())
+            self.assertEqual(send_expect(), self.runex("send", b.port))
+            self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
 
     def test_send_direct(self):
         """Send to direct server"""
         with TestPort() as tp:
             d = self.proc(["direct", "", tp.port])
             d.wait_re("listening")
-            self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
-            self.assertIn(receive_expect(10), d.wait_exit())
+            self.assertEqual(send_expect(), self.runex("send", tp.port))
+            self.assertMultiLineEqual("listening\n"+receive_expect(), d.wait_exit())
 
     def test_receive_direct(self):
         """Receive from direct server"""
         with TestPort() as tp:
             d = self.proc(["direct", "", tp.port])
             d.wait_re("listening")
-            self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
-            self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
+            self.assertMultiLineEqual(receive_expect(), self.runex("receive", tp.port))
+            self.assertEqual("listening\n10 messages sent and acknowledged\n", d.wait_exit())
 
+    def test_send_abort_broker(self):
+        """Sending aborted messages to a broker"""
+        with Broker(self) as b:
+            self.assertEqual(send_expect(), self.runex("send", b.port))
+            self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port))
+            b.proc.wait_re("PN_DELIVERY error: PN_ABORTED\n"*10)
+            self.assertEqual(send_expect(), self.runex("send", b.port))
+            expect = receive_expect_messages(10)+receive_expect_messages(10)+receive_expect_total(20)
+            self.assertMultiLineEqual(expect, self.runex("receive", b.port, "20"))
+
+    def test_send_abort_direct(self):
+        """Send aborted messages to the direct server"""
+        with TestPort() as tp:
+            d = self.proc(["direct", "", tp.port, "examples", "20"])
+            expect = "listening\n"
+            d.wait_re(expect)
+            self.assertEqual(send_expect(), self.runex("send", tp.port))
+            expect += receive_expect_messages()
+            d.wait_re(expect)
+            self.assertEqual(send_abort_expect(), self.runex("send-abort", tp.port))
+            expect += "PN_DELIVERY error: PN_ABORTED\n"*10
+            d.wait_re(expect)
+            self.assertEqual(send_expect(), self.runex("send", tp.port))
+            expect += receive_expect_messages()+receive_expect_total(20)
+            self.maxDiff = None
+            self.assertMultiLineEqual(expect, d.wait_exit())
 
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/58dfb4aa/examples/c/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/receive.c b/examples/c/receive.c
index 1b3ac1a..7df955a 100644
--- a/examples/c/receive.c
+++ b/examples/c/receive.c
@@ -40,7 +40,7 @@ typedef struct app_data_t {
   pn_proactor_t *proactor;
   int received;
   bool finished;
-  pn_rwbytes_t receiving;       /* Partially received message */
+  pn_rwbytes_t msgin;       /* Partially received message */
 } app_data_t;
 
 static const int BATCH = 1000; /* Batch size for unlimited receive */
@@ -93,44 +93,40 @@ static bool handle(app_data_t* app, pn_event_t* event) {
    case PN_DELIVERY: {
      /* A message has been received */
      pn_delivery_t *d = pn_event_delivery(event);
-     pn_link_t *r = pn_delivery_link(d);
-     if (!pn_delivery_readable(d)) break;
-     for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
-       /* Append data to the receving buffer */
-       app->receiving.size += p;
-       app->receiving.start = (char*)realloc(app->receiving.start, app->receiving.size);
-       int recv = pn_link_recv(r, app->receiving.start + app->receiving.size - p, p);
-       if (recv < 0 && recv != PN_EOS) {
-         fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
-         exit_code = 1;
-         break;
-       }
-     }
-     if (!pn_delivery_partial(d)) {
-       decode_message(app->receiving);
-       app->receiving = pn_rwbytes_null;
-       /* Accept the delivery */
-       pn_delivery_update(d, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(r);
-       pn_delivery_settle(d);  /* d is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(r) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(r, BATCH - pn_link_credit(r));
+     if (pn_delivery_readable(d)) {
+       pn_link_t *l = pn_delivery_link(d);
+       size_t size = pn_delivery_pending(d);
+       pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */
+       m->size += size;
+       m->start = (char*)realloc(m->start, m->size);
+       int err = pn_link_recv(l, m->start, m->size);
+       if (err < 0 && err != PN_EOS) {
+         fprintf(stderr, "PN_DELIVERY error: %s\n", pn_code(err));
+         pn_delivery_settle(d); /* Free the delivery so we can receive the next message */
+         m->size = 0;           /* forget the data we accumulated */
+       } else if (!pn_delivery_partial(d)) { /* Message is complete */
+         decode_message(*m);
+         *m = pn_rwbytes_null;  /* Reset the buffer for the next message*/
+         /* Accept the delivery */
+         pn_delivery_update(d, PN_ACCEPTED);
+         pn_delivery_settle(d);  /* settle and free d */
+         if (app->message_count == 0) {
+           /* receive forever - see if more credit is needed */
+           if (pn_link_credit(l) < BATCH/2) {
+             /* Grant enough credit to bring it up to BATCH: */
+             pn_link_flow(l, BATCH - pn_link_credit(l));
+           }
+         } else if (++app->received >= app->message_count) {
+           printf("%d messages received\n", app->received);
+           pn_session_t *ssn = pn_link_session(l);
+           pn_link_close(l);
+           pn_session_close(ssn);
+           pn_connection_close(pn_session_connection(ssn));
          }
-       } else if (++app->received >= app->message_count) {
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(r);
-         pn_link_close(r);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
        }
      }
-   } break;
+     break;
+   }
 
    case PN_TRANSPORT_CLOSED:
     check_condition(event, pn_transport_condition(pn_event_transport(event)));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: PROTON-1512: c examples: fix thread safety issue in broker

Posted by ac...@apache.org.
PROTON-1512: c examples: fix thread safety issue in broker

Replace the global receiving buffer with a per-link buffer so that messages are accumulated correctly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5b1be877
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5b1be877
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5b1be877

Branch: refs/heads/master
Commit: 5b1be8771faeafa7401d5f96705d38d266c61206
Parents: ea02b93
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 27 10:17:28 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Sep 27 14:10:59 2017 -0400

----------------------------------------------------------------------
 examples/c/broker.c | 54 +++++++++++++++++++++++++++++-------------------
 1 file changed, 33 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5b1be877/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
index 9129c3f..c2e4c16 100644
--- a/examples/c/broker.c
+++ b/examples/c/broker.c
@@ -119,20 +119,24 @@ static void queue_send(queue_t *q, pn_link_t *s) {
   }
 }
 
-/* Data associated with each broker connection */
-typedef struct broker_data_t {
-  bool check_queues;          /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* Use the context pointer as a boolean flag to indicate we need to check queues */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+/* Use the connection context pointer as a boolean flag to indicate we need to check queues */
+void set_check_queues(pn_connection_t *c, bool check) {
   pn_connection_set_context(c, (void*)check);
 }
 
-bool pn_connection_get_check_queues(pn_connection_t *c) {
+bool get_check_queues(pn_connection_t *c) {
   return (bool)pn_connection_get_context(c);
 }
 
+/* Use a buffer per link to accumulate message data - message can arrive in multiple deliveries,
+   and the broker can receive messages on many concurrently. */
+pn_rwbytes_t *message_buffer(pn_link_t *l) {
+  if (!pn_link_get_context(l)) {
+    pn_link_set_context(l, calloc(1, sizeof(pn_rwbytes_t)));
+  }
+  return (pn_rwbytes_t*)pn_link_get_context(l);
+}
+
 /* Put a message on the queue, called in receiver dispatch loop.
    If the queue was previously empty, notify waiting senders.
 */
@@ -142,7 +146,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
   if (q->messages.len == 1) { /* Was empty, notify waiting connections */
     for (size_t i = 0; i < q->waiting.len; ++i) {
       pn_connection_t *c = q->waiting.data[i];
-      pn_connection_set_check_queues(c, true);
+      set_check_queues(c, true);
       pn_connection_wake(c); /* Wake the connection */
     }
     q->waiting.len = 0;
@@ -192,7 +196,6 @@ typedef struct broker_t {
   const char *container_id;     /* AMQP container-id */
   queues_t queues;
   bool finished;
-  pn_rwbytes_t receiving;       /* Partially received message data */
 } broker_t;
 
 void broker_stop(broker_t *b) {
@@ -285,8 +288,8 @@ static void handle(broker_t* b, pn_event_t* e) {
      break;
    }
    case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
+     if (get_check_queues(c)) {
+       set_check_queues(c, false);
        int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE;
        for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
          link_send(b, l);
@@ -314,15 +317,24 @@ static void handle(broker_t* b, pn_event_t* e) {
      link_send(b, pn_event_link(e));
      break;
    }
-   case PN_DELIVERY: {
+   case PN_LINK_FINAL: {
+     pn_rwbytes_t *buf = (pn_rwbytes_t*)pn_link_get_context(pn_event_link(e));
+     if (buf) {
+       free(buf->start);
+       free(buf);
+     }
+     break;
+   }
+   case PN_DELIVERY: {          /* Incoming message data */
      pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
+     pn_link_t *l = pn_delivery_link(d);
      if (!pn_delivery_readable(d)) break;
+     pn_rwbytes_t *m = message_buffer(l);
      for (size_t p = pn_delivery_pending(d); p > 0; p = pn_delivery_pending(d)) {
        /* Append data to the reeving buffer */
-       b->receiving.size += p;
-       b->receiving.start = (char*)realloc(b->receiving.start, b->receiving.size);
-       int recv = pn_link_recv(r, b->receiving.start + b->receiving.size - p, p);
+       m->size += p;
+       m->start = (char*)realloc(m->start, m->size);
+       int recv = pn_link_recv(l, m->start + m->size - p, p);
        if (recv < 0 && recv != PN_EOS) {
          fprintf(stderr, "PN_DELIVERY: pn_link_recv error %s\n", pn_code(recv));
          break;
@@ -330,12 +342,12 @@ static void handle(broker_t* b, pn_event_t* e) {
      }
      if (!pn_delivery_partial(d)) {
        /* The broker does not decode the message, just forwards it. */
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), b->receiving);
-       b->receiving = pn_rwbytes_null;
+       const char *qname = pn_terminus_get_address(pn_link_target(l));
+       queue_receive(b->proactor, queues_get(&b->queues, qname), *m);
+       *m = pn_rwbytes_null;
        pn_delivery_update(d, PN_ACCEPTED);
        pn_delivery_settle(d);
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
+       pn_link_flow(l, WINDOW - pn_link_credit(l));
      }
      break;
    }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org