You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/21 21:53:50 UTC
qpid-proton git commit: NO-JIRA: improved c message streaming test
Repository: qpid-proton
Updated Branches:
refs/heads/master 7fefb47bc -> 0822dd921
NO-JIRA: improved c message streaming test
proactor.c verifies streaming works with small frame sizes and session windows.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0822dd92
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0822dd92
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0822dd92
Branch: refs/heads/master
Commit: 0822dd92199e0e62198f566376ee277cee157c48
Parents: 7fefb47
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Jun 21 17:49:00 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Jun 21 17:52:18 2017 -0400
----------------------------------------------------------------------
proton-c/src/tests/proactor.c | 49 ++++++++++++++++++++++++--------------
1 file changed, 31 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0822dd92/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index ad02d55..817711e 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -928,17 +928,28 @@ static void test_disconnect(test_t *t) {
struct message_stream_context {
pn_link_t *sender;
+ pn_delivery_t *dlv;
pn_rwbytes_t send_buf, recv_buf;
ssize_t size, sent, received;
bool complete;
};
+#define FRAME 512 /* Smallest legal frame */
+#define CHUNK (FRAME + FRAME/2) /* Chunk overflows frame */
+#define BODY (CHUNK*3 + CHUNK/2) /* Body doesn't fit into chunks */
+
static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e) {
- static const ssize_t CHUNK = 100;
struct message_stream_context *ctx = (struct message_stream_context*)th->context;
- pn_event_type_t et = pn_event_type(e);
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_BOUND:
+ pn_transport_set_max_frame(pn_event_transport(e), FRAME);
+ return PN_EVENT_NONE;
+
+ case PN_SESSION_INIT:
+ pn_session_set_incoming_capacity(pn_event_session(e), FRAME); /* Single frame incoming */
+ pn_session_set_outgoing_window(pn_event_session(e), 1); /* Single frame outgoing */
+ return PN_EVENT_NONE;
- switch (et) {
case PN_LINK_REMOTE_OPEN:
common_handler(th, e);
if (pn_link_is_receiver(pn_event_link(e))) {
@@ -949,10 +960,10 @@ static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e)
return PN_EVENT_NONE;
case PN_LINK_FLOW: /* Start a delivery */
- if (pn_link_is_sender(pn_event_link(e))) {
- pn_delivery(pn_event_link(e), pn_dtag("x", 1));
+ if (pn_link_is_sender(pn_event_link(e)) && !ctx->dlv) {
+ ctx->dlv = pn_delivery(pn_event_link(e), pn_dtag("x", 1));
}
- return et;
+ return PN_LINK_FLOW;
case PN_CONNECTION_WAKE: { /* Send a chunk */
ssize_t remains = ctx->size - ctx->sent;
@@ -962,10 +973,10 @@ static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e)
if (ctx->sent == ctx->size) {
TEST_CHECK(th->t, pn_link_advance(ctx->sender));
}
- return et;
+ return PN_CONNECTION_WAKE;
}
- case PN_DELIVERY: { /* Receive a chunk */
+ case PN_DELIVERY: { /* Receive a delvery - smaller than a chunk? */
pn_delivery_t *dlv = pn_event_delivery(e);
if (pn_delivery_readable(dlv)) {
ssize_t n = pn_delivery_pending(dlv);
@@ -973,8 +984,8 @@ static pn_event_type_t message_stream_handler(test_handler_t *th, pn_event_t *e)
TEST_ASSERT(n == pn_link_recv(pn_event_link(e), ctx->recv_buf.start + ctx->received, n));
ctx->received += n;
}
- ctx->complete = !pn_delivery_partial(dlv);
- return et;
+ ctx->complete = !pn_delivery_partial(dlv);
+ return PN_DELIVERY;
}
default:
@@ -995,9 +1006,11 @@ static void test_message_stream(test_t *t) {
tps[1].handler.context = &ctx;
/* Encode a large (not very) message to send in chunks */
+ char *body = (char*)malloc(BODY);
+ memset(body, 'x', BODY);
pn_message_t *m = pn_message();
- char body[1024] = { 0 };
- pn_data_put_binary(pn_message_body(m), pn_bytes(sizeof(body), body));
+ pn_data_put_binary(pn_message_body(m), pn_bytes(BODY, body));
+ free(body);
ctx.size = message_encode(m, &ctx.send_buf);
pn_message_free(m);
@@ -1010,14 +1023,14 @@ static void test_message_stream(test_t *t) {
TEST_PROACTORS_RUN_UNTIL(tps, PN_LINK_FLOW);
/* Send and receive the message in chunks */
- while (ctx.sent < ctx.size) {
+ do {
pn_connection_wake(c); /* Initiate send/receive of one chunk */
- TEST_PROACTORS_RUN_UNTIL(tps, PN_CONNECTION_WAKE);
- TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
- }
- TEST_CHECK(t, ctx.complete);
- TEST_CHECK(t, pn_link_advance(snd));
+ do { /* May be multiple receives for one send */
+ TEST_PROACTORS_RUN_UNTIL(tps, PN_DELIVERY);
+ } while (ctx.received < ctx.sent);
+ } while (!ctx.complete);
TEST_CHECK(t, ctx.received == ctx.size);
+ TEST_CHECK(t, ctx.sent == ctx.size);
TEST_CHECK(t, !memcmp(ctx.send_buf.start, ctx.recv_buf.start, ctx.size));
free(ctx.send_buf.start);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org