You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/20 15:22:23 UTC

[qpid-dispatch] branch dev-protocol-adaptors-2 updated: DISPATCH-1803: prevent body_data sections from violating Q2 limit

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 3732ca4  DISPATCH-1803: prevent body_data sections from violating Q2 limit
3732ca4 is described below

commit 3732ca4334c3ba6cfb15870f37af6c1c435bea90
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Oct 14 09:20:00 2020 -0400

    DISPATCH-1803: prevent body_data sections from violating Q2 limit
    
    This closes #879
---
 include/qpid/dispatch/message.h    |  14 ++++++
 src/adaptors/http1/http1_client.c  |  12 +----
 src/adaptors/http1/http1_server.c  |  12 +----
 src/adaptors/http2/http2_adaptor.c |   5 +-
 src/adaptors/reference_adaptor.c   |  12 ++---
 src/adaptors/tcp_adaptor.c         |   5 +-
 src/message.c                      |  46 +++++++++++++++++
 tests/message_test.c               | 100 +++++++++++++++++++++++++++++++++++++
 8 files changed, 167 insertions(+), 39 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index f1f2295..9978d1c 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -378,6 +378,20 @@ typedef enum {
 qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data);
 
 
+/**
+ * qd_message_body_data_append
+ *
+ * Append the buffers in data as a sequence of one or more BODY_DATA sections
+ * to the given message.  The buffers in data are moved into the message
+ * content by this function.
+ *
+ * @param msg Pointer to message under construction
+ * @param data List of buffers containing body data.
+ * @return The number of buffers stored in the message's content
+ */
+int qd_message_body_data_append(qd_message_t *msg, qd_buffer_list_t *data);
+
+
 /** Put string representation of a message suitable for logging in buffer.
  * @return buffer
  */
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 0f508db..585848e 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -772,17 +772,7 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
            "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    //
-    // Compose a DATA performative for this section of the stream
-    //
-    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-    qd_compose_insert_binary_buffers(field, body);
-
-    //
-    // Extend the streaming message and free the composed field
-    //
-    qd_message_extend(msg, field);
-    qd_compose_free(field);
+    qd_message_body_data_append(msg, body);
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 9d9cbf5..f6aba2c 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -877,17 +877,7 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
            "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
-    //
-    // Compose a DATA performative for this section of the stream
-    //
-    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-    qd_compose_insert_binary_buffers(field, body);
-
-    //
-    // Extend the streaming message and free the composed field
-    //
-    qd_message_extend(msg, field);
-    qd_compose_free(field);
+    qd_message_body_data_append(msg, body);
 
     //
     // Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index d12427a..4d1cd14 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -388,10 +388,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session,
     qd_buffer_list_append(&buffers, (uint8_t *)data, len);
 
     if (stream_data->in_dlv) {
-        qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-        qd_compose_insert_binary_buffers(body, &buffers);
-        qd_message_extend(stream_data->message, body);
-        qd_compose_free(body);
+        qd_message_body_data_append(stream_data->message, &buffers);
     }
     else {
         if (!stream_data->body) {
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 3a89f63..7b9af30 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -462,16 +462,10 @@ static void on_stream(void *context)
         qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length);
 
         //
-        // Compose a DATA performative for this section of the stream
+        // append this data to the streaming message as one or more DATA
+        // performatives
         //
-        qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-        qd_compose_insert_binary_buffers(field, &buffer_list);
-
-        //
-        // Extend the streaming message and free the composed field
-        //
-        depth = qd_message_extend(adaptor->streaming_message, field);
-        qd_compose_free(field);
+        depth = qd_message_body_data_append(adaptor->streaming_message, &buffer_list);
 
         //
         // Notify the router that more data is ready to be pushed out on the delivery
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index e0d0d56..0e06059 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -139,10 +139,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
     grant_read_buffers(conn);
 
     if (conn->instream) {
-        qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-        qd_compose_insert_binary_buffers(field, &buffers);
-        qd_message_extend(qdr_delivery_message(conn->instream), field);
-        qd_compose_free(field);
+        qd_message_body_data_append(qdr_delivery_message(conn->instream), &buffers);
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
     } else {
diff --git a/src/message.c b/src/message.c
index 9f60d1f..23381db 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2720,3 +2720,49 @@ bool qd_message_oversize(const qd_message_t *msg)
     qd_message_content_t * mc = MSG_CONTENT(msg);
     return mc->oversize;
 }
+
+
+int qd_message_body_data_append(qd_message_t *message, qd_buffer_list_t *data)
+{
+    unsigned int        length = DEQ_SIZE(*data);
+    qd_composed_field_t *field = 0;
+    int rc = 0;
+
+    if (length == 0)
+        return rc;
+
+    // DISPATCH-1803: ensure no body data section can violate the Q2 threshold.
+    // This allows the egress router to wait for an entire body data section
+    // to arrive and be validated before sending it out to the endpoint.
+    //
+    while (length > QD_QLIMIT_Q2_LOWER) {
+        qd_buffer_t *buf = DEQ_HEAD(*data);
+        for (int i = 0; i < QD_QLIMIT_Q2_LOWER; ++i) {
+            buf = DEQ_NEXT(buf);
+        }
+
+        // split the list at buf.  buf becomes head of trailing list
+
+        qd_buffer_list_t trailer = DEQ_EMPTY;
+        DEQ_HEAD(trailer) = buf;
+        DEQ_TAIL(trailer) = DEQ_TAIL(*data);
+        DEQ_TAIL(*data) = DEQ_PREV(buf);
+        DEQ_NEXT(DEQ_TAIL(*data)) = 0;
+        DEQ_PREV(buf) = 0;
+        DEQ_SIZE(trailer) = length - QD_QLIMIT_Q2_LOWER;
+        DEQ_SIZE(*data) = QD_QLIMIT_Q2_LOWER;
+
+        field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
+        qd_compose_insert_binary_buffers(field, data);
+
+        DEQ_MOVE(trailer, *data);
+        length -= QD_QLIMIT_Q2_LOWER;
+    }
+
+    field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
+    qd_compose_insert_binary_buffers(field, data);
+
+    rc = qd_message_extend(message, field);
+    qd_compose_free(field);
+    return rc;
+}
diff --git a/tests/message_test.c b/tests/message_test.c
index 1e33143..87271df 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -939,6 +939,105 @@ static char *test_check_body_data(void * context)
 }
 
 
