You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:14 UTC

[qpid-dispatch] 28/32: Dataplane: Fixed message parsing so it can handle partial and streaming content.

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit df0568a8d4f03d04fe5046ee4159c330bb4291ca
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jul 15 17:25:35 2020 -0400

    Dataplane: Fixed message parsing so it can handle partial and streaming content.
---
 src/message.c         | 256 +++++++++++++++++++++++++++++++-------------------
 src/message_private.h |   8 +-
 2 files changed, 165 insertions(+), 99 deletions(-)

diff --git a/src/message.c b/src/message.c
index 7f11e5e..c00b909 100644
--- a/src/message.c
+++ b/src/message.c
@@ -122,7 +122,6 @@ static void quote(char* bytes, int n, char **begin, char *end) {
 /**
  * Populates the buffer with formatted epoch_time
  */
-//static void format_time(pn_timestamp_t  epoch_time, char *format, char *buffer, size_t len)
 static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len)
 {
     struct timeval local_timeval;
@@ -369,36 +368,59 @@ char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits f
     return buffer;
 }
 
+
+/**
+ * Return true if there is at least one consumable octet at *cursor.  If the cursor
+ * is at (or past) the end of the current buffer and there is another buffer in the
+ * chain, first move the cursor and buffer pointers to the first octet of the next
+ * buffer; only that one next buffer is examined.  This movement does NOT constitute
+ * advancement of the cursor in the buffer chain.
+ */
+static bool can_advance(unsigned char **cursor, qd_buffer_t **buffer)
+{
+    if (qd_buffer_cursor(*buffer) > *cursor)
+        return true;
+
+    if (DEQ_NEXT(*buffer)) {
+        *buffer = DEQ_NEXT(*buffer);
+        *cursor = qd_buffer_base(*buffer);
+    }
+
+    return qd_buffer_cursor(*buffer) > *cursor;
+}
+
+
 /**
  * Advance cursor through buffer chain by 'consume' bytes.
  * Cursor and buffer args are advanced to point to new position in buffer chain.
- *  - if the number of bytes in the buffer chain is less than or equal to
- *    the consume number then set *cursor and *buffer to NULL and
- *    return the number of missing bytes
+ *  - if fewer than 'consume' bytes remain in the buffer chain, return false;
+ *    also returns false when no octet remains, even if consume is 0
  *  - the original buffer chain is not changed or freed.
  *
  * @param cursor Pointer into current buffer content
  * @param buffer pointer to current buffer
  * @param consume number of bytes to advance
- * @return 0 if all bytes consumed, != 0 if not enough bytes available
+ * @return true if all bytes consumed, false if not enough bytes available
  */
