You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2017/07/07 20:39:42 UTC
[05/14] qpid-dispatch git commit: DISPATCH-760: Add new annotation
parser
DISPATCH-760: Add new annotation parser
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b67b2013
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b67b2013
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b67b2013
Branch: refs/heads/master
Commit: b67b2013fd6c666c80119704821b22c8ee431257
Parents: 7f650d8
Author: Chuck Rolke <cr...@redhat.com>
Authored: Fri Jun 30 16:57:17 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Jul 7 10:38:27 2017 -0400
----------------------------------------------------------------------
include/qpid/dispatch/parse.h | 72 +++++++++++
src/iterator.c | 10 ++
src/parse.c | 248 ++++++++++++++++++++++++++++++++++++-
3 files changed, 329 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b67b2013/include/qpid/dispatch/parse.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 454e99f..2273902 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -33,6 +33,28 @@
*/
typedef struct qd_parsed_field_t qd_parsed_field_t;
+typedef struct qd_parsed_turbo_t qd_parsed_turbo_t;
+
+DEQ_DECLARE(qd_parsed_turbo_t, qd_parsed_turbo_list_t);
+
+/**@file
+ * Parse raw data fields into skeletal AMQP data trees.
+ *
+ * @defgroup parse parse
+ *
+ * Parse data from qd_iterator_t into a tree structure representing
+ * an AMQP data type tree.
+ *@{
+ */
+struct qd_parsed_turbo_t {
+ DEQ_LINKS(qd_parsed_turbo_t);
+ qd_iterator_pointer_t bufptr; // location/size of field in buffer
+ uint8_t tag;
+ uint32_t size;
+ uint32_t count;
+ uint32_t length_of_size;
+ uint32_t length_of_count;
+};
/**
* Parse a field delimited by a field iterator.
@@ -43,6 +65,31 @@ typedef struct qd_parsed_field_t qd_parsed_field_t;
qd_parsed_field_t *qd_parse(qd_iterator_t *iter);
/**
+ * Parse message annotations map from a raw iterator
+ * It's called 'turbo' because it is supposed to be fast.
+ * Distinguish between user annotations and router annotations
+ * Enumerate the user entries count and size.
+ * Return the router entries in a list.
+ *
+ * This function knows a priori:
+ * * the iter is a message annotations map
+ * * the map key prefix is QD_MA_PREFIX
+ * * there are 4 router map annotations at most
+ * * the router annotations are at the end of the map
+ *
+ * @param iter Field iterator for the message annotations map
+ * @param annos returned list of router annotations map entries
+ * @param user_entries number of map user items
+ * @param user_bytes number of map user item bytes
+ * @return 0 if success else pointer to error string
+ */
+const char * qd_parse_turbo(
+ qd_iterator_t *iter,
+ qd_parsed_turbo_list_t *annos,
+ uint32_t *user_entries,
+ uint32_t *user_bytes);
+
+/**
* Free the resources associated with a parsed field.
*
* @param field A field pointer returned by qd_parse.
@@ -229,6 +276,31 @@ int qd_parse_is_scalar(qd_parsed_field_t *field);
*/
qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *key);
+/**
+ * Parse a message annotation map field.
+ * Return parsed fields for the four router entries or null if any is absent
+ * and a blob pointer and count for the user entries in the map which are passed
+ * through as part of the message payload.
+ *
+ * @param is_interrouter true if connection is to a router
+ * @param ma_iter_in Field iterator for the annotation map field being parsed.
+ * @param ma_ingress returned parsed field: ingress
+ * @param ma_phase returned parsed field: phase
+ * @param ma_to_override returned parsed field: override
+ * @param ma_trace returned parsed field: trace
+ * @param blob_pointer returned buffer pointer to user's annotation blob
+ * @param blob_item_count number of map entries referenced by blob_iterator
+ */
+void qd_parse_annotations(
+ bool is_interrouter,
+ qd_iterator_t *ma_iter_in,
+ qd_parsed_field_t **ma_ingress,
+ qd_parsed_field_t **ma_phase,
+ qd_parsed_field_t **ma_to_override,
+ qd_parsed_field_t **ma_trace,
+ qd_iterator_pointer_t *blob_pointer,
+ uint32_t *blob_item_count);
+
///@}
#endif
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b67b2013/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index c456d16..f25d163 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -899,3 +899,13 @@ bool qd_iterator_next_segment(qd_iterator_t *iter, uint32_t *hash)
return true;
}
+
+
+void qd_iterator_get_view_cursor(
+ const qd_iterator_t *iter,
+ qd_iterator_pointer_t *ptr)
+{
+ ptr->buffer = iter->view_pointer.buffer;
+ ptr->cursor = iter->view_pointer.cursor;
+ ptr->remaining = iter->view_pointer.remaining;
+}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b67b2013/src/parse.c
----------------------------------------------------------------------
diff --git a/src/parse.c b/src/parse.c
index 48c6a90..2f04564 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -37,6 +37,9 @@ struct qd_parsed_field_t {
ALLOC_DECLARE(qd_parsed_field_t);
ALLOC_DEFINE(qd_parsed_field_t);
+ALLOC_DECLARE(qd_parsed_turbo_t);
+ALLOC_DEFINE(qd_parsed_turbo_t);
+
/**
* size = the number of bytes following the tag
* count = the number of elements. Applies only to compound structures
@@ -169,6 +172,101 @@ qd_parsed_field_t *qd_parse(qd_iterator_t *iter)
}
+const char *qd_parse_turbo(qd_iterator_t *iter,
+ qd_parsed_turbo_list_t *annos,
+ uint32_t *user_entries,
+ uint32_t *user_bytes)
+{
+ if (!iter || !annos || !user_entries || !user_bytes)
+ return "missing argument";
+
+ DEQ_INIT(*annos);
+ *user_entries = 0;
+ *user_bytes = 0;
+
+ // The iter is addressing the message-annotations map.
+ // Open the field describing the map's items
+ uint8_t tag = 0;
+ uint32_t size = 0;
+ uint32_t count = 0;
+ uint32_t length_of_count = 0;
+ uint32_t length_of_size = 0;
+ const char * parse_error = get_type_info(iter, &tag, &size, &count, &length_of_size, &length_of_count);
+
+ if (parse_error)
+ return parse_error;
+
+ if (count == 0)
+ return 0;
+
+ // with four router annotations there will be 8 annos (4 key,val pairs) returned at most
+#define MAX_ALLOCS 8
+ int n_allocs = 0;
+
+ // Do skeletal parse of each map element
+ for (uint32_t idx = 0; idx < count; idx++) {
+ qd_parsed_turbo_t *turbo;
+ if (n_allocs < MAX_ALLOCS) {
+ turbo = new_qd_parsed_turbo_t();
+ n_allocs++;
+
+ } else {
+ // Retire an existing element.
+ // If there are this many in the list then this one cannot be a
+ // router annotation and must be a user annotation.
+ turbo = DEQ_HEAD(*annos);
+ assert(turbo);
+ *user_entries += 1;
+ *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
+ DEQ_REMOVE_HEAD(*annos);
+ }
+ if (!turbo)
+ return "failed to allocate qd_parsed_turbo_t";
+ ZERO(turbo);
+
+ // Get the buffer pointers for the map element
+ qd_iterator_get_view_cursor(iter, &turbo->bufptr);
+
+ // Get description of the map element
+ parse_error = get_type_info(iter, &turbo->tag, &turbo->size, &turbo->count,
+ &turbo->length_of_size, &turbo->length_of_count);
+ if (parse_error) {
+ return parse_error;
+ }
+
+ // Save parsed element
+ DEQ_INSERT_TAIL(*annos, turbo);
+
+ // Advance map iterator to next map element
+ qd_iterator_advance(iter, turbo->size - turbo->length_of_count);
+ }
+
+ // remove leading annos in the queue if their prefix is not a match and
+ // return them as part of the user annotations
+ for (int idx=0; idx < n_allocs; idx += 2) {
+ qd_parsed_turbo_t *turbo = DEQ_HEAD(*annos);
+ assert(turbo);
+ if (qd_iterator_prefix_ptr(&turbo->bufptr, turbo->length_of_size + 1, QD_MA_PREFIX))
+ break;
+
+ // leading anno is a user annotation map key
+ // remove the key and value from the list and accumulate them as user items
+ *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
+ DEQ_REMOVE_HEAD(*annos);
+ free_qd_parsed_turbo_t(turbo);
+
+ turbo = DEQ_HEAD(*annos);
+ assert(turbo);
+ *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
+ DEQ_REMOVE_HEAD(*annos);
+ free_qd_parsed_turbo_t(turbo);
+
+ *user_entries += 2;
+ }
+ return parse_error;
+}
+
+
void qd_parse_free(qd_parsed_field_t *field)
{
if (!field)
@@ -443,9 +541,15 @@ qd_parsed_field_t *qd_parse_sub_value(qd_parsed_field_t *field, uint32_t idx)
}
+int is_tag_a_map(uint8_t tag)
+{
+ return tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32;
+}
+
+
int qd_parse_is_map(qd_parsed_field_t *field)
{
- return field->tag == QD_AMQP_MAP8 || field->tag == QD_AMQP_MAP32;
+ return is_tag_a_map(field->tag);
}
@@ -481,3 +585,145 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
return 0;
}
+
+
+const char *qd_parse_annotations_v1(
+ qd_iterator_t *ma_iter_in,
+ qd_parsed_field_t **ma_ingress,
+ qd_parsed_field_t **ma_phase,
+ qd_parsed_field_t **ma_to_override,
+ qd_parsed_field_t **ma_trace,
+ qd_iterator_pointer_t *blob_pointer,
+ uint32_t *blob_item_count)
+{
+ // Do full parse
+ qd_iterator_reset(ma_iter_in);
+
+ qd_parsed_turbo_list_t annos;
+ uint32_t user_entries;
+ uint32_t user_bytes;
+ const char * parse_error = qd_parse_turbo(ma_iter_in, &annos, &user_entries, &user_bytes);
+ if (parse_error) {
+ return parse_error;
+ }
+
+ qd_parsed_turbo_t *anno = DEQ_HEAD(annos);
+ while (anno) {
+ qd_iterator_t *key_iter =
+ qd_iterator_buffer(anno->bufptr.buffer,
+ anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
+ anno->size,
+ ITER_VIEW_ALL);
+ assert(key_iter);
+
+ qd_parsed_field_t *key_field = qd_parse(key_iter);
+ assert(key_field);
+
+ qd_iterator_t *iter = qd_parse_raw(key_field);
+ assert(iter);
+
+ qd_parsed_turbo_t *anno_val = DEQ_NEXT(anno);
+ assert(anno_val);
+
+ qd_iterator_t *val_iter =
+ qd_iterator_buffer(anno_val->bufptr.buffer,
+ anno_val->bufptr.cursor - qd_buffer_base(anno_val->bufptr.buffer),
+ anno_val->size,
+ ITER_VIEW_ALL);
+ assert(val_iter);
+
+ qd_parsed_field_t *val_field = qd_parse(val_iter);
+ assert(val_field);
+
+ // transfer ownership of the extracted value to the message
+ if (qd_iterator_equal(iter, (unsigned char*) QD_MA_TRACE)) {
+ *ma_trace = val_field;
+ } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_INGRESS)) {
+ *ma_ingress = val_field;
+ } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
+ *ma_to_override = val_field;
+ } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
+ *ma_phase = val_field;
+ } else {
+ // TODO: this key had the QD_MA_PREFIX but it does not match
+ // one of the actual fields.
+ }
+
+ qd_iterator_free(key_iter);
+ qd_parse_free(key_field);
+ qd_iterator_free(val_iter);
+ // val_field is handed over the message_private and is freed with the message
+
+ anno = DEQ_NEXT(anno_val);
+ }
+
+ anno = DEQ_HEAD(annos);
+ while (anno) {
+ DEQ_REMOVE_HEAD(annos);
+ free_qd_parsed_turbo_t(anno);
+ anno = DEQ_HEAD(annos);
+ }
+
+ // Adjust size of user annotation blob by the size of the router
+ // annotations
+ blob_pointer->remaining = user_bytes;
+ assert(blob_pointer->remaining >= 0);
+
+ *blob_item_count = user_entries;
+ assert(*blob_item_count >= 0);
+ return 0;
+}
+
+
+void qd_parse_annotations(
+ bool is_interrouter,
+ qd_iterator_t *ma_iter_in,
+ qd_parsed_field_t **ma_ingress,
+ qd_parsed_field_t **ma_phase,
+ qd_parsed_field_t **ma_to_override,
+ qd_parsed_field_t **ma_trace,
+ qd_iterator_pointer_t *blob_pointer,
+ uint32_t *blob_item_count)
+{
+ *ma_ingress = 0;
+ *ma_phase = 0;
+ *ma_to_override = 0;
+ *ma_trace = 0;
+ ZERO(blob_pointer);
+ *blob_item_count = 0;
+
+ if (!ma_iter_in)
+ return;
+
+ uint8_t tag = 0;
+ uint32_t size = 0;
+ uint32_t length_of_count = 0;
+ uint32_t length_of_size = 0;
+
+ const char *parse_error = get_type_info(ma_iter_in, &tag,
+ &size, blob_item_count, &length_of_size,
+ &length_of_count);
+ if (parse_error)
+ return;
+
+ if (!is_tag_a_map(tag)) {
+ return;
+ }
+
+ // Initial snapshot on size/content of annotation payload
+ qd_iterator_t *raw_iter = qd_iterator_sub(ma_iter_in, (size - length_of_count));
+
+ // If there are no router annotations then all annotations
+ // are the user's opaque blob.
+ qd_iterator_get_view_cursor(raw_iter, blob_pointer);
+
+ qd_iterator_free(raw_iter);
+
+ if (is_interrouter) {
+ (void) qd_parse_annotations_v1(ma_iter_in, ma_ingress, ma_phase,
+ ma_to_override, ma_trace,
+ blob_pointer, blob_item_count);
+ }
+
+ return;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org