You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/20 15:22:23 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1803: prevent body_data sections from violating Q2 limit
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new 3732ca4 DISPATCH-1803: prevent body_data sections from violating Q2 limit
3732ca4 is described below
commit 3732ca4334c3ba6cfb15870f37af6c1c435bea90
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Oct 14 09:20:00 2020 -0400
DISPATCH-1803: prevent body_data sections from violating Q2 limit
This closes #879
---
include/qpid/dispatch/message.h | 14 ++++++
src/adaptors/http1/http1_client.c | 12 +----
src/adaptors/http1/http1_server.c | 12 +----
src/adaptors/http2/http2_adaptor.c | 5 +-
src/adaptors/reference_adaptor.c | 12 ++---
src/adaptors/tcp_adaptor.c | 5 +-
src/message.c | 46 +++++++++++++++++
tests/message_test.c | 100 +++++++++++++++++++++++++++++++++++++
8 files changed, 167 insertions(+), 39 deletions(-)
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index f1f2295..9978d1c 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -378,6 +378,20 @@ typedef enum {
qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data);
+/**
+ * qd_message_body_data_append
+ *
+ * Append the buffers in data to the given message as a sequence of one or
+ * more BODY_DATA sections, each holding at most QD_QLIMIT_Q2_LOWER buffers.
+ * The buffers in data are moved into the message content by this function.
+ *
+ * @param msg Pointer to message under construction
+ * @param data List of buffers containing body data. An empty list is a no-op.
+ * @return The number of buffers stored in the message's content, or 0 if data is empty
+ */
+int qd_message_body_data_append(qd_message_t *msg, qd_buffer_list_t *data);
+
+
/** Put string representation of a message suitable for logging in buffer.
* @return buffer
*/
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 0f508db..585848e 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -772,17 +772,7 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
"[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
- //
- // Compose a DATA performative for this section of the stream
- //
- qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
- qd_compose_insert_binary_buffers(field, body);
-
- //
- // Extend the streaming message and free the composed field
- //
- qd_message_extend(msg, field);
- qd_compose_free(field);
+ qd_message_body_data_append(msg, body);
//
// Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 9d9cbf5..f6aba2c 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -877,17 +877,7 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
"[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
hconn->conn_id, hconn->in_link_id, len);
- //
- // Compose a DATA performative for this section of the stream
- //
- qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
- qd_compose_insert_binary_buffers(field, body);
-
- //
- // Extend the streaming message and free the composed field
- //
- qd_message_extend(msg, field);
- qd_compose_free(field);
+ qd_message_body_data_append(msg, body);
//
// Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index d12427a..4d1cd14 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -388,10 +388,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
qd_buffer_list_append(&buffers, (uint8_t *)data, len);
if (stream_data->in_dlv) {
- qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
- qd_compose_insert_binary_buffers(body, &buffers);
- qd_message_extend(stream_data->message, body);
- qd_compose_free(body);
+ qd_message_body_data_append(stream_data->message, &buffers);
}
else {
if (!stream_data->body) {
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 3a89f63..7b9af30 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -462,16 +462,10 @@ static void on_stream(void *context)
qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
//
- // Compose a DATA performative for this section of the stream
+ // append this data to the streaming message as one or more DATA
+ // performatives
//
- qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
- qd_compose_insert_binary_buffers(field, &buffer_list);
-
- //
- // Extend the streaming message and free the composed field
- //
- depth = qd_message_extend(adaptor->streaming_message, field);
- qd_compose_free(field);
+ depth = qd_message_body_data_append(adaptor->streaming_message, &buffer_list);
//
// Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index e0d0d56..0e06059 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -139,10 +139,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
grant_read_buffers(conn);
if (conn->instream) {
- qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
- qd_compose_insert_binary_buffers(field, &buffers);
- qd_message_extend(qdr_delivery_message(conn->instream), field);
- qd_compose_free(field);
+ qd_message_body_data_append(qdr_delivery_message(conn->instream), &buffers);
qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
} else {
diff --git a/src/message.c b/src/message.c
index 9f60d1f..23381db 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2720,3 +2720,49 @@ bool qd_message_oversize(const qd_message_t *msg)
qd_message_content_t * mc = MSG_CONTENT(msg);
return mc->oversize;
}
+
+
+int qd_message_body_data_append(qd_message_t *message, qd_buffer_list_t *data)  // append data to message as BODY_DATA sections of at most QD_QLIMIT_Q2_LOWER buffers each
+{
+ unsigned int length = DEQ_SIZE(*data);  // count of buffers in data not yet placed in a section
+ qd_composed_field_t *field = 0;  // composed field that receives every BODY_DATA section built below
+ int rc = 0;
+ // nothing to append: return 0 without touching the message
+ if (length == 0)
+ return rc;
+
+ // DISPATCH-1803: ensure no body data section can violate the Q2 threshold.
+ // This allows the egress router to wait for an entire body data section
+ // to arrive and be validated before sending it out to the endpoint.
+ //
+ while (length > QD_QLIMIT_Q2_LOWER) {  // each pass emits one section of exactly QD_QLIMIT_Q2_LOWER buffers
+ qd_buffer_t *buf = DEQ_HEAD(*data);
+ for (int i = 0; i < QD_QLIMIT_Q2_LOWER; ++i) {  // step past the leading QD_QLIMIT_Q2_LOWER buffers
+ buf = DEQ_NEXT(buf);
+ }
+
+ // split the list at buf. buf becomes head of trailing list
+
+ qd_buffer_list_t trailer = DEQ_EMPTY;
+ DEQ_HEAD(trailer) = buf;
+ DEQ_TAIL(trailer) = DEQ_TAIL(*data);
+ DEQ_TAIL(*data) = DEQ_PREV(buf);  // *data now ends at the buffer just before buf
+ DEQ_NEXT(DEQ_TAIL(*data)) = 0;  // unlink the two halves in both directions
+ DEQ_PREV(buf) = 0;
+ DEQ_SIZE(trailer) = length - QD_QLIMIT_Q2_LOWER;  // fix up the size bookkeeping of both lists by hand
+ DEQ_SIZE(*data) = QD_QLIMIT_Q2_LOWER;
+
+ field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);  // open a new BODY_DATA section, passing in the field built so far
+ qd_compose_insert_binary_buffers(field, data);  // section payload is the leading buffers left in *data
+
+ DEQ_MOVE(trailer, *data);  // continue with the remainder; NOTE(review): presumably *data was emptied by the insert above - confirm
+ length -= QD_QLIMIT_Q2_LOWER;
+ }
+ // final section: the remaining 1..QD_QLIMIT_Q2_LOWER buffers
+ field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
+ qd_compose_insert_binary_buffers(field, data);
+
+ rc = qd_message_extend(message, field);  // hand all composed sections to the message in one call; its result is the return value
+ qd_compose_free(field);
+ return rc;
+}
diff --git a/tests/message_test.c b/tests/message_test.c
index 1e33143..87271df 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -939,6 +939,105 @@ static char *test_check_body_data(void * context)
}
+// Verify that qd_message_body_data_append() splits a long list of binary data
+// buffers into body data sections of at most QD_QLIMIT_Q2_LOWER buffers each
+//
+static char *test_check_body_data_append(void * context)  // returns 0 on success, else a static failure description; context is unused
+{
+ char *result = 0;
+ qd_message_t *msg = 0;
+ qd_message_t *out_msg = 0;
+
+ // generate a buffer list of binary data large enough to trigger Q2
+ //
+ const int buffer_count = (QD_QLIMIT_Q2_UPPER * 3) + 5;
+ qd_buffer_list_t bin_data = DEQ_EMPTY;
+ for (int i = 0; i < buffer_count; ++i) {
+ qd_buffer_t *buffy = qd_buffer();
+ // "fill" the buffer:
+ qd_buffer_insert(buffy, qd_buffer_capacity(buffy));
+ DEQ_INSERT_TAIL(bin_data, buffy);
+ }
+
+ // simulate building a message as an adaptor would:
+ msg = qd_message();
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+ qd_compose_start_list(field);
+ qd_compose_insert_bool(field, 0); // durable
+ qd_compose_insert_null(field); // priority
+ qd_compose_end_list(field);
+ field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+ qd_compose_start_list(field);
+ qd_compose_insert_ulong(field, 666); // message-id
+ qd_compose_insert_null(field); // user-id
+ qd_compose_insert_string(field, "/whereevah"); // to
+ qd_compose_insert_string(field, "my-subject"); // subject
+ qd_compose_insert_string(field, "/reply-to"); // reply-to
+ qd_compose_end_list(field);
+
+ qd_message_compose_2(msg, field, false);
+ qd_compose_free(field);
+ int depth = qd_message_body_data_append(msg, &bin_data);  // function under test
+ if (depth <= buffer_count) {
+ // expected to add extra buffer(s) for meta-data
+ result = "append length is incorrect";
+ goto exit;
+ }
+ qd_message_set_receive_complete(msg);
+
+ // now validate the message body sections
+
+ out_msg = qd_message_copy(msg);
+ if (qd_message_check_depth(out_msg, QD_DEPTH_BODY) != QD_MESSAGE_DEPTH_OK) {
+ result = "Invalid body depth check";
+ goto exit;
+ }
+
+ int bd_count = 0;  // number of body data sections seen
+ int total_buffers = 0;  // sum of the buffer counts of all sections seen
+ qd_message_body_data_t *body_data = 0;
+ bool done = false;
+ while (!done) {
+ switch (qd_message_next_body_data(msg, &body_data)) {  // NOTE(review): iterates msg, not the out_msg copy checked above - confirm this is intended
+ case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+ case QD_MESSAGE_BODY_DATA_INVALID:
+ case QD_MESSAGE_BODY_DATA_NOT_DATA:
+ result = "Next body data failed to get next body data";
+ goto exit;
+ case QD_MESSAGE_BODY_DATA_NO_MORE:
+ done = true;
+ break;
+ case QD_MESSAGE_BODY_DATA_OK:
+ bd_count += 1;
+ // qd_message_body_data_append() breaks the buffer list up into
+ // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers
+ // long
+ total_buffers += qd_message_body_data_buffer_count(body_data);
+ if (qd_message_body_data_buffer_count(body_data) > QD_QLIMIT_Q2_LOWER) {
+ result = "Body data list length too long!";
+ goto exit;
+ }
+ qd_message_body_data_release(body_data);
+ break;
+ }
+ }
+ // the "+ 1" is the final partial section; NOTE(review): assumes buffer_count is not an exact multiple of QD_QLIMIT_Q2_LOWER
+ if (bd_count != (buffer_count / QD_QLIMIT_Q2_LOWER) + 1) {
+ result = "Unexpected count of body data sections!";
+ goto exit;
+ }
+
+ if (total_buffers != buffer_count) {
+ result = "Not all buffers were decoded!";
+ }
+
+exit:
+ qd_message_free(msg);  // reached on success and on every failure path
+ qd_message_free(out_msg);
+ return result;
+}
+
+
int message_tests(void)
{
int result = 0;
@@ -953,6 +1052,7 @@ int message_tests(void)
TEST_CASE(test_incomplete_annotations, 0);
TEST_CASE(test_check_weird_messages, 0);
TEST_CASE(test_check_body_data, 0);
+ TEST_CASE(test_check_body_data_append, 0);
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org