You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2017/07/07 20:39:42 UTC

[05/14] qpid-dispatch git commit: DISPATCH-760: Add new annotation parser

DISPATCH-760: Add new annotation parser


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b67b2013
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b67b2013
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b67b2013

Branch: refs/heads/master
Commit: b67b2013fd6c666c80119704821b22c8ee431257
Parents: 7f650d8
Author: Chuck Rolke <cr...@redhat.com>
Authored: Fri Jun 30 16:57:17 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Jul 7 10:38:27 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/parse.h |  72 +++++++++++
 src/iterator.c                |  10 ++
 src/parse.c                   | 248 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 329 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b67b2013/include/qpid/dispatch/parse.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 454e99f..2273902 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -33,6 +33,28 @@
  */
 
 typedef struct qd_parsed_field_t qd_parsed_field_t;
+typedef struct qd_parsed_turbo_t qd_parsed_turbo_t;
+
+DEQ_DECLARE(qd_parsed_turbo_t, qd_parsed_turbo_list_t);
+
+/**@file
+ * Parse raw data fields into skeletal AMQP data trees.
+ *
+ * @defgroup parse parse
+ *
+ * Parse data from qd_iterator_t into a tree structure representing
+ * an AMQP data type tree.
+ *@{
+ */
+struct qd_parsed_turbo_t {
+    DEQ_LINKS(qd_parsed_turbo_t);
+    qd_iterator_pointer_t bufptr;  // location/size of field in buffer
+    uint8_t               tag;
+    uint32_t              size;
+    uint32_t              count;
+    uint32_t              length_of_size;
+    uint32_t              length_of_count;
+};
 
 /**
  * Parse a field delimited by a field iterator.
@@ -43,6 +65,31 @@ typedef struct qd_parsed_field_t qd_parsed_field_t;
 qd_parsed_field_t *qd_parse(qd_iterator_t *iter);
 
 /**
+ * Parse message annotations map from a raw iterator
+ * It's called 'turbo' because it is supposed to be fast.
+ * Distinguish between user annotations and router annotations
+ * Enumerate the user entries count and size.
+ * Return the router entries in a list.
+ *
+ * This function knows a priori:
+ *   * the iter is a message annotations map
+ *   * the map key prefix is QD_MA_PREFIX
+ *   * there are 4 router map annotations at most
+ *   * the router annotations are at the end of the map
+ *
+ * @param iter Field iterator for the message annotations map
+ * @param annos returned list of router annotations map entries
+ * @param user_entries number of map user items
+ * @param user_bytes number of map user item bytes
+ * @return 0 if success else pointer to error string
+ */
+const char * qd_parse_turbo(
+                       qd_iterator_t          *iter,
+                       qd_parsed_turbo_list_t *annos,
+                       uint32_t               *user_entries,
+                       uint32_t               *user_bytes);
+
+/**
  * Free the resources associated with a parsed field.
  *
  * @param field A field pointer returned by qd_parse.
@@ -229,6 +276,31 @@ int qd_parse_is_scalar(qd_parsed_field_t *field);
  */
 qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *key);
 
+/**
+ * Parse a message annotation map field.
+ * Return parsed fields for the four router entries or null if any is absent
+ * and a blob pointer and count for the user entries in the map which are passed
+ * through as part of the message payload.
+ *
+ * @param is_interrouter true if connection is to a router
+ * @param ma_iter_in Field iterator for the annotation map field being parsed.
+ * @param ma_ingress returned parsed field: ingress
+ * @param ma_phase returned parsed field: phase
+ * @param ma_to_override returned parsed field: override
+ * @param ma_trace returned parsed field: trace
+ * @param blob_pointer returned buffer pointer to user's annotation blob
+ * @param blob_item_count number of map entries referenced by blob_iterator
+ */
+void qd_parse_annotations(
+    bool                   is_interrouter,
+    qd_iterator_t         *ma_iter_in,
+    qd_parsed_field_t    **ma_ingress,
+    qd_parsed_field_t    **ma_phase,
+    qd_parsed_field_t    **ma_to_override,
+    qd_parsed_field_t    **ma_trace,
+    qd_iterator_pointer_t *blob_pointer,
+    uint32_t              *blob_item_count);
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b67b2013/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index c456d16..f25d163 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -899,3 +899,13 @@ bool qd_iterator_next_segment(qd_iterator_t *iter, uint32_t *hash)
 
     return true;
 }