+// Verify that qd_message_body_data_append() will break up a long binary data
+// field in order to avoid triggering Q2
+//
+static char *test_check_body_data_append(void * context)
+{
+    char *result = 0;
+    qd_message_t *msg = 0;
+    qd_message_t *out_msg = 0;
+
+    // generate a buffer list of binary data large enough to trigger Q2
+    //
+    const int buffer_count = (QD_QLIMIT_Q2_UPPER * 3) + 5;
+    qd_buffer_list_t bin_data = DEQ_EMPTY;
+    for (int i = 0; i < buffer_count; ++i) {
+        qd_buffer_t *buffy = qd_buffer();
+        // "fill" the buffer:
+        qd_buffer_insert(buffy, qd_buffer_capacity(buffy));
+        DEQ_INSERT_TAIL(bin_data, buffy);
+    }
+
+    // simulate building a message as an adaptor would:
+    msg = qd_message();
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(field);
+    qd_compose_insert_bool(field, 0);     // durable
+    qd_compose_insert_null(field);        // priority
+    qd_compose_end_list(field);
+    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
+    qd_compose_start_list(field);
+    qd_compose_insert_ulong(field, 666);    // message-id
+    qd_compose_insert_null(field);                 // user-id
+    qd_compose_insert_string(field, "/whereevah"); // to
+    qd_compose_insert_string(field, "my-subject");  // subject
+    qd_compose_insert_string(field, "/reply-to");   // reply-to
+    qd_compose_end_list(field);
+
+    qd_message_compose_2(msg, field, false);
+    qd_compose_free(field);
+    int depth = qd_message_body_data_append(msg, &bin_data);
+    if (depth <= buffer_count) {
+        // expected to add extra buffer(s) for meta-data
+        result = "append length is incorrect";
+        goto exit;
+    }
+    qd_message_set_receive_complete(msg);
+
+    // now validate the message body sections
+
+    out_msg = qd_message_copy(msg);
+    if (qd_message_check_depth(out_msg, QD_DEPTH_BODY) != QD_MESSAGE_DEPTH_OK) {
+        result = "Invalid body depth check";
+        goto exit;
+    }
+
+    int bd_count = 0;
+    int total_buffers = 0;
+    qd_message_body_data_t *body_data = 0;
+    bool done = false;
+    while (!done) {
+        switch (qd_message_next_body_data(msg, &body_data)) {
+        case QD_MESSAGE_BODY_DATA_INCOMPLETE:
+        case QD_MESSAGE_BODY_DATA_INVALID:
+        case QD_MESSAGE_BODY_DATA_NOT_DATA:
+            result = "Next body data failed to get next body data";
+            goto exit;
+        case QD_MESSAGE_BODY_DATA_NO_MORE:
+            done = true;
+            break;
+        case QD_MESSAGE_BODY_DATA_OK:
+            bd_count += 1;
+            // qd_message_body_data_append() breaks the buffer list up into
+            // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers
+            // long
+            total_buffers += qd_message_body_data_buffer_count(body_data);
+            if (qd_message_body_data_buffer_count(body_data) > QD_QLIMIT_Q2_LOWER) {
+                result = "Body data list length too long!";
+                goto exit;
+            }
+            qd_message_body_data_release(body_data);
+            break;
+        }
+    }
+
+    if (bd_count != (buffer_count / QD_QLIMIT_Q2_LOWER) + 1) {
+        result = "Unexpected count of body data sections!";
+        goto exit;
+    }
+
+    if (total_buffers != buffer_count) {
+        result = "Not all buffers were decoded!";
+    }
+
+exit:
+    qd_message_free(msg);
+    qd_message_free(out_msg);
+    return result;
+}
+
+
 int message_tests(void)
 {
     int result = 0;
@@ -953,6 +1052,7 @@ int message_tests(void)
     TEST_CASE(test_incomplete_annotations, 0);
     TEST_CASE(test_check_weird_messages, 0);
     TEST_CASE(test_check_body_data, 0);
+    TEST_CASE(test_check_body_data_append, 0);
 
     return result;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org