You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/21 21:53:50 UTC

qpid-proton git commit: NO-JIRA: improved c message streaming test

Repository: qpid-proton
Updated Branches:
  refs/heads/master 7fefb47bc -> 0822dd921


NO-JIRA: improved c message streaming test

proactor.c verifies streaming works with small frame sizes and session windows.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0822dd92
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0822dd92
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0822dd92

Branch: refs/heads/master
Commit: 0822dd92199e0e62198f566376ee277cee157c48
Parents: 7fefb47
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Jun 21 17:49:00 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 21 17:52:18 2017 -0400

----------------------------------------------------------------------
 proton-c/src/tests/proactor.c | 49 ++++++++++++++++++++++++--------------
 1 file changed, 31 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0822dd92/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index ad02d55..817711e 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -928,17 +928,28 @@ static void test_disconnect(test_t *t) {
 
 struct message_stream_context {
   pn_link_t *sender;
+  pn_delivery_t *dlv;
   pn_rwbytes_t send_buf, recv_buf;
   ssize_t size, sent, received;
   bool complete;
 };
 
+#define FRAME 512                   /* Smallest legal frame */
+#define CHUNK (FRAME + FRAME/2)     /* Chunk overflows frame */
+#define BODY (CHUNK*3 + CHUNK/2)    /* Body doesn't fit into chunks */
+
 static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e) {
-  static const ssize_t CHUNK = 100;
   struct message_stream_context *ctx = (struct message_stream_context*)th->context;
-  pn_event_type_t et = pn_event_type(e);
+  switch (pn_event_type(e)) {
+   case PN_CONNECTION_BOUND:
+    pn_transport_set_max_frame(pn_event_transport(e), FRAME);
+    return PN_EVENT_NONE;
+
+   case PN_SESSION_INIT:
+    pn_session_set_incoming_capacity(pn_event_session(e), FRAME); /* Single frame incoming */
+    pn_session_set_outgoing_window(pn_event_session(e), 1);       /* Single frame outgoing */
+    return PN_EVENT_NONE;
 
-  switch (et) {
    case PN_LINK_REMOTE_OPEN:
     common_handler(th, e);
     if (pn_link_is_receiver(pn_event_link(e))) {
@@ -949,10 +960,10 @@ static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e)
     return PN_EVENT_NONE;
 
    case PN_LINK_FLOW:           /* Start a delivery */
-    if (pn_link_is_sender(pn_event_link(e))) {
-      pn_delivery(pn_event_link(e), pn_dtag("x", 1));
+    if (pn_link_is_sender(pn_event_link(e)) && !ctx->dlv) {
+      ctx->dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1));
     }
-    return et;
+    return PN_LINK_FLOW;
 
    case PN_CONNECTION_WAKE: {     /* Send a chunk */
      ssize_t remains = ctx->size - ctx->sent;
@@ -962,10 +973,10 @@ static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e)
      if (ctx->sent == ctx->size) {
        TEST_CHECK(th->t, pn_link_advance(ctx->sender));
      }
-     return et;
+     return PN_CONNECTION_WAKE;
    }
 
-   case PN_DELIVERY: {          /* Receive a chunk */
+   case PN_DELIVERY: {          /* Receive a delvery - smaller than a chunk? */
      pn_delivery_t *dlv = pn_event_delivery(e);
      if (pn_delivery_readable(dlv)) {
        ssize_t n = pn_delivery_pending(dlv);
@@ -973,8 +984,8 @@ static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e)
        TEST_ASSERT(n == pn_link_recv(pn_event_link(e), ctx->recv_buf.start + ctx->received, n));
        ctx->received += n;
      }
-     ctx->complete = !pn_delivery_partial(dlv); 
-     return et;
+     ctx->complete = !pn_delivery_partial(dlv);
+     return PN_DELIVERY;
    }
 
    default:
@@ -995,9 +1006,11 @@ static void test_message_stream(test_t *t) {
   tps[1].handler.context = &ctx;
 
   /* Encode a large (not very) message to send in chunks */
+  char *body = (char*)malloc(BODY);
+  memset(body, 'x', BODY);
   pn_message_t *m = pn_message();
-  char body[1024] = { 0 };
-  pn_data_put_binary(pn_message_body(m), pn_bytes(sizeof(body), body));
+  pn_data_put_binary(pn_message_body(m), pn_bytes(BODY, body));
+  free(body);
   ctx.size = message_encode(m, &ctx.send_buf);
   pn_message_free(m);
 
@@ -1010,14 +1023,14 @@ static void test_message_stream(test_t *t) {
   TEST_PROACTORS_RUN_UNTIL(tps, PN_LINK_FLOW);
 
   /* Send and receive the message in chunks */
-  while (ctx.sent < ctx.size) {
+  do {
     pn_connection_wake(c);      /* Initiate send/receive of one chunk */
-    TEST_PROACTORS_RUN_UNTIL(tps, PN_CONNECTION_WAKE);
-    TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
-  }
-  TEST_CHECK(t, ctx.complete);
-  TEST_CHECK(t, pn_link_advance(snd));
+    do {                        /* May be multiple receives for one send */
+      TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
+    } while (ctx.received < ctx.sent);
+  } while (!ctx.complete);
   TEST_CHECK(t, ctx.received == ctx.size);
+  TEST_CHECK(t, ctx.sent == ctx.size);
   TEST_CHECK(t, !memcmp(ctx.send_buf.start, ctx.recv_buf.start, ctx.size));
 
   free(ctx.send_buf.start);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org