+
+
+void qd_iterator_get_view_cursor(
+    const qd_iterator_t   *iter,
+    qd_iterator_pointer_t *ptr)
+{
+    ptr->buffer    = iter->view_pointer.buffer;
+    ptr->cursor    = iter->view_pointer.cursor;
+    ptr->remaining = iter->view_pointer.remaining;
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b67b2013/src/parse.c
----------------------------------------------------------------------
diff --git a/src/parse.c b/src/parse.c
index 48c6a90..2f04564 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -37,6 +37,9 @@ struct qd_parsed_field_t {
 ALLOC_DECLARE(qd_parsed_field_t);
 ALLOC_DEFINE(qd_parsed_field_t);
 
+ALLOC_DECLARE(qd_parsed_turbo_t);
+ALLOC_DEFINE(qd_parsed_turbo_t);
+
 /**
  * size = the number of bytes following the tag
  * count = the number of elements. Applies only to compound structures
@@ -169,6 +172,101 @@ qd_parsed_field_t *qd_parse(qd_iterator_t *iter)
 }
 
 
+const char *qd_parse_turbo(qd_iterator_t          *iter,
+                           qd_parsed_turbo_list_t *annos,
+                           uint32_t               *user_entries,
+                           uint32_t               *user_bytes)
+{
+    if (!iter || !annos || !user_entries || !user_bytes)
+        return  "missing argument";
+
+    DEQ_INIT(*annos);
+    *user_entries = 0;
+    *user_bytes = 0;
+
+    // The iter is addressing the message-annotations map.
+    // Open the field describing the map's items
+    uint8_t  tag             = 0;
+    uint32_t size            = 0;
+    uint32_t count           = 0;
+    uint32_t length_of_count = 0;
+    uint32_t length_of_size  = 0;
+    const char * parse_error = get_type_info(iter, &tag, &size, &count, &length_of_size, &length_of_count);
+
+    if (parse_error)
+        return parse_error;
+
+    if (count == 0)
+        return 0;
+
+    // with four router annotations there will be 8 annos (4 key,val pairs) returned at most
+#define MAX_ALLOCS 8
+    int n_allocs = 0;
+
+    // Do skeletal parse of each map element
+    for (uint32_t idx = 0; idx < count; idx++) {
+        qd_parsed_turbo_t *turbo;
+        if (n_allocs < MAX_ALLOCS) {
+            turbo = new_qd_parsed_turbo_t();
+            n_allocs++;
+
+        } else {
+            // Retire an existing element.
+            // If there are this many in the list then this one cannot be a
+            // router annotation and must be a user annotation.
+            turbo = DEQ_HEAD(*annos);
+            assert(turbo);
+            *user_entries += 1;
+            *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
+            DEQ_REMOVE_HEAD(*annos);
+        }
+        if (!turbo)
+            return "failed to allocate qd_parsed_turbo_t";
+        ZERO(turbo);
+
+        // Get the buffer pointers for the map element
+        qd_iterator_get_view_cursor(iter, &turbo->bufptr);
+
+        // Get description of the map element
+        parse_error = get_type_info(iter, &turbo->tag, &turbo->size, &turbo->count,
+                                    &turbo->length_of_size, &turbo->length_of_count);
+        if (parse_error) {
+            return parse_error;
+        }
+
+        // Save parsed element
+        DEQ_INSERT_TAIL(*annos, turbo);
+
+        // Advance map iterator to next map element
+        qd_iterator_advance(iter, turbo->size - turbo->length_of_count);
+    }
+
+    // remove leading annos in the queue if their prefix is not a match and
+    // return them as part of the user annotations
+    for (int idx=0; idx < n_allocs; idx += 2) {
+        qd_parsed_turbo_t *turbo = DEQ_HEAD(*annos);
+        assert(turbo);
+        if (qd_iterator_prefix_ptr(&turbo->bufptr, turbo->length_of_size + 1, QD_MA_PREFIX))
+            break;
+
+        // leading anno is a user annotation map key
+        // remove the key and value from the list and accumulate them as user items
+        *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
+        DEQ_REMOVE_HEAD(*annos);
+        free_qd_parsed_turbo_t(turbo);
+
+        turbo = DEQ_HEAD(*annos);
+        assert(turbo);
+        *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
+        DEQ_REMOVE_HEAD(*annos);
+        free_qd_parsed_turbo_t(turbo);
+
+        *user_entries += 2;
+    }
+    return parse_error;
+}
+
+
 void qd_parse_free(qd_parsed_field_t *field)
 {
     if (!field)
@@ -443,9 +541,15 @@ qd_parsed_field_t *qd_parse_sub_value(qd_parsed_field_t *field, uint32_t idx)
 }
 
 
+int is_tag_a_map(uint8_t tag)
+{
+    return tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32;
+}
+
+
 int qd_parse_is_map(qd_parsed_field_t *field)
 {
-    return field->tag == QD_AMQP_MAP8 || field->tag == QD_AMQP_MAP32;
+    return is_tag_a_map(field->tag);
 }
 
 
@@ -481,3 +585,145 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
 
     return 0;
 }
+
+
+const char *qd_parse_annotations_v1(
+    qd_iterator_t         *ma_iter_in,
+    qd_parsed_field_t    **ma_ingress,
+    qd_parsed_field_t    **ma_phase,
+    qd_parsed_field_t    **ma_to_override,
+    qd_parsed_field_t    **ma_trace,
+    qd_iterator_pointer_t *blob_pointer,
+    uint32_t              *blob_item_count)
+{
+    // Do full parse
+    qd_iterator_reset(ma_iter_in);
+
+    qd_parsed_turbo_list_t annos;
+    uint32_t               user_entries;
+    uint32_t               user_bytes;
+    const char * parse_error = qd_parse_turbo(ma_iter_in, &annos, &user_entries, &user_bytes);
+    if (parse_error) {
+        return parse_error;
+    }
+
+    qd_parsed_turbo_t *anno = DEQ_HEAD(annos);
+    while (anno) {
+        qd_iterator_t *key_iter =
+            qd_iterator_buffer(anno->bufptr.buffer,
+                               anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
+                               anno->size,
+                               ITER_VIEW_ALL);
+        assert(key_iter);
+
+        qd_parsed_field_t *key_field = qd_parse(key_iter);
+        assert(key_field);
+
+        qd_iterator_t *iter = qd_parse_raw(key_field);
+        assert(iter);
+
+        qd_parsed_turbo_t *anno_val = DEQ_NEXT(anno);
+        assert(anno_val);
+
+        qd_iterator_t *val_iter =
+            qd_iterator_buffer(anno_val->bufptr.buffer,
+                               anno_val->bufptr.cursor - qd_buffer_base(anno_val->bufptr.buffer),
+                               anno_val->size,
+                               ITER_VIEW_ALL);
+        assert(val_iter);
+
+        qd_parsed_field_t *val_field = qd_parse(val_iter);
+        assert(val_field);
+
+        // transfer ownership of the extracted value to the message
+        if        (qd_iterator_equal(iter, (unsigned char*) QD_MA_TRACE)) {
+            *ma_trace = val_field;
+        } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_INGRESS)) {
+            *ma_ingress = val_field;
+        } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
+            *ma_to_override = val_field;
+        } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
+            *ma_phase = val_field;
+        } else {
+            // TODO: this key had the QD_MA_PREFIX but it does not match
+            //       one of the actual fields.
+        }
+
+        qd_iterator_free(key_iter);
+        qd_parse_free(key_field);
+        qd_iterator_free(val_iter);
+        // val_field is handed over the message_private and is freed with the message
+
+        anno = DEQ_NEXT(anno_val);
+    }
+
+    anno = DEQ_HEAD(annos);
+    while (anno) {
+        DEQ_REMOVE_HEAD(annos);
+        free_qd_parsed_turbo_t(anno);
+        anno = DEQ_HEAD(annos);
+    }
+
+    // Adjust size of user annotation blob by the size of the router
+    // annotations
+    blob_pointer->remaining = user_bytes;
+    assert(blob_pointer->remaining >= 0);
+
+    *blob_item_count = user_entries;
+    assert(*blob_item_count >= 0);
+    return 0;
+}
+
+
+void qd_parse_annotations(
+    bool                   is_interrouter,
+    qd_iterator_t         *ma_iter_in,
+    qd_parsed_field_t    **ma_ingress,
+    qd_parsed_field_t    **ma_phase,
+    qd_parsed_field_t    **ma_to_override,
+    qd_parsed_field_t    **ma_trace,
+    qd_iterator_pointer_t *blob_pointer,
+    uint32_t              *blob_item_count)
+{
+    *ma_ingress             = 0;
+    *ma_phase               = 0;
+    *ma_to_override         = 0;
+    *ma_trace               = 0;
+    ZERO(blob_pointer);
+    *blob_item_count        = 0;
+
+    if (!ma_iter_in)
+        return;
+
+    uint8_t  tag             = 0;
+    uint32_t size            = 0;
+    uint32_t length_of_count = 0;
+    uint32_t length_of_size  = 0;
+
+    const char *parse_error = get_type_info(ma_iter_in, &tag,
+                                            &size, blob_item_count, &length_of_size,
+                                            &length_of_count);
+    if (parse_error)
+        return;
+
+    if (!is_tag_a_map(tag)) {
+        return;
+    }
+
+    // Initial snapshot on size/content of annotation payload
+    qd_iterator_t *raw_iter = qd_iterator_sub(ma_iter_in, (size - length_of_count));
+
+    // If there are no router annotations then all annotations
+    // are the user's opaque blob.
+    qd_iterator_get_view_cursor(raw_iter, blob_pointer);
+
+    qd_iterator_free(raw_iter);
+
+    if (is_interrouter) {
+        (void) qd_parse_annotations_v1(ma_iter_in, ma_ingress, ma_phase,
+                                       ma_to_override, ma_trace,
+                                       blob_pointer, blob_item_count);
+    }
+
+    return;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org