You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2020/08/04 18:58:14 UTC
[qpid-dispatch] 28/32: Dataplane: Fixed message parsing so it can
handle partial and streaming content.
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit df0568a8d4f03d04fe5046ee4159c330bb4291ca
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Jul 15 17:25:35 2020 -0400
Dataplane: Fixed message parsing so it can handle partial and streaming content.
---
src/message.c | 256 +++++++++++++++++++++++++++++++-------------------
src/message_private.h | 8 +-
2 files changed, 165 insertions(+), 99 deletions(-)
diff --git a/src/message.c b/src/message.c
index 7f11e5e..c00b909 100644
--- a/src/message.c
+++ b/src/message.c
@@ -122,7 +122,6 @@ static void quote(char* bytes, int n, char **begin, char *end) {
/**
* Populates the buffer with formatted epoch_time
*/
-//static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len)
static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len)
{
struct timeval local_timeval;
@@ -369,36 +368,59 @@ char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits f
return buffer;
}
+
+/**
+ * Return true if there is at least one consumable octet in the buffer chain
+ * starting at *cursor. If the cursor is beyond the end of the buffer, and there
+ * is another buffer in the chain, move the cursor and buffer pointers to reference
+ * the first octet in the next buffer. Note that this movement does NOT constitute
+ * advancement of the cursor in the buffer chain.
+ */
+static bool can_advance(unsigned char **cursor, qd_buffer_t **buffer)
+{
+ if (qd_buffer_cursor(*buffer) > *cursor)
+ return true;
+
+ if (DEQ_NEXT(*buffer)) {
+ *buffer = DEQ_NEXT(*buffer);
+ *cursor = qd_buffer_base(*buffer);
+ }
+
+ return qd_buffer_cursor(*buffer) > *cursor;
+}
+
+
/**
* Advance cursor through buffer chain by 'consume' bytes.
* Cursor and buffer args are advanced to point to new position in buffer chain.
* - if the number of bytes in the buffer chain is less than or equal to
- * the consume number then set *cursor and *buffer to NULL and
- * return the number of missing bytes
+ * the consume number then return false
* - the original buffer chain is not changed or freed.
*
* @param cursor Pointer into current buffer content
* @param buffer pointer to current buffer
* @param consume number of bytes to advance
- * @return 0 if all bytes consumed, != 0 if not enough bytes available
+ * @return true if all bytes consumed, false if not enough bytes available
*/
-static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
+static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
{
+ if (!can_advance(cursor, buffer))
+ return false;
+
unsigned char *local_cursor = *cursor;
qd_buffer_t *local_buffer = *buffer;
int remaining = qd_buffer_cursor(local_buffer) - local_cursor;
while (consume > 0) {
- if (consume < remaining) {
+ if (consume <= remaining) {
local_cursor += consume;
consume = 0;
} else {
+ if (!local_buffer->next)
+ return false;
+
consume -= remaining;
local_buffer = local_buffer->next;
- if (local_buffer == 0){
- local_cursor = 0;
- break;
- }
local_cursor = qd_buffer_base(local_buffer);
remaining = qd_buffer_size(local_buffer);
}
@@ -407,7 +429,7 @@ static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
*cursor = local_cursor;
*buffer = local_buffer;
- return consume;
+ return true;
}
@@ -457,21 +479,29 @@ static void advance_guarded(unsigned char **cursor, qd_buffer_t **buffer, int co
}
-static unsigned char next_octet(unsigned char **cursor, qd_buffer_t **buffer)
+/**
+ * If there is an octet to be consumed, put it in octet and return true, else return false.
+ */
+static bool next_octet(unsigned char **cursor, qd_buffer_t **buffer, unsigned char *octet)
{
- unsigned char result = **cursor;
- advance(cursor, buffer, 1);
- return result;
+ if (can_advance(cursor, buffer)) {
+ *octet = **cursor;
+ advance(cursor, buffer, 1);
+ return true;
+ }
+ return false;
}
-static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
+static bool traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field)
{
qd_buffer_t *start_buffer = *buffer;
unsigned char *start_cursor = *cursor;
+ unsigned char tag;
+ unsigned char octet;
- unsigned char tag = next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
+ if (!next_octet(cursor, buffer, &tag))
+ return false;
int consume = 0;
size_t hdr_length = 1;
@@ -500,23 +530,33 @@ static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field
case 0xD0 :
case 0xF0 :
hdr_length += 3;
- consume |= ((int) next_octet(cursor, buffer)) << 24;
- if (!(*cursor)) return 0;
- consume |= ((int) next_octet(cursor, buffer)) << 16;
- if (!(*cursor)) return 0;
- consume |= ((int) next_octet(cursor, buffer)) << 8;
- if (!(*cursor)) return 0;
+ if (!next_octet(cursor, buffer, &octet))
+ return false;
+ consume |= ((int) octet) << 24;
+
+ if (!next_octet(cursor, buffer, &octet))
+ return false;
+ consume |= ((int) octet) << 16;
+
+ if (!next_octet(cursor, buffer, &octet))
+ return false;
+ consume |= ((int) octet) << 8;
+
// Fall through to the next case...
case 0xA0 :
case 0xC0 :
case 0xE0 :
hdr_length++;
- consume |= (int) next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
+ if (!next_octet(cursor, buffer, &octet))
+ return false;
+ consume |= (int) octet;
break;
}
+ if (!advance(cursor, buffer, consume))
+ return false;
+
if (field && !field->parsed) {
field->buffer = start_buffer;
field->offset = start_cursor - qd_buffer_base(start_buffer);
@@ -526,48 +566,58 @@ static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field
field->tag = tag;
}
- advance(cursor, buffer, consume);
- return 1;
+ return true;
}
-static int start_list(unsigned char **cursor, qd_buffer_t **buffer)
+static int get_list_count(unsigned char **cursor, qd_buffer_t **buffer)
{
- unsigned char tag = next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
- int length = 0;
- int count = 0;
+ unsigned char tag;
+ unsigned char octet;
+
+ if (!next_octet(cursor, buffer, &tag))
+ return 0;
+
+ int count = 0;
switch (tag) {
case 0x45 : // list0
break;
case 0xd0 : // list32
- length |= ((int) next_octet(cursor, buffer)) << 24;
- if (!(*cursor)) return 0;
- length |= ((int) next_octet(cursor, buffer)) << 16;
- if (!(*cursor)) return 0;
- length |= ((int) next_octet(cursor, buffer)) << 8;
- if (!(*cursor)) return 0;
- length |= (int) next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
-
- count |= ((int) next_octet(cursor, buffer)) << 24;
- if (!(*cursor)) return 0;
- count |= ((int) next_octet(cursor, buffer)) << 16;
- if (!(*cursor)) return 0;
- count |= ((int) next_octet(cursor, buffer)) << 8;
- if (!(*cursor)) return 0;
- count |= (int) next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
+ //
+ // Advance past the list length
+ //
+ if (!advance(cursor, buffer, 4))
+ return 0;
+
+ if (!next_octet(cursor, buffer, &octet))
+ return 0;
+ count |= ((int) octet) << 24;
+
+ if (!next_octet(cursor, buffer, &octet))
+ return 0;
+ count |= ((int) octet) << 16;
+
+ if (!next_octet(cursor, buffer, &octet))
+ return 0;
+ count |= ((int) octet) << 8;
+
+ if (!next_octet(cursor, buffer, &octet))
+ return 0;
+ count |= (int) octet;
break;
case 0xc0 : // list8
- length |= (int) next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
+ //
+ // Advance past the list length
+ //
+ if (!advance(cursor, buffer, 1))
+ return 0;
- count |= (int) next_octet(cursor, buffer);
- if (!(*cursor)) return 0;
+ if (!next_octet(cursor, buffer, &octet))
+ return 0;
+ count |= (int) octet;
break;
}
@@ -609,14 +659,13 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
const unsigned char *expected_tags,
qd_field_location_t *location)
{
- qd_buffer_t *test_buffer = *buffer;
- unsigned char *test_cursor = *cursor;
-
- if (!test_cursor)
+ if (!*cursor || !can_advance(cursor, buffer))
return QD_SECTION_NEED_MORE;
+ qd_buffer_t *test_buffer = *buffer;
+ unsigned char *test_cursor = *cursor;
unsigned char *end_of_buffer = qd_buffer_cursor(test_buffer);
- int idx = 0;
+ int idx = 0;
while (idx < pattern_length && *test_cursor == pattern[idx]) {
idx++;
@@ -656,14 +705,19 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
// Check that the full section is present, if so advance the pointers to
// consume the whole section.
//
- int pre_consume = 1; // Count the already extracted tag
+ int pre_consume = 1; // Count the already extracted tag
uint32_t consume = 0;
- unsigned char tag = next_octet(&test_cursor, &test_buffer);
+ unsigned char tag;
+ unsigned char octet;
+
+ if (!next_octet(&test_cursor, &test_buffer, &tag))
+ return QD_SECTION_NEED_MORE;
+
unsigned char tag_subcat = tag & 0xF0;
// if there is no more data the only valid data type is a null type (0x40),
// size is implied as 0
- if (!test_cursor && tag_subcat != 0x40)
+ if (!can_advance(&test_cursor, &test_buffer) && tag_subcat != 0x40)
return QD_SECTION_NEED_MORE;
switch (tag_subcat) {
@@ -680,12 +734,18 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
case 0xF0:
// uint32_t size field:
pre_consume += 3;
- consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 24;
- if (!test_cursor) return QD_SECTION_NEED_MORE;
- consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 16;
- if (!test_cursor) return QD_SECTION_NEED_MORE;
- consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 8;
- if (!test_cursor) return QD_SECTION_NEED_MORE;
+ if (!next_octet(&test_cursor, &test_buffer, &octet))
+ return QD_SECTION_NEED_MORE;
+ consume |= ((uint32_t) octet) << 24;
+
+ if (!next_octet(&test_cursor, &test_buffer, &octet))
+ return QD_SECTION_NEED_MORE;
+ consume |= ((uint32_t) octet) << 16;
+
+ if (!next_octet(&test_cursor, &test_buffer, &octet))
+ return QD_SECTION_NEED_MORE;
+ consume |= ((uint32_t) octet) << 8;
+
// Fall through to the next case...
case 0xA0:
@@ -693,14 +753,15 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
case 0xE0:
// uint8_t size field
pre_consume += 1;
- consume |= (uint32_t) next_octet(&test_cursor, &test_buffer);
- if (!test_cursor && consume > 0) return QD_SECTION_NEED_MORE;
+ if (!next_octet(&test_cursor, &test_buffer, &octet))
+ return QD_SECTION_NEED_MORE;
+ consume |= (uint32_t) octet;
break;
}
location->length = pre_consume + consume;
if (consume) {
- if (advance(&test_cursor, &test_buffer, consume) != 0) {
+ if (!advance(&test_cursor, &test_buffer, consume)) {
return QD_SECTION_NEED_MORE; // whole section not fully received
}
}
@@ -728,7 +789,7 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer,
start = DEQ_NEXT(start);
}
- location->parsed = 1;
+ location->parsed = 1;
*cursor = test_cursor;
*buffer = test_buffer;
@@ -786,19 +847,19 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
static const intptr_t offsets[] = {
// position of the field's qd_field_location_t in the message content
// object
- (intptr_t) &((qd_message_content_t *)0)->field_message_id,
- (intptr_t) &((qd_message_content_t *)0)->field_user_id,
- (intptr_t) &((qd_message_content_t *)0)->field_to,
- (intptr_t) &((qd_message_content_t *)0)->field_subject,
- (intptr_t) &((qd_message_content_t *)0)->field_reply_to,
- (intptr_t) &((qd_message_content_t *)0)->field_correlation_id,
- (intptr_t) &((qd_message_content_t *)0)->field_content_type,
- (intptr_t) &((qd_message_content_t *)0)->field_content_encoding,
- (intptr_t) &((qd_message_content_t *)0)->field_absolute_expiry_time,
- (intptr_t) &((qd_message_content_t *)0)->field_creation_time,
- (intptr_t) &((qd_message_content_t *)0)->field_group_id,
- (intptr_t) &((qd_message_content_t *)0)->field_group_sequence,
- (intptr_t) &((qd_message_content_t *)0)->field_reply_to_group_id
+ (intptr_t) &((qd_message_content_t*) 0)->field_message_id,
+ (intptr_t) &((qd_message_content_t*) 0)->field_user_id,
+ (intptr_t) &((qd_message_content_t*) 0)->field_to,
+ (intptr_t) &((qd_message_content_t*) 0)->field_subject,
+ (intptr_t) &((qd_message_content_t*) 0)->field_reply_to,
+ (intptr_t) &((qd_message_content_t*) 0)->field_correlation_id,
+ (intptr_t) &((qd_message_content_t*) 0)->field_content_type,
+ (intptr_t) &((qd_message_content_t*) 0)->field_content_encoding,
+ (intptr_t) &((qd_message_content_t*) 0)->field_absolute_expiry_time,
+ (intptr_t) &((qd_message_content_t*) 0)->field_creation_time,
+ (intptr_t) &((qd_message_content_t*) 0)->field_group_id,
+ (intptr_t) &((qd_message_content_t*) 0)->field_group_sequence,
+ (intptr_t) &((qd_message_content_t*) 0)->field_reply_to_group_id
};
// update table above if new fields need to be accessed:
assert(QD_FIELD_MESSAGE_ID <= field && field <= QD_FIELD_REPLY_TO_GROUP_ID);
@@ -810,23 +871,27 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me
}
const int index = field - QD_FIELD_MESSAGE_ID;
- qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]);
+ qd_field_location_t *const location = (qd_field_location_t*) ((char*) content + offsets[index]);
if (location->parsed)
return location;
// requested field not parsed out. Need to parse out up to the requested field:
qd_buffer_t *buffer = content->section_message_properties.buffer;
unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset;
- advance(&cursor, &buffer, content->section_message_properties.hdr_length);
- if (index >= start_list(&cursor, &buffer)) return 0; // properties list too short
+ if (!advance(&cursor, &buffer, content->section_message_properties.hdr_length))
+ return 0;
+ if (index >= get_list_count(&cursor, &buffer))
+ return 0; // properties list too short
int position = 0;
while (position < index) {
- qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]);
- if (f->parsed)
- advance(&cursor, &buffer, f->hdr_length + f->length);
- else // parse it out
- if (!traverse_field(&cursor, &buffer, f)) return 0;
+ qd_field_location_t *f = (qd_field_location_t*) ((char*) content + offsets[position]);
+ if (f->parsed) {
+ if (!advance(&cursor, &buffer, f->hdr_length + f->length))
+ return 0;
+ } else // parse it out
+ if (!traverse_field(&cursor, &buffer, f))
+ return 0;
position++;
}
@@ -1862,7 +1927,7 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co
return QD_MESSAGE_DEPTH_INCOMPLETE;
// no more data is going to come. OK if at the end and optional:
- if (!content->parse_cursor && optional)
+ if (!can_advance(&content->parse_cursor, &content->parse_buffer) && optional)
return QD_MESSAGE_DEPTH_OK;
// otherwise we've got an invalid (truncated) header
@@ -2063,7 +2128,8 @@ qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t f
qd_buffer_t *buffer = loc->buffer;
unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset;
- advance(&cursor, &buffer, loc->hdr_length);
+ if (!advance(&cursor, &buffer, loc->hdr_length))
+ return 0;
return qd_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length, ITER_VIEW_ALL);
}
diff --git a/src/message_private.h b/src/message_private.h
index 3d3c59e..47f8f3e 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -93,11 +93,11 @@ typedef struct {
qd_field_location_t field_group_id;
qd_field_location_t field_group_sequence;
qd_field_location_t field_reply_to_group_id;
-
qd_field_location_t body; // The body of the message
- qd_buffer_t *parse_buffer;
- unsigned char *parse_cursor;
- qd_message_depth_t parse_depth;
+
+ qd_buffer_t *parse_buffer; // Pointer to the buffer where parsing should resume, if needed
+ unsigned char *parse_cursor; // Pointer to octet in parse_buffer where parsing should resume, if needed
+ qd_message_depth_t parse_depth; // The depth to which this message content has been parsed
qd_iterator_t *ma_field_iter_in; // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION
qd_iterator_pointer_t ma_user_annotation_blob; // Original user annotations
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org