You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2020/10/13 13:31:02 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1799: Add unit test for message body_data functions

This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new 25ff354  DISPATCH-1799: Add unit test for message body_data functions
25ff354 is described below

commit 25ff3545d66d34cefafbe330194013ff53c20341
Author: Chuck Rolke <ch...@apache.org>
AuthorDate: Tue Oct 13 09:23:47 2020 -0400

    DISPATCH-1799: Add unit test for message body_data functions
    
    Passes messages with varying vbin sizes and vbin segment counts.
    The unit test framework then tests each combination with varying
    qd_buffer sizes.
    
    This closes #874
---
 tests/message_test.c | 238 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 237 insertions(+), 1 deletion(-)

diff --git a/tests/message_test.c b/tests/message_test.c
index 2966765..135adc8 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -24,8 +24,10 @@
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/amqp.h>
 #include <proton/message.h>
+#include <proton/raw_connection.h>
 
-static unsigned char buffer[10000];
+#define FLAT_BUF_SIZE (100000)  // capacity, in bytes, of the flat scratch 'buffer' declared below
+static unsigned char buffer[FLAT_BUF_SIZE];  // flat copy of message content; check_body_data() rebuilds a buffer list from it
 
 static size_t flatten_bufs(qd_message_content_t *content)
 {
@@ -703,6 +705,239 @@ exit:
     return result;
 }
 
