You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/07 21:34:55 UTC
[qpid-dispatch] 01/02: DISPATCH-1742 - Completed implementation of
outbound streaming path
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 5d021e224119f78ce043d7c1fc4d94b76f49e7c0
Author: Ted Ross <tr...@apache.org>
AuthorDate: Fri Aug 7 17:30:34 2020 -0400
DISPATCH-1742 - Completed implementation of outbound streaming path
---
src/adaptors/reference_adaptor.c | 89 ++++++++++++++++++++++-----------
src/message.c | 103 ++++++++++++++++++++++++++++++++++-----
src/message_private.h | 1 +
3 files changed, 154 insertions(+), 39 deletions(-)
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index b6b0c1b..3c05bd8 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -247,39 +247,72 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t
switch (status) {
case QD_MESSAGE_DEPTH_OK: {
+ //
+ // At least one complete body performative has arrived. It is now safe to switch
+ // over to the per-message extraction of body-data segments.
+ //
printf("qdr_ref_deliver: depth ok\n");
qd_message_body_data_t *body_data;
qd_message_body_data_result_t body_data_result;
- body_data_result = qd_message_next_body_data(msg, &body_data);
-
- switch (body_data_result) {
- case QD_MESSAGE_BODY_DATA_OK: {
- qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
- char *body = (char*) qd_iterator_copy(body_iter);
- printf("qdr_ref_deliver: message body-data received: %s\n", body);
- free(body);
- qd_iterator_free(body_iter);
- break;
- }
+
+ //
+ // Process as many body-data segments as are available.
+ //
+ while (true) {
+ body_data_result = qd_message_next_body_data(msg, &body_data);
+
+ switch (body_data_result) {
+ case QD_MESSAGE_BODY_DATA_OK: {
+ //
+ // We have a new valid body-data segment. Handle it
+ //
+ printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data));
+
+ qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data);
+ char *body = (char*) qd_iterator_copy(body_iter);
+ printf("qdr_ref_deliver: message body-data received: %s\n", body);
+ free(body);
+ qd_iterator_free(body_iter);
+ qd_message_body_data_release(body_data);
+ break;
+ }
- case QD_MESSAGE_BODY_DATA_INCOMPLETE:
- printf("qdr_ref_deliver: body-data incomplete\n");
- break;
-
- case QD_MESSAGE_BODY_DATA_NO_MORE:
- qd_message_set_send_complete(msg);
- qdr_link_flow(adaptor->core, link, 1, false);
- return PN_ACCEPTED; // This will cause the delivery to be settled
+ case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ //
+ // A new segment has not completely arrived yet. Check again later.
+ //
+ printf("qdr_ref_deliver: body-data incomplete\n");
+ return 0;
+
+ case QD_MESSAGE_BODY_DATA_NO_MORE:
+ //
+ // We have already handled the last body-data segment for this delivery.
+ // Complete the "sending" of this delivery and replenish credit.
+ //
+ // Note that depending on the adaptor, it might be desirable to delay the
+ // acceptance and settlement of this delivery until a later event (i.e. when
+ // a requested action has completed).
+ //
+ qd_message_set_send_complete(msg);
+ qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_ACCEPTED; // This will cause the delivery to be settled
- case QD_MESSAGE_BODY_DATA_INVALID:
- printf("qdr_ref_deliver: body-data invalid\n");
- qdr_link_flow(adaptor->core, link, 1, false);
- return PN_REJECTED;
-
- case QD_MESSAGE_BODY_DATA_NOT_DATA:
- printf("qdr_ref_deliver: body not data\n");
- qdr_link_flow(adaptor->core, link, 1, false);
- return PN_REJECTED;
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ //
+ // The body-data is corrupt in some way. Stop handling the delivery and reject it.
+ //
+ printf("qdr_ref_deliver: body-data invalid\n");
+ qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_REJECTED;
+
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ //
+ // Valid data was seen, but it is not a body-data performative. Reject the delivery.
+ //
+ printf("qdr_ref_deliver: body not data\n");
+ qdr_link_flow(adaptor->core, link, 1, false);
+ return PN_REJECTED;
+ }
}
break;
diff --git a/src/message.c b/src/message.c
index 24aa928..69312f3 100644
--- a/src/message.c
+++ b/src/message.c
@@ -658,7 +658,8 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
const unsigned char *pattern,
int pattern_length,
const unsigned char *expected_tags,
- qd_field_location_t *location)
+ qd_field_location_t *location,
+ bool dup_ok)
{
if (!*cursor || !can_advance(cursor, buffer))
return QD_SECTION_NEED_MORE;
@@ -691,7 +692,7 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
if (*expected_tags == 0)
return QD_SECTION_INVALID; // Error: Unexpected tag
- if (location->parsed)
+ if (location->parsed && !dup_ok)
return QD_SECTION_INVALID; // Error: Duplicate section
//
@@ -1916,9 +1917,9 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co
return QD_MESSAGE_DEPTH_OK;
qd_section_status_t rc;
- rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location);
+ rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false);
if (rc == QD_SECTION_NO_MATCH) // try the alternative
- rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location);
+ rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location, false);
if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) {
content->parse_depth = depth;
@@ -2302,8 +2303,49 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field)
*/
static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer)
{
- //qd_buffer_t *buf = location->buffer;
-
+ qd_buffer_t *buf = location->buffer; // begin at the buffer recorded as the start of the field
+ size_t remaining = location->hdr_length + location->length; // octets still to be walked: section header plus content
+
+ while (!!buf && remaining > 0) {
+ size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0); // in the first buffer only the octets from location->offset onward belong to the field
+ if (remaining <= this_buf_size) {
+ *buffer = buf; // the field ends inside (or exactly at the end of) this buffer
+ *cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining; // one past the final octet of the field
+ return;
+ }
+ remaining -= this_buf_size; // field continues past this buffer: consume it and move to the next one
+ buf = DEQ_NEXT(buf);
+ }
+
+ assert(false); // The field should already have been validated as complete. NOTE(review): also reached when hdr_length + length is 0; *cursor and *buffer are not written on this path.
+}
+
+
+/**
+ * trim_body_data_headers
+ *
+ * Fill in body_data->payload so that it describes only the binary content of the
+ * section recorded in body_data->section.  The section header (hdr_length octets)
+ * and the vbin encoding (one tag octet followed by a 1- or 4-octet size) are
+ * skipped.  body_data->payload is left unmodified if the section header cannot be
+ * skipped.
+ */
+void trim_body_data_headers(qd_message_body_data_t *body_data)
+{
+    const qd_field_location_t *location = &body_data->section;
+    qd_buffer_t               *buffer   = location->buffer;
+    unsigned char             *cursor   = qd_buffer_base(buffer) + location->offset;
+
+    bool good = advance(&cursor, &buffer, location->hdr_length);
+    assert(good);
+    if (good) {
+        size_t        vbin_hdr_len = 1;  // the tag octet consumed by next_octet below
+        unsigned char tag          = 0;
+
+        next_octet(&cursor, &buffer, &tag);
+        if (tag == QD_AMQP_VBIN8) {
+            advance(&cursor, &buffer, 1);
+            vbin_hdr_len += 1;
+        } else if (tag == QD_AMQP_VBIN32) {
+            advance(&cursor, &buffer, 4);
+            vbin_hdr_len += 4;
+        }
+
+        can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary
+
+        //
+        // find_last_buffer treats (hdr_length + length) as the full extent of the
+        // section, so section.length still counts the tag and size octets skipped
+        // above.  Remove them so that the payload descriptor ends where the section
+        // ends instead of running past it.
+        //
+        body_data->payload.buffer     = buffer;
+        body_data->payload.offset     = cursor - qd_buffer_base(buffer);
+        body_data->payload.length     = location->length > vbin_hdr_len ? location->length - vbin_hdr_len : 0;
+        body_data->payload.hdr_length = 0;
+        body_data->payload.parsed     = true;
+        body_data->payload.tag        = tag;
+    }
}
@@ -2317,18 +2359,27 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs
*/
qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data)
{
- return 0;
+ const qd_field_location_t *location = &body_data->payload; // use the payload descriptor rather than the whole section
+
+ return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL); // new iterator built from the payload's buffer, offset and length; the caller in reference_adaptor.c releases it with qd_iterator_free
}
/**
* qd_message_body_data_buffer_count
*
- * Return the number of buffers contained in the body_data object.
+ * Return the number of buffers contained in payload portion of the body_data object.
*/
int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
{
- return 0;
+ int count = 1;
+ qd_buffer_t *buffer = body_data->payload.buffer;
+ while (!!buffer && buffer != body_data->last_buffer) {
+ buffer = DEQ_NEXT(buffer);
+ count++;
+ }
+
+ return count;
}
@@ -2341,7 +2392,34 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data)
*/
int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count)
{
-    return 0;
+    qd_buffer_t *buffer    = body_data->payload.buffer;
+    size_t       skip      = body_data->payload.offset;  // octets ahead of the payload in the current buffer
+    size_t       remaining = body_data->payload.length;  // payload octets not yet accounted for
+    int          filled    = 0;
+
+    //
+    // Skip the first 'offset' payload buffers, accounting for the payload octets
+    // they hold.  The walk never proceeds beyond body_data->last_buffer.
+    //
+    while (offset > 0 && !!buffer && remaining > 0) {
+        size_t avail = qd_buffer_size(buffer) - skip;
+
+        remaining -= avail < remaining ? avail : remaining;
+        skip       = 0;
+        buffer     = buffer == body_data->last_buffer ? 0 : DEQ_NEXT(buffer);
+        offset--;
+    }
+
+    //
+    // Fill the buffer array.  Each entry is clipped to the octets that belong to
+    // this body-data payload so that data from later sections sharing the same
+    // buffer chain is never exposed.
+    //
+    while (filled < count && !!buffer && remaining > 0) {
+        size_t avail = qd_buffer_size(buffer) - skip;
+        size_t used  = avail < remaining ? avail : remaining;
+
+        buffers[filled].context  = 0;
+        buffers[filled].bytes    = (char*) qd_buffer_base(buffer) + skip;
+        buffers[filled].capacity = BUFFER_SIZE;
+        buffers[filled].size     = used;
+        buffers[filled].offset   = 0;
+
+        remaining -= used;
+        skip       = 0;
+        buffer     = buffer == body_data->last_buffer ? 0 : DEQ_NEXT(buffer);
+        filled++;
+    }
+
+    // Number of entries of 'buffers' that were written
+    return filled;
}
@@ -2353,6 +2431,7 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe
*/
void qd_message_body_data_release(qd_message_body_data_t *body_data)
{
+    if (!body_data)
+        return;
+
+    //
+    // qd_message_next_body_data links every body_data object it hands out onto the
+    // owning message's body_data_list.  Unlink the object before freeing it so the
+    // list is never left holding a pointer to freed memory.
+    //
+    // NOTE(review): assumes owning_message is set when the object is created -
+    // confirm against the allocation site in qd_message_next_body_data.
+    //
+    qd_message_pvt_t *msg = body_data->owning_message;
+    if (!!msg)
+        DEQ_REMOVE(msg->body_data_list, body_data);
+
+    free_qd_message_body_data_t(body_data);
}
@@ -2374,6 +2453,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
body_data->last_buffer = msg->body_buffer;
+ trim_body_data_headers(body_data);
assert(DEQ_SIZE(msg->body_data_list) == 0);
DEQ_INSERT_TAIL(msg->body_data_list, body_data);
@@ -2390,7 +2470,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
section_status = message_section_check(&msg->body_buffer, &msg->body_cursor,
BODY_DATA_SHORT, 3, TAGS_BINARY,
- &location);
+ &location, true);
switch (section_status) {
case QD_SECTION_INVALID:
@@ -2404,6 +2484,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd
body_data->section = location;
find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer);
body_data->last_buffer = msg->body_buffer;
+ trim_body_data_headers(body_data);
DEQ_INSERT_TAIL(msg->body_data_list, body_data);
*out_body_data = body_data;
return QD_MESSAGE_BODY_DATA_OK;
diff --git a/src/message_private.h b/src/message_private.h
index 6dc901a..20f4ce4 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -65,6 +65,7 @@ struct qd_message_body_data_t {
DEQ_LINKS(qd_message_body_data_t); // Linkage to form a DEQ
qd_message_pvt_t *owning_message; // Pointer to the owning message
qd_field_location_t section; // Section descriptor for the field
+ qd_field_location_t payload; // Descriptor for the payload of the body data
qd_buffer_t *last_buffer; // Pointer to the last buffer in the field
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org