You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:06 UTC
[qpid-dispatch] 20/32: Dataplane: Updates to the message-extend
(return buffer count for flow control). Added bidirectional streaming test
to ref adaptor.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 1463337dcc13a73b357fbfe15b5d8f0f8dba34a8
Author: Ted Ross <tr...@apache.org>
AuthorDate: Mon Jun 8 09:49:37 2020 -0400
Dataplane: Updated qd_message_extend to return the message's buffer count (for flow control). Added a bidirectional streaming test to the reference adaptor.
---
include/qpid/dispatch/message.h | 8 ++-
src/adaptors/reference_adaptor.c | 73 ++++++++++++++++------
src/message.c | 5 +-
.../streaming_link_scrubber.c | 2 +-
4 files changed, 67 insertions(+), 21 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 0f63b1a..1a7bce2 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -294,9 +294,15 @@ void qd_message_compose_5(qd_message_t *msg,
bool complete);
/**
+ * qd_message_extend
+ *
* Extend the content of a streaming message with more buffers.
+ *
+ * @param msg Pointer to a message
+ * @param buffers A list of buffers to be appended to the end of the message's stream
+ * @return The total number of buffers held in the message's content after the append
*/
-void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
/** Put string representation of a message suitable for logging in buffer.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index f227542..7a65c17 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -37,6 +37,7 @@ typedef struct qdr_ref_adaptor_t {
qdr_connection_t *conn;
qdr_link_t *out_link_1;
qdr_link_t *out_link_2;
+ qdr_link_t *in_link_2;
qdr_link_t *dynamic_in_link;
char *reply_to;
qd_message_t *streaming_message;
@@ -123,6 +124,16 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link,
"ref.2", //const char *name,
0, //const char *terminus_addr,
&link_id);
+
+ source = qdr_terminus(0);
+ qdr_terminus_set_address(source, address2);
+ adaptor->in_link_2 = qdr_link_first_attach(adaptor->conn,
+ QD_OUTGOING,
+ source, //qdr_terminus_t *source,
+ qdr_terminus(0), //qdr_terminus_t *target,
+ "ref.3", //const char *name,
+ 0, //const char *terminus_addr,
+ &link_id);
}
}
@@ -193,6 +204,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit)
qd_message_compose_5(adaptor->streaming_message, props, 0, false);
qd_compose_free(props);
+ printf("qdr_ref_flow: Starting a streaming delivery\n");
adaptor->streaming_delivery =
qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0);
adaptor->stream_count = 0;
@@ -231,20 +243,34 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
qd_message_t *msg = qdr_delivery_message(delivery);
- qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
-
- if (status == QD_MESSAGE_DEPTH_OK) {
- qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
- char *body = (char*) qd_iterator_copy(body_iter);
- printf("qdr_ref_deliver: message received, body=%s\n", body);
- free(body);
- qd_iterator_free(body_iter);
- qd_message_set_send_complete(msg);
+ qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES);
+
+ switch (status) {
+ case QD_MESSAGE_DEPTH_OK: {
+ if (qd_message_receive_complete(msg)) {
+ qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
+ char *body = (char*) qd_iterator_copy(body_iter);
+ printf("qdr_ref_deliver: complete message received, body=%s\n", body);
+ free(body);
+ qd_iterator_free(body_iter);
+ qd_message_set_send_complete(msg);
+ qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_ACCEPTED; // This will cause the delivery to be settled
+ }
+ break;
}
- qdr_link_flow(adaptor->core, link, 1, false);
+ case QD_MESSAGE_DEPTH_INVALID:
+ printf("qdr_ref_deliver: message invalid\n");
+ qdr_link_flow(adaptor->core, link, 1, false);
+ break;
+
+ case QD_MESSAGE_DEPTH_INCOMPLETE:
+ printf("qdr_ref_deliver: message incomplete\n");
+ break;
+ }
- return PN_ACCEPTED; // This will cause the delivery to be settled
+ return 0;
}
@@ -269,7 +295,12 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
}
printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? "true" : "false");
- if (settled && qdr_delivery_link(dlv) == adaptor->out_link_1)
+ if (qdr_delivery_link(dlv) == adaptor->out_link_2 && qdr_delivery_message(dlv) == adaptor->streaming_message) {
+ adaptor->streaming_message = 0;
+ adaptor->stream_count = 0;
+ }
+
+ if (settled)
qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery");
}
@@ -352,23 +383,30 @@ static void on_stream(void *context)
qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
const char *content = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
const size_t content_length = strlen(content);
- qd_buffer_t *buf = qd_buffer();
- qd_buffer_list_t buffers;
+
+ if (!adaptor->streaming_message)
+ return;
+
+ qd_buffer_t *buf = qd_buffer();
+ qd_buffer_list_t buffers;
DEQ_INIT(buffers);
DEQ_INSERT_TAIL(buffers, buf);
memcpy(qd_buffer_cursor(buf), content, content_length);
qd_buffer_insert(buf, content_length);
- qd_message_extend(adaptor->streaming_message, &buffers);
+ int depth = qd_message_extend(adaptor->streaming_message, &buffers);
qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false);
- if (adaptor->stream_count < 30) {
- qd_timer_schedule(adaptor->stream_timer, 1000);
+ if (adaptor->stream_count < 10) {
+ qd_timer_schedule(adaptor->stream_timer, 100);
adaptor->stream_count++;
+ printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth);
} else {
qd_message_set_receive_complete(adaptor->streaming_message);
adaptor->streaming_message = 0;
+ adaptor->stream_count = 0;
+ printf("on_stream: completed streaming send, depth=%d\n", depth);
}
}
@@ -404,7 +442,6 @@ void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context)
qdr_ref_conn_trace);
*adaptor_context = adaptor;
- // TEMPORARY //
adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor);
qd_timer_schedule(adaptor->startup_timer, 0);
diff --git a/src/message.c b/src/message.c
index 8b833f3..97bcab2 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2218,10 +2218,11 @@ void qd_message_compose_5(qd_message_t *msg,
}
-void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
{
qd_message_content_t *content = MSG_CONTENT(msg);
qd_buffer_t *buf = DEQ_HEAD(*buffers);
+ int count;
LOCK(content->lock);
while (buf) {
@@ -2230,7 +2231,9 @@ void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
}
DEQ_APPEND(content->buffers, (*buffers));
+ count = DEQ_SIZE(content->buffers);
UNLOCK(content->lock);
+ return count;
}
diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
index 1c578ca..126a09b 100644
--- a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
+++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c
@@ -184,4 +184,4 @@ static void qcm_streaming_link_scrubber_final_CT(void *module_context)
}
-QDR_CORE_MODULE_DECLARE("streaming_link_scruber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT)
+QDR_CORE_MODULE_DECLARE("streaming_link_scrubber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org