-static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
+static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
 {
+    if (!can_advance(cursor, buffer))
+        return false;
+
     unsigned char *local_cursor = *cursor;
     qd_buffer_t   *local_buffer = *buffer;
 
     int remaining = qd_buffer_cursor(local_buffer) - local_cursor;
     while (consume > 0) {
-        if (consume < remaining) {
+        if (consume <= remaining) {
             local_cursor += consume;
             consume = 0;
         } else {
+            if (!local_buffer->next)
+                return false;
+
             consume -= remaining;
             local_buffer = local_buffer->next;
-            if (local_buffer == 0){
-                local_cursor = 0;
-                break;
-            }
             local_cursor = qd_buffer_base(local_buffer);
             remaining = qd_buffer_size(local_buffer);
         }
@@ -407,7 +429,7 @@ static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
     *cursor = local_cursor;
     *buffer = local_buffer;
 
-    return consume;
+    return true;
 }
 
 
@@ -457,21 +479,29 @@ static void advance_guarded(unsigned char **cursor, qd_buffer_t **buffer, int co
 }
 
 
-static unsigned char next_octet(unsigned char **cursor, qd_buffer_t **buffer)
+/**
+ * If there is an octet to be consumed, put it in octet and return true, else return false.
+ */
+static bool next_octet(unsigned char **cursor, qd_buffer_t **buffer, unsigned char *octet)
 {
-    unsigned char result = **cursor;
-    advance(cursor, buffer, 1);
-    return result;
+    if (can_advance(cursor, buffer)) {
+        *octet = **cursor;
+        advance(cursor, buffer, 1);
+        return true;
+    }
+    return false;
 }
 
 
-static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
+static bool traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
 {
     qd_buffer_t   *start_buffer = *buffer;
     unsigned char *start_cursor = *cursor;
+    unsigned char  tag;
+    unsigned char  octet;
 
-    unsigned char tag = next_octet(cursor, buffer);
-    if (!(*cursor)) return 0;
+    if (!next_octet(cursor, buffer, &tag))
+        return false;
 
     int    consume    = 0;
     size_t hdr_length = 1;
@@ -500,23 +530,33 @@ static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field
     case 0xD0 :
     case 0xF0 :
         hdr_length += 3;
-        consume |= ((int) next_octet(cursor, buffer)) << 24;
-        if (!(*cursor)) return 0;
-        consume |= ((int) next_octet(cursor, buffer)) << 16;
-        if (!(*cursor)) return 0;
-        consume |= ((int) next_octet(cursor, buffer)) << 8;
-        if (!(*cursor)) return 0;
+        if (!next_octet(cursor, buffer, &octet))
+            return false;
+        consume |= ((int) octet) << 24;
+
+        if (!next_octet(cursor, buffer, &octet))
+            return false;
+        consume |= ((int) octet) << 16;
+
+        if (!next_octet(cursor, buffer, &octet))
+            return false;
+        consume |= ((int) octet) << 8;
+
         // Fall through to the next case...
 
     case 0xA0 :
     case 0xC0 :
     case 0xE0 :
         hdr_length++;
-        consume |= (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
+        if (!next_octet(cursor, buffer, &octet))
+            return false;
+        consume |= (int) octet;
         break;
     }
 
+    if (!advance(cursor, buffer, consume))
+        return false;
+
     if (field && !field->parsed) {
         field->buffer     = start_buffer;
         field->offset     = start_cursor - qd_buffer_base(start_buffer);
@@ -526,48 +566,58 @@ static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field
         field->tag        = tag;
     }
 
-    advance(cursor, buffer, consume);
-    return 1;
+    return true;
 }
 
 
-static int start_list(unsigned char **cursor, qd_buffer_t **buffer)
+static int get_list_count(unsigned char **cursor, qd_buffer_t **buffer)
 {
-    unsigned char tag = next_octet(cursor, buffer);
-    if (!(*cursor)) return 0;
-    int length = 0;
-    int count  = 0;
+    unsigned char tag;
+    unsigned char octet;
+
+    if (!next_octet(cursor, buffer, &tag))
+        return 0;
+
+    int count = 0;
 
     switch (tag) {
     case 0x45 :     // list0
         break;
     case 0xd0 :     // list32
-        length |= ((int) next_octet(cursor, buffer)) << 24;
-        if (!(*cursor)) return 0;
-        length |= ((int) next_octet(cursor, buffer)) << 16;
-        if (!(*cursor)) return 0;
-        length |= ((int) next_octet(cursor, buffer)) << 8;
-        if (!(*cursor)) return 0;
-        length |=  (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
-
-        count |= ((int) next_octet(cursor, buffer)) << 24;
-        if (!(*cursor)) return 0;
-        count |= ((int) next_octet(cursor, buffer)) << 16;
-        if (!(*cursor)) return 0;
-        count |= ((int) next_octet(cursor, buffer)) << 8;
-        if (!(*cursor)) return 0;
-        count |=  (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
+        //
+        // Advance past the list length
+        //
+        if (!advance(cursor, buffer, 4))
+            return 0;
+
+        if (!next_octet(cursor, buffer, &octet))
+            return 0;
+        count |= ((int) octet) << 24;
+
+        if (!next_octet(cursor, buffer, &octet))
+            return 0;
+        count |= ((int) octet) << 16;
+
+        if (!next_octet(cursor, buffer, &octet))
+            return 0;
+        count |= ((int) octet) << 8;
+
+        if (!next_octet(cursor, buffer, &octet))
+            return 0;
+        count |=  (int) octet;
 
         break;
 
     case 0xc0 :     // list8
-        length |= (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
+        //
+        // Advance past the list length
+        //
+        if (!advance(cursor, buffer, 1))
+            return 0;
 
-        count |= (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
+        if (!next_octet(cursor, buffer, &octet))
+            return 0;
+        count |= (int) octet;
         break;
     }
 
@@ -609,14 +659,13 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
                                                  const unsigned char  *expected_tags,
                                                  qd_field_location_t  *location)
 {
-    qd_buffer_t   *test_buffer = *buffer;
-    unsigned char *test_cursor = *cursor;
-
-    if (!test_cursor)
+    if (!*cursor || !can_advance(cursor, buffer))
         return QD_SECTION_NEED_MORE;
 
+    qd_buffer_t   *test_buffer   = *buffer;
+    unsigned char *test_cursor   = *cursor;
     unsigned char *end_of_buffer = qd_buffer_cursor(test_buffer);
-    int idx = 0;
+    int            idx           = 0;
 
     while (idx < pattern_length && *test_cursor == pattern[idx]) {
         idx++;
@@ -656,14 +705,19 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
     // Check that the full section is present, if so advance the pointers to
     // consume the whole section.
     //
-    int pre_consume = 1;  // Count the already extracted tag
+    int pre_consume  = 1;  // Count the already extracted tag
     uint32_t consume = 0;
-    unsigned char tag = next_octet(&test_cursor, &test_buffer);
+    unsigned char tag;
+    unsigned char octet;
+
+    if (!next_octet(&test_cursor, &test_buffer, &tag))
+        return QD_SECTION_NEED_MORE;
+
     unsigned char tag_subcat = tag & 0xF0;
 
     // if there is no more data the only valid data type is a null type (0x40),
     // size is implied as 0
-    if (!test_cursor && tag_subcat != 0x40)
+    if (!can_advance(&test_cursor, &test_buffer) && tag_subcat != 0x40)
         return QD_SECTION_NEED_MORE;
 
     switch (tag_subcat) {
@@ -680,12 +734,18 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
     case 0xF0:
         // uint32_t size field:
         pre_consume += 3;
-        consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 24;
-        if (!test_cursor) return QD_SECTION_NEED_MORE;
-        consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 16;
-        if (!test_cursor) return QD_SECTION_NEED_MORE;
-        consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 8;
-        if (!test_cursor) return QD_SECTION_NEED_MORE;
+        if (!next_octet(&test_cursor, &test_buffer, &octet))
+            return QD_SECTION_NEED_MORE;
+        consume |= ((uint32_t) octet) << 24;
+
+        if (!next_octet(&test_cursor, &test_buffer, &octet))
+            return QD_SECTION_NEED_MORE;
+        consume |= ((uint32_t) octet) << 16;
+
+        if (!next_octet(&test_cursor, &test_buffer, &octet))
+            return QD_SECTION_NEED_MORE;
+        consume |= ((uint32_t) octet) << 8;
+
         // Fall through to the next case...
 
     case 0xA0:
@@ -693,14 +753,15 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
     case 0xE0:
         // uint8_t size field
         pre_consume += 1;
-        consume |= (uint32_t) next_octet(&test_cursor, &test_buffer);
-        if (!test_cursor && consume > 0) return QD_SECTION_NEED_MORE;
+        if (!next_octet(&test_cursor, &test_buffer, &octet))
+            return QD_SECTION_NEED_MORE;
+        consume |= (uint32_t) octet;
         break;
     }
 
     location->length = pre_consume + consume;
     if (consume) {
-        if (advance(&test_cursor, &test_buffer, consume) != 0) {
+        if (!advance(&test_cursor, &test_buffer, consume)) {
             return QD_SECTION_NEED_MORE;  // whole section not fully received
         }
     }
@@ -728,7 +789,7 @@ static qd_section_status_t message_section_check(qd_buffer_t         **buffer,
         start = DEQ_NEXT(start);
     }
 
-    location->parsed     = 1;
+    location->parsed = 1;
 
     *cursor = test_cursor;
     *buffer = test_buffer;
@@ -786,19 +847,19 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
     static const intptr_t offsets[] = {
         // position of the field's qd_field_location_t in the message content
         // object
-        (intptr_t) &((qd_message_content_t *)0)->field_message_id,
-        (intptr_t) &((qd_message_content_t *)0)->field_user_id,
-        (intptr_t) &((qd_message_content_t *)0)->field_to,
-        (intptr_t) &((qd_message_content_t *)0)->field_subject,
-        (intptr_t) &((qd_message_content_t *)0)->field_reply_to,
-        (intptr_t) &((qd_message_content_t *)0)->field_correlation_id,
-        (intptr_t) &((qd_message_content_t *)0)->field_content_type,
-        (intptr_t) &((qd_message_content_t *)0)->field_content_encoding,
-        (intptr_t) &((qd_message_content_t *)0)->field_absolute_expiry_time,
-        (intptr_t) &((qd_message_content_t *)0)->field_creation_time,
-        (intptr_t) &((qd_message_content_t *)0)->field_group_id,
-        (intptr_t) &((qd_message_content_t *)0)->field_group_sequence,
-        (intptr_t) &((qd_message_content_t *)0)->field_reply_to_group_id
+        (intptr_t) &((qd_message_content_t*) 0)->field_message_id,
+        (intptr_t) &((qd_message_content_t*) 0)->field_user_id,
+        (intptr_t) &((qd_message_content_t*) 0)->field_to,
+        (intptr_t) &((qd_message_content_t*) 0)->field_subject,
+        (intptr_t) &((qd_message_content_t*) 0)->field_reply_to,
+        (intptr_t) &((qd_message_content_t*) 0)->field_correlation_id,
+        (intptr_t) &((qd_message_content_t*) 0)->field_content_type,
+        (intptr_t) &((qd_message_content_t*) 0)->field_content_encoding,
+        (intptr_t) &((qd_message_content_t*) 0)->field_absolute_expiry_time,
+        (intptr_t) &((qd_message_content_t*) 0)->field_creation_time,
+        (intptr_t) &((qd_message_content_t*) 0)->field_group_id,
+        (intptr_t) &((qd_message_content_t*) 0)->field_group_sequence,
+        (intptr_t) &((qd_message_content_t*) 0)->field_reply_to_group_id
     };
     // update table above if new fields need to be accessed:
     assert(QD_FIELD_MESSAGE_ID <= field && field <= QD_FIELD_REPLY_TO_GROUP_ID);
@@ -810,23 +871,27 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
     }
 
     const int index = field - QD_FIELD_MESSAGE_ID;
-    qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
+    qd_field_location_t *const location = (qd_field_location_t*) ((char*) content + offsets[index]);
     if (location->parsed)
         return location;
 
     // requested field not parsed out.  Need to parse out up to the requested field:
     qd_buffer_t   *buffer = content->section_message_properties.buffer;
     unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
-    advance(&cursor, &buffer, content->section_message_properties.hdr_length);
-    if (index >= start_list(&cursor, &buffer)) return 0;  // properties list too short
+    if (!advance(&cursor, &buffer, content->section_message_properties.hdr_length))
+        return 0;
+    if (index >= get_list_count(&cursor, &buffer))
+        return 0;  // properties list too short
 
     int position = 0;
     while (position < index) {
-        qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
-        if (f->parsed)
-            advance(&cursor, &buffer, f->hdr_length + f->length);
-        else // parse it out
-            if (!traverse_field(&cursor, &buffer, f)) return 0;
+        qd_field_location_t *f = (qd_field_location_t*) ((char*) content + offsets[position]);
+        if (f->parsed) {
+            if (!advance(&cursor, &buffer, f->hdr_length + f->length))
+                return 0;
+        } else // parse it out
+            if (!traverse_field(&cursor, &buffer, f))
+                return 0;
         position++;
     }
 
@@ -1862,7 +1927,7 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co
             return QD_MESSAGE_DEPTH_INCOMPLETE;
 
         // no more data is going to come. OK if at the end and optional:
-        if (!content->parse_cursor && optional)
+        if (!can_advance(&content->parse_cursor, &content->parse_buffer) && optional)
             return QD_MESSAGE_DEPTH_OK;
 
         // otherwise we've got an invalid (truncated) header
@@ -2063,7 +2128,8 @@ qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t f
 
     qd_buffer_t   *buffer = loc->buffer;
     unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset;
-    advance(&cursor, &buffer, loc->hdr_length);
+    if (!advance(&cursor, &buffer, loc->hdr_length))
+        return 0;
 
     return qd_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length, ITER_VIEW_ALL);
 }
diff --git a/src/message_private.h b/src/message_private.h
index 3d3c59e..47f8f3e 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -93,11 +93,11 @@ typedef struct {
     qd_field_location_t  field_group_id;
     qd_field_location_t  field_group_sequence;
     qd_field_location_t  field_reply_to_group_id;
-
     qd_field_location_t  body;                            // The body of the message
-    qd_buffer_t         *parse_buffer;
-    unsigned char       *parse_cursor;
-    qd_message_depth_t   parse_depth;
+
+    qd_buffer_t         *parse_buffer;                    // Pointer to the buffer where parsing should resume, if needed
+    unsigned char       *parse_cursor;                    // Pointer to octet in parse_buffer where parsing should resume, if needed
+    qd_message_depth_t   parse_depth;                     // The depth to which this message content has been parsed
     qd_iterator_t       *ma_field_iter_in;                // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION
 
     qd_iterator_pointer_t ma_user_annotation_blob;        // Original user annotations


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org