+//
+// Testing protocol adapter 'body_data' interfaces
+//
+
+static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks)
+{
+    // Fill a message with n_chunks of vbin chunk_size body data.
+    // Both counts arrive as decimal strings; chunk_size is expected to be positive.
+    int   chunk_size = atoi(s_chunk_size);
+    int   n_chunks   = atoi(s_n_chunks);
+
+    // Add message headers
+    qd_message_compose_1(msg, "whom-it-may-concern", 0);
+
+    // Add the chunks. This creates the test state for not-flattened buffers.
+    for (int j=0; j<n_chunks; j++) {
+        // Create 'buf2' as a linear buffer of the raw data to be sent,
+        // filled with chunk index + 1. Fail loudly if the allocation fails
+        // instead of letting memset() write through a null pointer.
+        unsigned char *buf2 = malloc(chunk_size);
+        assert(buf2 != NULL);
+        memset(buf2, j+1, chunk_size);
+
+        // Use 'set_content' to convert raw buffer 'buf2' into a buffer list in message 'mule'.
+        qd_message_t *mule = qd_message();
+        qd_message_content_t *mule_content = MSG_CONTENT(mule);
+        set_content(mule_content, buf2, chunk_size);
+
+        // Extend message 'msg' with the buffer list in 'mule'
+        // and wrap the addition in a BODY_DATA performative.
+        // After this the content buffer list in 'mule' is empty and
+        // the buffers in 'field' are inserted into message 'msg'.
+        qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+        qd_compose_insert_binary_buffers(field, &mule_content->buffers);
+        qd_message_extend(msg, field);
+
+        // Clean up temporary resources
+        free(buf2);
+        qd_compose_free(field);
+        qd_message_free(mule);
+    }
+}
+
+static void free_body_data_list(qd_message_t *msg_in)
+{
+    // DISPATCH-1800 - this should not be required here
+    // Frees every body_data record queued on the message. Each record is
+    // unlinked first so the list ends up empty, not pointing at freed memory.
+    qd_message_pvt_t *msg = (qd_message_pvt_t *)msg_in;
+    qd_message_body_data_t *bd;
+    while ((bd = DEQ_HEAD(msg->body_data_list)) != 0) {
+        DEQ_REMOVE_HEAD(msg->body_data_list);
+        free_qd_message_body_data_t(bd);
+    }
+}
+
+static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten)
+{
+    // Fill a message with n chunks of vbin chunk_size body data.
+    // Then test by retrieving n chunks from a message copy and verifying.
+    // Returns 0 on success, else a static string naming the first failure.
+    //
+    // 'flatten' messes with message buffers after they have been composed.
+    // * Not flattened means that vbin headers stand alone in separate buffers and
+    //   vbin data always starts in the first byte of a new buffer. This is the
+    //   buffer condition when a message is forwarded between adaptors on a single
+    //   router. The receiver and sender have two messages but share message content.
+    // * Flattened means that vbin headers and vbin data are packed into the buffer
+    //   list. This is the buffer condition when a message is forwarded between
+    //   routers and the receiver is handling the vbin segments.
+
+    int   chunk_size = atoi(s_chunk_size);
+    int   n_chunks   = atoi(s_n_chunks);
+
+    char *result     = 0;
+    int   received;     // got this much of chunk_size chunk
+
+    // Messages for setting/sensing body data
+    qd_message_t         *msg     = qd_message();
+    qd_message_t         *copy    = qd_message_copy(msg);
+    qd_message_pvt_t     *msg_pvt = (qd_message_pvt_t *)msg;
+
+    // Set the original message content
+    body_data_generate_message(msg, s_chunk_size, s_n_chunks);
+
+    // flatten if required
+    if (flatten) {
+        // check that the flatten buffer is big enough
+        int vbin_size = chunk_size > 255 ? 8 : 5;  // per-chunk vbin overhead; vbin8 holds at most 255 bytes
+        int header_size = 100;                     // leave plenty of allocation for header
+        int msg_size = n_chunks * (chunk_size + vbin_size) + header_size;
+        assert(msg_size < FLAT_BUF_SIZE);
+
+        // compress message into flatten buffer
+        size_t flat_size = flatten_bufs(MSG_CONTENT(msg));
+
+        // erase buffer list in msg and copy
+        qd_buffer_list_free_buffers(&msg_pvt->content->buffers);
+
+        // reconstruct buffer list from flat buffer
+        qd_buffer_list_append(&msg_pvt->content->buffers, buffer, flat_size);
+    }
+
+    // check the chunks
+    // Define the number of raw buffers to be extracted on each loop
+#define N_PN_RAW_BUFFS (2)
+
+    qd_message_body_data_t *body_data;
+
+    // Stop at the first failure so that 'result' names it.
+    for (int j=0; j<n_chunks && !result; j++) {
+        received = 0; // this chunk received size in bytes.
+        // Set up the next_body_data snapshot
+        qd_message_body_data_result_t body_data_result = qd_message_next_body_data(copy, &body_data);
+
+        if (body_data_result == QD_MESSAGE_BODY_DATA_OK) {
+            // check body_data payload length
+            if (body_data->payload.length != (size_t) chunk_size) {
+                printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                    "chunk_size:%s, n_chunks:%s, payload length error : %zu \n",
+                    BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, body_data->payload.length);
+                fflush(stdout);
+                result = "qd_message_next_body_data returned wrong payload length.";
+                break;
+            }
+
+            // Loop to extract the body data
+            //  * verify content
+            //  * verify body data length
+
+            // buffs        - body data is extracted through this array of raw buffers
+            pn_raw_buffer_t buffs[N_PN_RAW_BUFFS];
+            // used_buffers - Number of qd_buffers in content buffer chain consumed so far.
+            //                This number must increase as dictated by qd_message_body_data_buffers()
+            //                when vbin segments are consumed from the current body_data chunk.
+            //                A single vbin segment may consume 0, 1, or many qd_buffers.
+            size_t used_buffers = 0;
+
+            while (received < chunk_size) {
+                ZERO(buffs);
+                size_t n_used = qd_message_body_data_buffers(body_data, buffs, used_buffers, N_PN_RAW_BUFFS);
+                if (n_used > 0) {
+                    for (size_t ii=0; ii<n_used && !result; ii++) {
+                        char e_char = (char)(j + 1);   // expected char in payload
+                        // Verify the content of the buffer; only the first mismatch is reported
+                        for (uint32_t idx=0; idx < buffs[ii].size; idx++) {
+                            char actual = buffs[ii].bytes[buffs[ii].offset + idx];
+                            if (e_char != actual) {
+                                printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                                    "chunk_size:%s, n_chunks:%s, verify error at index %d, expected:%d, actual:%d \n",
+                                    BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received + (int) idx, e_char,
+                                    actual);
+                                fflush(stdout);
+                                result = "verify error";
+                                break;
+                            }
+                        }
+                        received += buffs[ii].size;
+                    }
+                    used_buffers += n_used;
+                    if (!!result) break;
+                } else {
+                    printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                        "chunk_size:%s, n_chunks:%s, received %d bytes (not enough) \n",
+                        BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received);
+                    fflush(stdout);
+                    result = "Did not receive enough data";
+                    break;
+                }
+                if (received > chunk_size) {
+                    printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, "
+                        "chunk_size:%s, n_chunks:%s, received %d bytes (too many) \n",
+                        BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received);
+                    fflush(stdout);
+                    result = "Received too much data";
+                    break;
+                }
+            }
+        } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) {
+            result = "DATA_INCOMPLETE"; break;
+        } else {
+            switch (body_data_result) {
+            case QD_MESSAGE_BODY_DATA_NO_MORE:
+                result = "EOS"; break;
+            case QD_MESSAGE_BODY_DATA_INVALID:
+                result = "Invalid body data for streaming message"; break;
+            case QD_MESSAGE_BODY_DATA_NOT_DATA:
+                result = "Invalid body; expected data section"; break;
+            default:
+                result = "result: default"; break;
+            }
+        }
+    }
+
+    free_body_data_list(msg);
+    qd_message_free(msg);
+    if (!!copy) {
+        free_body_data_list(copy);
+        qd_message_free(copy);
+    }
+    return result;
+}
+
+static char *test_check_body_data(void * context)
+{
+    // Run check_body_data() over every chunk size / chunk count pairing,
+    // not flattened first and then flattened. The first failure is printed
+    // and returned; 0 is returned when every combination passes.
+#define N_CHUNK_SIZES (10)
+    char *chunk_sizes[N_CHUNK_SIZES] = {"1", "10", "100", "510", "511", "512", "513", "1023", "1024", "1025"};
+
+#define N_N_CHUNKS (4)
+    char *n_chunks[N_N_CHUNKS]       = {"1", "2", "10", "25"};
+
+    const bool  flatten_modes[2] = {false, true};
+    const char *flatten_names[2] = {"false", "true"};
+
+    for (int size_idx = 0; size_idx < N_CHUNK_SIZES; size_idx++) {
+        for (int count_idx = 0; count_idx < N_N_CHUNKS; count_idx++) {
+            for (int mode = 0; mode < 2; mode++) {
+                char *failure = check_body_data(chunk_sizes[size_idx], n_chunks[count_idx],
+                                                flatten_modes[mode]);
+                if (failure) {
+                    printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s   \n",
+                           chunk_sizes[size_idx], n_chunks[count_idx], flatten_names[mode], failure);
+                    fflush(stdout);
+                    return failure;
+                }
+            }
+        }
+    }
+    return 0;
+}
+
 
 int message_tests(void)
 {
@@ -717,6 +952,7 @@ int message_tests(void)
     TEST_CASE(test_q2_input_holdoff_sensing, 0);
     TEST_CASE(test_incomplete_annotations, 0);
     TEST_CASE(test_check_weird_messages, 0);
+    TEST_CASE(test_check_body_data, 0);
 
     return result;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org