You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2022/02/17 21:03:36 UTC

[qpid-dispatch] branch main updated (25261f4 -> 32ddaa1)

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git.


    from 25261f4  DISPATCH-2234: Update JavaScript console packages for the 1.19.0 release (#1517)
     new 8722238  DISPATCH-1403: define a common buffer field API
     new 32ddaa1  DISPATCH-1487: Message annotations re-write

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 include/qpid/dispatch/amqp.h                       |  48 +-
 .../qpid/dispatch/buffer_field.h                   |  29 +-
 include/qpid/dispatch/compose.h                    |  89 +++
 include/qpid/dispatch/iterator.h                   |  32 +-
 include/qpid/dispatch/message.h                    | 146 ++--
 include/qpid/dispatch/parse.h                      | 107 ++-
 include/qpid/dispatch/protocol_adaptor.h           |  11 -
 include/qpid/dispatch/router.h                     |   3 +-
 src/amqp.c                                         |  22 +-
 src/buffer_field_api.h                             | 297 +++++++
 src/iterator.c                                     | 221 +-----
 src/message.c                                      | 688 ++++++++--------
 src/message_private.h                              |  37 +-
 src/parse.c                                        | 874 ++++++++++-----------
 src/python_embedded.c                              |  15 +-
 src/router_config.c                                |   1 +
 src/router_core/core_client_api.c                  |  15 +-
 src/router_core/exchange_bindings.c                |  16 +-
 src/router_core/management_agent.c                 |  25 +-
 .../address_lookup_server/address_lookup_server.c  |   7 +-
 .../edge_addr_tracking/edge_addr_tracking.c        |  10 +-
 .../modules/heartbeat_edge/heartbeat_edge.c        |   5 +-
 src/router_core/modules/mobile_sync/mobile.c       |  34 +-
 .../modules/test_hooks/core_test_hooks.c           |   4 +-
 src/router_core/transfer.c                         |  26 +-
 src/router_node.c                                  | 200 ++---
 tests/buffer_test.c                                | 244 ++++++
 tests/lsan.supp                                    |   4 +
 tests/message_test.c                               | 505 +++++++-----
 tests/parse_test.c                                 | 165 +++-
 tests/run_unit_tests_size.c                        | 116 ++-
 tests/system_tests_one_router.py                   | 197 +++--
 tests/system_tests_two_routers.py                  |  10 +-
 33 files changed, 2485 insertions(+), 1718 deletions(-)
 copy src/connection_manager_private.h => include/qpid/dispatch/buffer_field.h (61%)
 create mode 100644 src/buffer_field_api.h

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 01/02: DISPATCH-1403: define a common buffer field API

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 87222386c1bb6f57eb7965c8a49f5a41002af8c4
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Nov 23 13:17:58 2021 -0500

    DISPATCH-1403: define a common buffer field API
    
    Export the buffer data handling routines in the interator library as a
    general API. Inlined for performance.
---
 include/qpid/dispatch/buffer_field.h |  43 +++++
 include/qpid/dispatch/iterator.h     |  32 ++--
 include/qpid/dispatch/parse.h        |  32 ++--
 src/buffer_field_api.h               | 299 +++++++++++++++++++++++++++++++++++
 src/iterator.c                       | 221 +++++---------------------
 src/message.c                        |   4 +-
 src/message_private.h                |   4 +-
 src/parse.c                          | 112 ++++---------
 tests/buffer_test.c                  | 244 ++++++++++++++++++++++++++++
 9 files changed, 683 insertions(+), 308 deletions(-)

diff --git a/include/qpid/dispatch/buffer_field.h b/include/qpid/dispatch/buffer_field.h
new file mode 100644
index 0000000..b43f26d
--- /dev/null
+++ b/include/qpid/dispatch/buffer_field.h
@@ -0,0 +1,43 @@
+#ifndef __dispatch_buffer_field_h__
+#define __dispatch_buffer_field_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/** @file
+ * Data fields spanning multiple buffers
+ * @internal
+ * @defgroup buffer_field buffer_field
+ * @{
+ */
+
+#include "qpid/dispatch/buffer.h"
+
+
+/* descriptor for a sequence of bytes in a buffer list
+ */
+typedef struct qd_buffer_field_t qd_buffer_field_t;
+struct qd_buffer_field_t {
+    qd_buffer_t   *buffer;     // hold start of data
+    const uint8_t *cursor;     // first octet of data
+    size_t         remaining;  // length of data
+};
+
+///@}
+
+#endif
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index b730872..157ca5f 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -19,7 +19,7 @@
  * under the License.
  */
 
-#include "qpid/dispatch/buffer.h"
+#include "qpid/dispatch/buffer_field.h"
 
 #include <stdbool.h>
 #include <stdint.h>
@@ -127,12 +127,6 @@ typedef enum {
 } qd_iterator_view_t;
 
 
-typedef struct {
-    qd_buffer_t   *buffer;
-    unsigned char *cursor;
-    int            remaining;
-} qd_iterator_pointer_t;
-
 /** @} */
 /** \name global
  * Global Methods
@@ -310,18 +304,6 @@ bool qd_iterator_equal(qd_iterator_t *iter, const unsigned char *string);
 bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix);
 
 /**
- * Return true iff the prefix string matches the characters addressed by ptr.
- * This function ignores octets beyond the length of the prefix.
- * Caller's pointer is held constant.
- *
- * @param ptr buffer chain cursor holding message bytes
- * @param skip AMQP housekeeping bytes to skip over before finding the incoming string
- * @param prefix the prefix to be matched
- * @return true if all bytes of prefix match bytes in user string
- */
-bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, const char *prefix);
-
-/**
  * Copy the iterator's view into buffer up to a maximum of n bytes.  View is
  * reset to the beginning and cursor is advanced by the number of bytes
  * copied. There is no trailing '\0' added.
@@ -458,11 +440,15 @@ bool qd_iterator_next_segment(qd_iterator_t *iter, uint32_t *hash);
  * Exposes iter's buffer, cursor, and remaining values.
  *
  * @param iter iter that still has data in its view.
- * @param ptr Pointer object which is to receive cursor position
+ * @return a copy of the iter's view cursor
+ */
+qd_buffer_field_t qd_iterator_get_view_cursor(const qd_iterator_t *iter);
+
+/**
+ * Construct an iterator from a buffer field
  */
-void qd_iterator_get_view_cursor(
-    const qd_iterator_t   *iter,
-    qd_iterator_pointer_t *ptr);
+qd_iterator_t *qd_iterator_buffer_field(const qd_buffer_field_t *bfield,
+                                        qd_iterator_view_t view);
 
 /** @} */
 /** @} */
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index d47a312..5fe0552 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -19,7 +19,7 @@
  * under the License.
  */
 
-#include "qpid/dispatch/buffer.h"
+#include "qpid/dispatch/buffer_field.h"
 #include "qpid/dispatch/iterator.h"
 
 /**@file
@@ -48,12 +48,12 @@ DEQ_DECLARE(qd_parsed_turbo_t, qd_parsed_turbo_list_t);
  */
 struct qd_parsed_turbo_t {
     DEQ_LINKS(qd_parsed_turbo_t);
-    qd_iterator_pointer_t bufptr;  // location/size of field in buffer
-    uint8_t               tag;
-    uint32_t              size;
-    uint32_t              count;
-    uint32_t              length_of_size;
-    uint32_t              length_of_count;
+    qd_buffer_field_t bufptr;  // location/size of field in buffer
+    uint8_t           tag;
+    uint32_t          size;
+    uint32_t          count;
+    uint32_t          length_of_size;
+    uint32_t          length_of_count;
 };
 
 /**
@@ -295,15 +295,15 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
  * @param blob_item_count number of map entries referenced by blob_iterator
  */
 void qd_parse_annotations(
-    bool                   strip_annotations_in,
-    qd_iterator_t         *ma_iter_in,
-    qd_parsed_field_t    **ma_ingress,
-    qd_parsed_field_t    **ma_phase,
-    qd_parsed_field_t    **ma_to_override,
-    qd_parsed_field_t    **ma_trace,
-    qd_parsed_field_t    **ma_stream,
-    qd_iterator_pointer_t *blob_pointer,
-    uint32_t              *blob_item_count);
+    bool                strip_annotations_in,
+    qd_iterator_t      *ma_iter_in,
+    qd_parsed_field_t **ma_ingress,
+    qd_parsed_field_t **ma_phase,
+    qd_parsed_field_t **ma_to_override,
+    qd_parsed_field_t **ma_trace,
+    qd_parsed_field_t **ma_stream,
+    qd_buffer_field_t  *blob_pointer,
+    uint32_t           *blob_item_count);
 
 /**
  * Identify which annotation is being parsed
diff --git a/src/buffer_field_api.h b/src/buffer_field_api.h
new file mode 100644
index 0000000..a2ac8cb
--- /dev/null
+++ b/src/buffer_field_api.h
@@ -0,0 +1,299 @@
+#ifndef __dispatch_buffer_field_api_h__
+#define __dispatch_buffer_field_api_h__ 1
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/** @file
+ * Inline API for common operations on buffer_fields
+ * @internal
+ * @{
+ */
+
+#include "qpid/dispatch/buffer_field.h"
+#include <stdbool.h>
+
+/* qd_buffer_field_normalize
+ *
+ * Invariant: a non-empty buffer fields cursor always points to a valid octet,
+ * never at the end of a non-terminal buffer. Normalizing ensures that
+ * invariant holds.
+ */
+static inline void qd_buffer_field_normalize(qd_buffer_field_t *bfield)
+{
+    assert(bfield);
+    if (bfield->remaining) {
+        while (bfield->cursor == qd_buffer_cursor(bfield->buffer)) {
+            bfield->buffer = DEQ_NEXT(bfield->buffer);
+            assert(bfield->buffer);  // error: remaining value incorrect
+            bfield->cursor = qd_buffer_base(bfield->buffer);
+        }
+    }
+}
+
+
+/* qd_buffer_field
+ *
+ * Constructor - ensures buffer field is well formed.
+ */
+static inline qd_buffer_field_t qd_buffer_field(qd_buffer_t *buffer, const uint8_t *cursor, size_t remaining)
+{
+    assert(buffer);
+    qd_buffer_field_t bf = {.buffer = buffer, .cursor = cursor, .remaining = remaining};
+    qd_buffer_field_normalize(&bf);
+    return bf;
+}
+
+
+/* qd_buffer_field_extend
+ *
+ * Increase the size of the field by amount octets
+ */
+static inline size_t qd_buffer_field_extend(qd_buffer_field_t *bfield, size_t amount)
+{
+    assert(bfield);
+    size_t old = bfield->remaining;
+    bfield->remaining += amount;
+    if (old == 0)  // move cursor to start of new data
+        qd_buffer_field_normalize(bfield);
+    return bfield->remaining;
+}
+
+
+/* qd_buffer_field_ncopy
+ *
+ * Copy up to n octets from bfield to dest, advance bfield by the number of
+ * octets copied
+ *
+ * @return total of octets copied - may be < n if len(bfield) < n
+ */
+static inline size_t qd_buffer_field_ncopy(qd_buffer_field_t *bfield, uint8_t *dest, size_t n)
+{
+    assert(bfield);
+
+    const uint8_t * const start = dest;
+    size_t count = MIN(n, bfield->remaining);
+
+    while (count) {
+        size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor;
+        if (count < avail) {
+            // fastpath: no need to adjust buffer pointers
+            memcpy(dest, bfield->cursor, count);
+            dest += count;
+            bfield->cursor += count;
+            bfield->remaining -= count;
+            return dest - start;
+        }
+
+        memcpy(dest, bfield->cursor, avail);
+        dest += avail;
+        count -= avail;
+
+        // count is >= what is available in the current buffer, move to next
+
+        bfield->remaining -= avail;
+        bfield->cursor += avail;
+        qd_buffer_field_normalize(bfield);
+    }
+
+    return dest - start;
+}
+
+
+/* qd_buffer_field_advance
+ *
+ * Move the cursor of bfield forward by amount octets.
+ *
+ * @return total of octets skipped - may be < amount if len(bfield) < amount
+ */
+static inline size_t qd_buffer_field_advance(qd_buffer_field_t *bfield, size_t amount)
+{
+    assert(bfield);
+
+    const size_t blen = bfield->remaining;
+    size_t count = MIN(amount, blen);
+
+    while (count > 0) {
+        size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor;
+
+        if (count < avail) {
+            // fastpath: no need to adjust buffer pointers
+            bfield->cursor += count;
+            bfield->remaining -= count;
+            break;
+        }
+
+        // count is >= what is available in the current buffer, move to next
+        count -= avail;
+        bfield->remaining -= avail;
+        bfield->cursor += avail;
+        qd_buffer_field_normalize(bfield);
+    }
+
+    return blen - bfield->remaining;
+}
+
+
+/* qd_buffer_field_octet
+ *
+ * Get the first octet of the field and move the cursor to the next octet (if
+ * present).  bfield length is decremented by 1
+ *
+ * @return true of octet read, false if no octet available (end of field).
+ */
+static inline bool qd_buffer_field_octet(qd_buffer_field_t *bfield, uint8_t *octet)
+{
+    assert(bfield);
+
+    if (bfield->remaining) {
+        assert(bfield->cursor < qd_buffer_cursor(bfield->buffer));
+        *octet = *bfield->cursor++;
+        bfield->remaining -= 1;
+        qd_buffer_field_normalize(bfield);
+        return true;
+    }
+    return false;
+}
+
+
+/* qd_buffer_field_uint32
+ *
+ * Get the next 4 octets of the field and convert them to a uint32 value.  Move
+ * the cursor past the 4 octets and decrement the length by 4. uint32 values
+ * are used extensively in the AMQP type encodings for meta data (size and
+ * count).
+ *
+ * @return true of uint32 read, false if not enough octets available (end of field).
+ */
+static inline bool qd_buffer_field_uint32(qd_buffer_field_t *bfield, uint32_t *value)
+{
+    assert(bfield);
+
+    if (bfield->remaining >= 4) {
+        uint8_t buf[4];
+        qd_buffer_field_ncopy(bfield, buf, 4);
+        *value = (((uint32_t) buf[0]) << 24)
+            | (((uint32_t) buf[1]) << 16)
+            | (((uint32_t) buf[2]) << 8)
+            | ((uint32_t) buf[3]);
+        return true;
+    }
+    return false;
+}
+
+
+/* qd_buffer_field_strdup
+ *
+ * Return a null terminated string containing the bfield data.  Caller assumes
+ * responsibility for calling free() on the returned value when finished with
+ * it.  Caller also should ensure the data is actually a value that can be
+ * rendered as a C string (e.g. no internal zero values).
+ *
+ * @return null terminated C string, must be free()ed by caller.
+ */
+static inline char *qd_buffer_field_strdup(qd_buffer_field_t *bfield)
+{
+    assert(bfield);
+
+    const size_t len = bfield->remaining + 1;
+    char *str = qd_malloc(len);
+    qd_buffer_field_ncopy(bfield, (uint8_t*) str, bfield->remaining);
+    str[len - 1] = 0;
+    return str;
+}
+
+
+/* qd_buffer_field_equal
+ *
+ * Check if the field is exactly equal to count octets of data. If equal
+ * advance the bfield count octets.
+ *
+ * @return true if equal
+ */
+static inline bool qd_buffer_field_equal(qd_buffer_field_t *bfield, const uint8_t *data, size_t count)
+{
+    assert(bfield);
+
+    if (bfield->remaining < count)
+        return false;
+
+    const qd_buffer_field_t save = *bfield;
+
+    while (count) {
+
+        size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor;
+
+        if (count < avail) {
+            // fastpath: no need to adjust buffer pointers
+            if (memcmp(data, bfield->cursor, count) != 0) {
+                *bfield = save;
+                return false;
+            }
+            bfield->cursor += count;
+            bfield->remaining -= count;
+            return true;
+        }
+
+        if (memcmp(data, bfield->cursor, avail) != 0) {
+            *bfield = save;
+            return false;
+        }
+
+        data += avail;
+        count -= avail;
+        bfield->remaining -= avail;
+        bfield->cursor += avail;
+        qd_buffer_field_normalize(bfield);
+    }
+
+    return true;
+}
+
+
+/* qd_buffer_list_append_field
+ *
+ * Copy the contents of bfield to the end of the buflist buffer chain. This
+ * copies all data - no bfield buffers are moved to buflist. bfield is advanced
+ * to the end of data.
+ *
+ * @return void
+ */
+static inline void qd_buffer_list_append_field(qd_buffer_list_t *buflist, qd_buffer_field_t *bfield)
+{
+    assert(buflist);
+    assert(bfield);
+
+    while (bfield->remaining) {
+        size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor;
+        size_t len = MIN(bfield->remaining, avail);
+
+        qd_buffer_list_append(buflist, bfield->cursor, len);
+        bfield->remaining -= len;
+        if (!bfield->remaining) {
+            bfield->cursor += len;
+        } else {
+            bfield->buffer = DEQ_NEXT(bfield->buffer);
+            assert(bfield->buffer);
+            bfield->cursor = qd_buffer_base(bfield->buffer);
+        }
+    }
+}
+
+///@}
+
+#endif
diff --git a/src/iterator.c b/src/iterator.c
index 3e46b37..3b34891 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -23,6 +23,7 @@
 #include "qpid/dispatch/amqp.h"
 #include "qpid/dispatch/ctools.h"
 #include "qpid/dispatch/hash.h"
+#include "buffer_field_api.h"
 
 #include <stdio.h>
 #include <string.h>
@@ -51,9 +52,9 @@ ALLOC_DECLARE(qd_hash_segment_t);
 ALLOC_DEFINE(qd_hash_segment_t);
 
 struct qd_iterator_t {
-    qd_iterator_pointer_t   start_pointer;      // Pointer to the raw data
-    qd_iterator_pointer_t   view_start_pointer; // Pointer to the start of the view
-    qd_iterator_pointer_t   view_pointer;       // Pointer to the remaining view
+    qd_buffer_field_t       start_pointer;      // Pointer to the raw data
+    qd_buffer_field_t       view_start_pointer; // Pointer to the start of the view
+    qd_buffer_field_t       view_pointer;       // Pointer to the remaining view
     qd_iterator_view_t      view;
     int                     annotation_length;
     int                     annotation_remaining;
@@ -138,7 +139,7 @@ static void parse_address_view(qd_iterator_t *iter)
     // in order to aid the router in looking up addresses.
     //
 
-    qd_iterator_pointer_t save_pointer = iter->view_pointer;
+    qd_buffer_field_t save_pointer = iter->view_pointer;
     iter->annotation_length = 1;
 
     if (iter->prefix_override == '\0' && qd_iterator_prefix(iter, "_")) {
@@ -257,7 +258,7 @@ static void parse_node_view(qd_iterator_t *iter)
 void qd_iterator_remove_trailing_separator(qd_iterator_t *iter)
 {
     // Save the iterator's pointer so we can apply it back before returning from this function.
-    qd_iterator_pointer_t save_pointer = iter->view_pointer;
+    qd_buffer_field_t save_pointer = iter->view_pointer;
 
     char current_octet = 0;
     while (!iterator_at_end(iter)) {
@@ -301,7 +302,7 @@ static void view_initialize(qd_iterator_t *iter)
     //
     state_t               state = STATE_START;
     unsigned int          octet;
-    qd_iterator_pointer_t save_pointer = {0,0,0};
+    qd_buffer_field_t save_pointer = {0,0,0};
 
     while (!iterator_at_end(iter) && state != STATE_AT_NODE_ID) {
         octet = qd_iterator_octet(iter);
@@ -387,38 +388,11 @@ static inline int iterator_field_ncopy(qd_iterator_t *iter, unsigned char *buffe
 {
     assert(in_field_data(iter));
 
-    const unsigned char *start = buffer;
-    int count = MIN(n, iter->view_pointer.remaining);
     if (iter->view_pointer.buffer) {
-        do {
-            size_t avail = qd_buffer_cursor(iter->view_pointer.buffer) - iter->view_pointer.cursor;
-            // optimize: early exit when no need to advance buffer pointers
-            if (count < avail) {
-                memcpy(buffer, iter->view_pointer.cursor, count);
-                iter->view_pointer.cursor += count;
-                iter->view_pointer.remaining -= count;
-                return buffer - start + count;
-            }
-            // count is >= what is available in the current buffer, move to next
-            memcpy(buffer, iter->view_pointer.cursor, avail);
-            buffer += avail;
-            count -= avail;
-            iter->view_pointer.cursor += avail;
-            iter->view_pointer.remaining -= avail;
-            if (iter->view_pointer.remaining) {
-                iter->view_pointer.buffer = DEQ_NEXT(iter->view_pointer.buffer);
-                if (iter->view_pointer.buffer) {
-                    iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer);
-                } else {
-                    // DISPATCH-1394: field is truncated (remaining is inaccurate!)
-                    iter->view_pointer.remaining = 0;
-                    break;
-                }
-            }
-        } while (count);
-        return buffer - start;
+        return qd_buffer_field_ncopy(&iter->view_pointer, (uint8_t*)buffer, n);
 
     } else {  // string or binary array
+        int count = MIN(n, iter->view_pointer.remaining);
         memcpy(buffer, iter->view_pointer.cursor, count);
         iter->view_pointer.cursor += count;
         iter->view_pointer.remaining -= count;
@@ -436,33 +410,11 @@ static inline void iterator_field_move_cursor(qd_iterator_t *iter, uint32_t leng
     // prefix
     assert(in_field_data(iter));
 
-    uint32_t count = MIN(length, iter->view_pointer.remaining);
     if (iter->view_pointer.buffer) {
-        do {
-            uint32_t avail = qd_buffer_cursor(iter->view_pointer.buffer) - iter->view_pointer.cursor;
-            // optimized: early exit when no need to update iterators buffer pointers
-            if (count < avail) {
-                iter->view_pointer.cursor += count;
-                iter->view_pointer.remaining -= count;
-                return;
-            }
-            // count is >= what is available in the current buffer, move to next
-            count -= avail;
-            iter->view_pointer.cursor += avail;
-            iter->view_pointer.remaining -= avail;
-            if (iter->view_pointer.remaining) {
-                iter->view_pointer.buffer = DEQ_NEXT(iter->view_pointer.buffer);
-                if (iter->view_pointer.buffer) {
-                    iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer);
-                } else {
-                    // DISPATCH-1394: field is truncated (remaining is inaccurate!)
-                    iter->view_pointer.remaining = 0;
-                    return;
-                }
-            }
-        } while (count);
+        qd_buffer_field_advance(&iter->view_pointer, length);
 
     } else {    // string/binary data
+        uint32_t count = MIN(length, iter->view_pointer.remaining);
         iter->view_pointer.cursor    += count;
         iter->view_pointer.remaining -= count;
     }
@@ -484,42 +436,7 @@ static inline bool iterator_field_equal(qd_iterator_t *iter, const unsigned char
         return false;
 
     if (iter->view_pointer.buffer) {
-
-        qd_iterator_pointer_t save_pointer = iter->view_pointer;
-
-        do {
-            size_t avail = qd_buffer_cursor(iter->view_pointer.buffer) - iter->view_pointer.cursor;
-            // optimized: early exit when no need to update iterators buffer pointers
-            if (count < avail) {
-                if (memcmp(buffer, iter->view_pointer.cursor, count) != 0) {
-                    iter->view_pointer = save_pointer;
-                    return false;
-                }
-                iter->view_pointer.cursor    += count;
-                iter->view_pointer.remaining -= count;
-                return true;
-            }
-            // count is >= what is available in the current buffer
-            if (memcmp(buffer, iter->view_pointer.cursor, avail) != 0) {
-                iter->view_pointer = save_pointer;
-                return false;
-            }
-
-            buffer += avail;
-            count -= avail;
-            iter->view_pointer.cursor += avail;
-            iter->view_pointer.remaining -= avail;
-            if (iter->view_pointer.remaining) {
-                iter->view_pointer.buffer = DEQ_NEXT(iter->view_pointer.buffer);
-                if (iter->view_pointer.buffer) {
-                    iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer);
-                } else {
-                    // DISPATCH-1394: field is truncated (remaining is inaccurate!)
-                    iter->view_pointer = save_pointer;
-                    return false;
-                }
-            }
-        } while (count);
+        return qd_buffer_field_equal(&iter->view_pointer, (const uint8_t*) buffer, count);
 
     } else {  // string or binary array
 
@@ -624,10 +541,8 @@ qd_iterator_t *qd_iterator_buffer(qd_buffer_t *buffer, int offset, int length, q
         return 0;
 
     ZERO(iter);
-    iter->start_pointer.buffer    = buffer;
-    iter->start_pointer.cursor    = qd_buffer_base(buffer) + offset;
-    iter->start_pointer.remaining = length;
-    iter->phase                   = '0';
+    iter->start_pointer = qd_buffer_field(buffer, qd_buffer_base(buffer) + offset, length);
+    iter->phase         = '0';
 
     qd_iterator_reset_view(iter, view);
 
@@ -739,18 +654,14 @@ unsigned char qd_iterator_octet(qd_iterator_t *iter)
         if (iter->view_pointer.remaining == 0)
             return (unsigned char) 0;
 
-        unsigned char result = *(iter->view_pointer.cursor);
-
-        // we know remaining > 0, so we can simply move the cursor
-
-        iter->view_pointer.cursor++;
-
-        // the slow path: if we've moved "off" the end, simply advance to the next buffer
-        if (--iter->view_pointer.remaining
-            && iter->view_pointer.buffer
-            && qd_buffer_cursor(iter->view_pointer.buffer) == iter->view_pointer.cursor) {
-            iter->view_pointer.buffer = iter->view_pointer.buffer->next;
-            iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer);
+        unsigned char result;
+        if (iter->view_pointer.buffer) {
+            uint8_t octet;
+            (void)qd_buffer_field_octet(&iter->view_pointer, &octet);
+            result = (unsigned char) octet;
+        } else { // string or binary array
+            result = *(iter->view_pointer.cursor)++;
+            --iter->view_pointer.remaining;
         }
 
         if (iter->mode == MODE_TO_SLASH && iter->view_pointer.remaining && *(iter->view_pointer.cursor) == '/') {
@@ -878,8 +789,8 @@ bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix)
     if (!iter)
         return false;
 
-    qd_iterator_pointer_t save_pointer = iter->view_pointer;
-    unsigned char *c                   = (unsigned char*) prefix;
+    qd_buffer_field_t save_pointer = iter->view_pointer;
+    unsigned char *c               = (unsigned char*) prefix;
 
     while(*c) {
         if (*c != qd_iterator_octet(iter))
@@ -896,69 +807,6 @@ bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix)
 }
 
 
-// bare bones copy of field_iterator_move_cursor with no field/view baggage
-void iterator_pointer_move_cursor(qd_iterator_pointer_t *ptr, uint32_t length)
-{
-    uint32_t count = length > ptr->remaining ? ptr->remaining : length;
-
-    while (count) {
-        uint32_t remaining = qd_buffer_cursor(ptr->buffer) - ptr->cursor;
-        remaining = remaining > count ? count : remaining;
-        ptr->cursor += remaining;
-        ptr->remaining -= remaining;
-        count -= remaining;
-        if (ptr->cursor == qd_buffer_cursor(ptr->buffer)) {
-            ptr->buffer = ptr->buffer->next;
-            if (ptr->buffer == 0) {
-                ptr->remaining = 0;
-                ptr->cursor = 0;
-                break;
-            } else {
-                ptr->cursor = qd_buffer_base(ptr->buffer);
-            }
-        }
-    }
-}
-
-
-// bare bones copy of qd_iterator_prefix with no iterator baggage
-bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, const char *prefix)
-{
-    if (!ptr)
-        return false;
-
-    // if ptr->buffer holds enough bytes for the comparison then
-    // don't fiddle with the iterator motions. Just do the comparison directly.
-    const int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor;
-    if (avail >= skip + QD_MA_PREFIX_LEN) {
-        // there's enough in current buffer to do straight compare
-        const void * blk1 = ptr->cursor + skip;
-        const void * blk2 = prefix;
-        return memcmp(blk1, blk2, QD_MA_PREFIX_LEN) == 0;
-    }
-
-    // otherwise compare across buffer boundaries
-    // this, too, could be optimized a bit
-    qd_iterator_pointer_t lptr;
-    *&lptr = *ptr;
-
-    iterator_pointer_move_cursor(&lptr, skip);
-
-    unsigned char *c = (unsigned char*) prefix;
-    while(*c && lptr.remaining) {
-        unsigned char ic = *lptr.cursor;
-
-        if (*c != ic)
-            break;
-        c++;
-
-        iterator_pointer_move_cursor(&lptr, 1);
-    }
-
-    return *c == 0;
-}
-
-
 int qd_iterator_length(const qd_iterator_t *iter)
 {
     return iter ? iterator_length(iter) : 0;
@@ -1135,13 +983,9 @@ bool qd_iterator_next_segment(qd_iterator_t *iter, uint32_t *hash)
 }
 
 
-void qd_iterator_get_view_cursor(
-    const qd_iterator_t   *iter,
-    qd_iterator_pointer_t *ptr)
+qd_buffer_field_t qd_iterator_get_view_cursor(const qd_iterator_t *iter)
 {
-    ptr->buffer    = iter->view_pointer.buffer;
-    ptr->cursor    = iter->view_pointer.cursor;
-    ptr->remaining = iter->view_pointer.remaining;
+    return iter->view_pointer;
 }
 
 
@@ -1154,3 +998,16 @@ void qd_iterator_finalize(void)
     my_area = 0;
     my_router = 0;
 }
+
+
+qd_iterator_t *qd_iterator_buffer_field(const qd_buffer_field_t *bfield,
+                                        qd_iterator_view_t view)
+{
+    assert(bfield);
+    qd_buffer_field_t copy = *bfield;
+    qd_buffer_field_normalize(&copy);
+    return qd_iterator_buffer(copy.buffer,
+                              copy.cursor - qd_buffer_base(copy.buffer),
+                              copy.remaining,
+                              view);
+}
diff --git a/src/message.c b/src/message.c
index e35a14f..1a79fe2 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1190,8 +1190,8 @@ void qd_message_message_annotations(qd_message_t *in_msg)
     // Construct pseudo-field location of user annotations blob
     // This holds all annotations if no router-specific annotations are present
     if (content->ma_count > 0) {
-        qd_field_location_t   *cf  = &content->field_user_annotations;
-        qd_iterator_pointer_t *uab = &content->ma_user_annotation_blob;
+        qd_field_location_t *cf  = &content->field_user_annotations;
+        qd_buffer_field_t   *uab = &content->ma_user_annotation_blob;
         cf->buffer = uab->buffer;
         cf->offset = uab->cursor - qd_buffer_base(uab->buffer);
         cf->length = uab->remaining;
diff --git a/src/message_private.h b/src/message_private.h
index 944612e..fff1b17 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -120,7 +120,7 @@ typedef struct {
     qd_message_depth_t   parse_depth;                     // Depth to which message content has been parsed
     qd_iterator_t       *ma_field_iter_in;                // Iter for msg.FIELD_MESSAGE_ANNOTATION
 
-    qd_iterator_pointer_t ma_user_annotation_blob;        // Original user annotations
+    qd_buffer_field_t    ma_user_annotation_blob;        // Original user annotations
                                                           //  with router annotations stripped
     uint32_t             ma_count;                        // Number of map elements in blob
                                                           //  after router fields stripped
@@ -152,7 +152,7 @@ typedef struct {
 } qd_message_content_t;
 
 struct qd_message_pvt_t {
-    qd_iterator_pointer_t          cursor;          // Pointer to current location of outgoing byte stream.
+    qd_buffer_field_t              cursor;          // Pointer to current location of outgoing byte stream.
     qd_message_depth_t             message_depth;   // Depth of incoming received message
     qd_message_depth_t             sent_depth;      // Depth of outgoing sent message
     qd_message_content_t          *content;         // Singleton content shared by reference between
diff --git a/src/parse.c b/src/parse.c
index 9ebbef7..3f43e5f 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -22,6 +22,7 @@
 #include "qpid/dispatch/alloc.h"
 #include "qpid/dispatch/amqp.h"
 #include "qpid/dispatch/ctools.h"
+#include "buffer_field_api.h"
 
 #include <assert.h>
 #include <inttypes.h>
@@ -237,7 +238,7 @@ const char *qd_parse_turbo(qd_iterator_t          *iter,
         ZERO(turbo);
 
         // Get the buffer pointers for the map element
-        qd_iterator_get_view_cursor(iter, &turbo->bufptr);
+        turbo->bufptr = qd_iterator_get_view_cursor(iter);
 
         // Get description of the map element
         parse_error = get_type_info(iter, &turbo->tag, &turbo->size, &turbo->count,
@@ -259,7 +260,9 @@ const char *qd_parse_turbo(qd_iterator_t          *iter,
     for (int idx=0; idx < n_allocs; idx += 2) {
         qd_parsed_turbo_t *turbo = DEQ_HEAD(*annos);
         assert(turbo);
-        if (qd_iterator_prefix_ptr(&turbo->bufptr, turbo->length_of_size + 1, QD_MA_PREFIX))
+        qd_buffer_field_t key = turbo->bufptr;
+        qd_buffer_field_advance(&key, turbo->length_of_size + 1);
+        if (qd_buffer_field_equal(&key, (const uint8_t*) QD_MA_PREFIX, QD_MA_PREFIX_LEN))
             break;
 
         // leading anno is a user annotation map key
@@ -714,73 +717,16 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
 }
 
 
-// TODO(kgiusti) - de-duplicate all the buffer chain walking code!
-// See DISPATCH-1403
-//
-static inline int _turbo_advance(qd_iterator_pointer_t *ptr, int length)
-{
-    const int start = ptr->remaining;
-    int move = MIN(length, ptr->remaining);
-    while (move > 0) {
-        int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor;
-        if (move < avail) {
-            ptr->cursor += move;
-            ptr->remaining -= move;
-            break;
-        }
-        move -= avail;
-        ptr->remaining -= avail;
-        if (ptr->remaining == 0) {
-            ptr->cursor += avail;   // move to end
-            break;
-        }
-
-        // More remaining in buffer chain: advance to next buffer in chain
-        assert(DEQ_NEXT(ptr->buffer));
-        if (!DEQ_NEXT(ptr->buffer)) {
-            // this is an error!  ptr->remainer is not accurate.  This should not happen
-            // since the MA field must be completely received at this point
-            // (see DISPATCH-1394).
-            int copied = start - ptr->remaining;
-            ptr->remaining = 0;
-            ptr->cursor += avail;  // force to end of chain
-            return copied;
-        }
-        ptr->buffer = DEQ_NEXT(ptr->buffer);
-        ptr->cursor = qd_buffer_base(ptr->buffer);
-    }
-    return start - ptr->remaining;
-}
-
-
-// TODO(kgiusti): deduplicate!
-// See DISPATCH-1403
-//
-static inline int _turbo_copy(qd_iterator_pointer_t *ptr, char *buffer, int length)
-{
-    int move = MIN(length, ptr->remaining);
-    char * const start = buffer;
-    while (ptr->remaining && move > 0) {
-        int avail = MIN(move, qd_buffer_cursor(ptr->buffer) - ptr->cursor);
-        memcpy(buffer, ptr->cursor, avail);
-        buffer += avail;
-        move -= avail;
-        _turbo_advance(ptr, avail);
-    }
-    return (buffer - start);
-}
-
-
 const char *qd_parse_annotations_v1(
-    bool                   strip_anno_in,
-    qd_iterator_t         *ma_iter_in,
-    qd_parsed_field_t    **ma_ingress,
-    qd_parsed_field_t    **ma_phase,
-    qd_parsed_field_t    **ma_to_override,
-    qd_parsed_field_t    **ma_trace,
-    qd_parsed_field_t    **ma_stream,
-    qd_iterator_pointer_t *blob_pointer,
-    uint32_t              *blob_item_count)
+    bool                strip_anno_in,
+    qd_iterator_t      *ma_iter_in,
+    qd_parsed_field_t **ma_ingress,
+    qd_parsed_field_t **ma_phase,
+    qd_parsed_field_t **ma_to_override,
+    qd_parsed_field_t **ma_trace,
+    qd_parsed_field_t **ma_stream,
+    qd_buffer_field_t  *blob_pointer,
+    uint32_t           *blob_item_count)
 {
     // Do full parse
     qd_iterator_reset(ma_iter_in);
@@ -804,7 +750,7 @@ const char *qd_parse_annotations_v1(
     if (!strip_anno_in) {
         anno = DEQ_HEAD(annos);
         while (anno) {
-            uint8_t * dp;                     // pointer to key name in raw buf or extract buf
+            const uint8_t *dp;                // pointer to key name in raw buf or extract buf
             char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary
             int key_len = anno->size;
 
@@ -814,12 +760,12 @@ const char *qd_parse_annotations_v1(
                 dp = anno->bufptr.cursor + anno->length_of_size + 1;
             } else {
                 // Pull the key name from multiple buffers
-                qd_iterator_pointer_t wbuf = anno->bufptr;    // scratch buf pointers for getting key
-                _turbo_advance(&wbuf, anno->length_of_size + 1);
+                qd_buffer_field_t wbuf = anno->bufptr;    // scratch buf pointers for getting key
+                qd_buffer_field_advance(&wbuf, anno->length_of_size + 1);
                 int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total
-                key_len = _turbo_copy(&wbuf, key_name, t_size);
+                key_len = qd_buffer_field_ncopy(&wbuf, (uint8_t*) key_name, t_size);
 
-                dp = (uint8_t *)key_name;
+                dp = (const uint8_t *)key_name;
             }
 
             // Verify that the key starts with the prefix.
@@ -924,15 +870,15 @@ const char *qd_parse_annotations_v1(
 
 
 void qd_parse_annotations(
-    bool                   strip_annotations_in,
-    qd_iterator_t         *ma_iter_in,
-    qd_parsed_field_t    **ma_ingress,
-    qd_parsed_field_t    **ma_phase,
-    qd_parsed_field_t    **ma_to_override,
-    qd_parsed_field_t    **ma_trace,
-    qd_parsed_field_t    **ma_stream,
-    qd_iterator_pointer_t *blob_pointer,
-    uint32_t              *blob_item_count)
+    bool                strip_annotations_in,
+    qd_iterator_t      *ma_iter_in,
+    qd_parsed_field_t **ma_ingress,
+    qd_parsed_field_t **ma_phase,
+    qd_parsed_field_t **ma_to_override,
+    qd_parsed_field_t **ma_trace,
+    qd_parsed_field_t **ma_stream,
+    qd_buffer_field_t  *blob_pointer,
+    uint32_t           *blob_item_count)
 {
     *ma_ingress             = 0;
     *ma_phase               = 0;
@@ -964,7 +910,7 @@ void qd_parse_annotations(
 
     // If there are no router annotations then all annotations
     // are the user's opaque blob.
-    qd_iterator_get_view_cursor(raw_iter, blob_pointer);
+    *blob_pointer = qd_iterator_get_view_cursor(raw_iter);
 
     qd_iterator_free(raw_iter);
 
diff --git a/tests/buffer_test.c b/tests/buffer_test.c
index 8ad38e7..95fd7cc 100644
--- a/tests/buffer_test.c
+++ b/tests/buffer_test.c
@@ -19,6 +19,7 @@
 
 #define _GNU_SOURCE
 #include "qpid/dispatch/buffer.h"
+#include "buffer_field_api.h"
 
 #include "test_case.h"
 
@@ -125,6 +126,248 @@ static char *test_buffer_list_append(void *context)
 }
 
 
+static char *test_buffer_field(void *context)
+{
+    char *result = 0;
+    static const uint8_t data1[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+    static const uint8_t data2[10] = {0xF9, 0xF8, 0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0};
+    qd_buffer_list_t list;
+    qd_buffer_list_t other_list;
+    qd_buffer_field_t bfield;
+
+    DEQ_INIT(list);
+    DEQ_INIT(other_list);
+
+    // test buffer list (2000 octets):
+    for (int i = 0; i < 100; ++i) {
+        qd_buffer_list_t tmp;
+        DEQ_INIT(tmp);
+        fill_buffer(&tmp, (unsigned char *) data1, 10);
+        qd_buffer_t *b = qd_buffer();
+        DEQ_INSERT_TAIL(tmp, b);  // empty buffer
+        DEQ_APPEND(list, tmp);
+        fill_buffer(&tmp, (unsigned char *) data2, 10);
+        DEQ_APPEND(list, tmp);
+    }
+
+    // verify octet read
+
+    bfield.buffer = DEQ_HEAD(list);
+    bfield.cursor = qd_buffer_base(bfield.buffer);
+    bfield.remaining = 2000;
+
+    int total_octets = 0;
+    size_t expected_length = 2000;
+    uint8_t next_octet = 0;
+    uint8_t octet = 0xFF;
+    while (qd_buffer_field_octet(&bfield, &octet)) {
+        total_octets += 1;
+        expected_length -= 1;
+
+        if (bfield.remaining != expected_length) {
+            result = "octet length not updated";
+            goto exit;
+        }
+        if (octet != next_octet) {
+            result = "Unexpected next octet";
+            goto exit;
+        }
+        if (next_octet == 0x09)
+            next_octet = 0xF9;
+        else if (next_octet == 0xF0)
+            next_octet = 0;
+        else if (next_octet < 0x09)
+            next_octet += 1;
+        else
+            next_octet -= 1;
+    }
+
+    if (total_octets != 2000 || bfield.remaining != 0) {
+        result = "Next octet wrong length";
+        goto exit;
+    }
+
+    // verify advance
+
+    bfield.buffer = DEQ_HEAD(list);
+    bfield.cursor = qd_buffer_base(bfield.buffer);
+    bfield.remaining = 2000;
+
+    size_t amount = qd_buffer_field_advance(&bfield, 2);
+    if (amount != 2) {
+        result = "advance 2 failed";
+        goto exit;
+    }
+
+    if (!qd_buffer_field_octet(&bfield, &octet) || octet != 2) {
+        result = "expected to advance to '2'";
+        goto exit;
+    }
+
+    amount = qd_buffer_field_advance(&bfield, 1995);
+    if (amount != 1995) {
+        result = "advance 1995 failed";
+        goto exit;
+    }
+
+    if (bfield.remaining != 2) {
+        result = "expected 2 last octets";
+        goto exit;
+    }
+
+    if (!qd_buffer_field_octet(&bfield, &octet) || octet != 0xF1) {
+        result = "expected to advance to '0xF1'";
+        goto exit;
+    }
+
+    amount = qd_buffer_field_advance(&bfield, 3);
+    if (amount != 1 || bfield.remaining != 0) {
+        result = "failed to advance to end of field";
+        goto exit;
+    }
+
+    // verify ncopy
+
+    bfield.buffer = DEQ_HEAD(list);
+    bfield.cursor = qd_buffer_base(bfield.buffer);
+    bfield.remaining = 2000;
+
+    uint8_t dest[10];
+    amount = qd_buffer_field_ncopy(&bfield, dest, 5);
+    if (amount != 5) {
+        result = "failed to ncopy 5";
+        goto exit;
+    }
+    if (memcmp(dest, data1, 5)) {
+        result = "ncopy 5 failed";
+        goto exit;
+    }
+    amount = qd_buffer_field_ncopy(&bfield, dest, 10);
+    if (amount != 10) {
+        result = "failed to ncopy 10";
+        goto exit;
+    }
+    if (memcmp(dest, &data1[5], 5) || memcmp(&dest[5], &data2[0], 5)) {
+        result = "ncopy 10 failed";
+        goto exit;
+    }
+    amount = qd_buffer_field_advance(&bfield, 1980);
+    if (amount != 1980) {
+        result = "advance 1980 failed";
+        goto exit;
+    }
+    amount = qd_buffer_field_ncopy(&bfield, dest, 10);
+    if (amount != 5) {
+        result = "ncopy expected 5 failed";
+        goto exit;
+    }
+    if (memcmp(dest, &data2[5], 5) || bfield.remaining != 0) {
+        result = "ncopy at end failed";
+        goto exit;
+    }
+
+    // verify equal
+
+    bfield.buffer = DEQ_HEAD(list);
+    bfield.cursor = qd_buffer_base(bfield.buffer);
+    bfield.remaining = 2000;
+
+    const uint8_t pattern[] = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\xF9\xF8\xF7\xF6\xF5\xF4\xF3\xF2\xF1\xF0";
+    const uint8_t pattern_bad[] = "\xF9\xF8\xF7\xF6\xF5\xF4\xF3\xF2\xF1\xF0\xAA";
+    if (qd_buffer_field_equal(&bfield, (uint8_t*) "\x00\x01\x03", 3)) {
+        result = "expected equal 3 to fail";
+        goto exit;
+    }
+    if (bfield.remaining != 2000) {
+        result = "do not advance on failed equal";
+        goto exit;
+    }
+    if (!qd_buffer_field_equal(&bfield, pattern, 20)) {
+        result = "expected pattern match";
+        goto exit;
+    }
+    if (bfield.remaining != 1980) {
+        result = "match did not advance";
+        goto exit;
+    }
+    (void)qd_buffer_field_advance(&bfield, 1960);
+    if (!qd_buffer_field_equal(&bfield, pattern, 10)) {
+        result = "expected sub pattern match";
+        goto exit;
+    }
+    if (qd_buffer_field_equal(&bfield, pattern_bad, 11)) {
+        result = "did not expect sub pattern match";
+        goto exit;
+    }
+    if (bfield.remaining != 10) {
+        result = "mismatch advanced";
+        goto exit;
+    }
+    if (!qd_buffer_field_equal(&bfield, &pattern[10], 9 )) {
+        result = "expected end sub pattern match";
+        goto exit;
+    }
+
+    if (!qd_buffer_field_octet(&bfield, &octet) || octet != 0xF0) {
+        result = "failed to octet read the extra trailing octet in the pattern";
+    }
+
+    // verify buffer list append
+
+    bfield.buffer = DEQ_HEAD(list);
+    bfield.cursor = qd_buffer_base(bfield.buffer);
+    bfield.remaining = 2000;
+
+    qd_buffer_field_t saved_bfield = bfield;
+    qd_buffer_t *bptr = 0;
+
+    qd_buffer_list_append_field(&other_list, &bfield);
+    if (bfield.remaining) {
+        result = "expected to append 2000 octets";
+        goto exit;
+    }
+    bptr = DEQ_HEAD(other_list);
+    uint32_t cmp_count = 0;
+    while (bptr) {
+        if (!qd_buffer_field_equal(&saved_bfield, qd_buffer_base(bptr), qd_buffer_size(bptr))) {
+            result = "expected list and buffers to be equal";
+            goto exit;
+        }
+        cmp_count += qd_buffer_size(bptr);
+        bptr = DEQ_NEXT(bptr);
+    }
+
+    if (saved_bfield.remaining != 0) {
+        result = "expected saved_bfield to be empty";
+        goto exit;
+    }
+
+    if (cmp_count != 2000) {
+        result = "did not compare 2000 octets";
+        goto exit;
+    }
+
+    qd_buffer_list_free_buffers(&other_list);
+
+    const char *append_str = "abcdefghijklmnopqrstuvwxyz";
+    qd_buffer_list_append(&other_list, (const uint8_t *)append_str, strlen(append_str));
+
+    bfield.buffer = DEQ_HEAD(other_list);
+    bfield.cursor = qd_buffer_base(bfield.buffer);
+    bfield.remaining = strlen(append_str);
+
+    if (!qd_buffer_field_equal(&bfield, (const uint8_t*) append_str, strlen(append_str))) {
+        result = "expected to equal append_str";
+        goto exit;
+    }
+
+exit:
+    qd_buffer_list_free_buffers(&list);
+    qd_buffer_list_free_buffers(&other_list);
+    return result;
+}
+
+
 int buffer_tests()
 {
     int result = 0;
@@ -132,6 +375,7 @@ int buffer_tests()
 
     TEST_CASE(test_buffer_list_clone, 0);
     TEST_CASE(test_buffer_list_append, 0);
+    TEST_CASE(test_buffer_field, 0);
 
     return result;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-dispatch] 02/02: DISPATCH-1487: Message annotations re-write

Posted by kg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 32ddaa1a9bd38f867aebe4bebbf4c9bf6177c106
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Dec 1 09:55:17 2021 -0500

    DISPATCH-1487: Message annotations re-write
    
    This patch completely refactors how message annotations are processed
    by the router. The primary changes done in this re-design are:
    
    - Removal of the old parse_turbo functionality.
    - Avoid all annotation process for link-routed messages
    - Avoid composing all outgoing annotations on the receive path. This
      is now done during qd_message_send().
    - Avoid composing the trace list and ingress manually in various parts
      of the code. This is now done once at qd_message_send().
    - Optimize parsing of incoming annotations using the buffer_field API.
    - Consolidate message compose functions into a message constructor.
    - Optimize the composing of outgoing annotations by:
      - pre-encoding all static key values and router id
      - in-line more of the encoding code
---
 include/qpid/dispatch/amqp.h                       |  48 +-
 include/qpid/dispatch/compose.h                    |  89 +++
 include/qpid/dispatch/message.h                    | 146 ++--
 include/qpid/dispatch/parse.h                      | 119 ++-
 include/qpid/dispatch/protocol_adaptor.h           |  11 -
 include/qpid/dispatch/router.h                     |   3 +-
 src/amqp.c                                         |  22 +-
 src/buffer_field_api.h                             |   6 +-
 src/message.c                                      | 688 +++++++++--------
 src/message_private.h                              |  35 +-
 src/parse.c                                        | 826 ++++++++++-----------
 src/python_embedded.c                              |  15 +-
 src/router_config.c                                |   1 +
 src/router_core/core_client_api.c                  |  15 +-
 src/router_core/exchange_bindings.c                |  16 +-
 src/router_core/management_agent.c                 |  25 +-
 .../address_lookup_server/address_lookup_server.c  |   7 +-
 .../edge_addr_tracking/edge_addr_tracking.c        |  10 +-
 .../modules/heartbeat_edge/heartbeat_edge.c        |   5 +-
 src/router_core/modules/mobile_sync/mobile.c       |  34 +-
 .../modules/test_hooks/core_test_hooks.c           |   4 +-
 src/router_core/transfer.c                         |  26 +-
 src/router_node.c                                  | 200 ++---
 tests/lsan.supp                                    |   4 +
 tests/message_test.c                               | 505 ++++++++-----
 tests/parse_test.c                                 | 165 +++-
 tests/run_unit_tests_size.c                        | 116 ++-
 tests/system_tests_one_router.py                   | 197 +++--
 tests/system_tests_two_routers.py                  |  10 +-
 29 files changed, 1887 insertions(+), 1461 deletions(-)

diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 4f307ba..6676d44 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -19,6 +19,8 @@
  * under the License.
  */
 
+#include <stdint.h>
+
 /**@file
  * AMQP constants.
  *
@@ -105,27 +107,37 @@ typedef enum {
 } qd_amqp_type_t;
 
 /** @name Message Annotation Headers */
+/// These are the map keys used for router-specific message annotations.
+/// Pre-encoded values are used to optimize building the annotations for
+/// outgoing messages.  Please keep these in sync with the values defined in
+/// amqp.c
 /// @{
 #define QD_ROUTER_ANNOTATIONS_VERSION     1
-extern const char * const QD_MA_PREFIX;
-extern const char * const QD_MA_INGRESS;  ///< Ingress Router
-extern const char * const QD_MA_TRACE;    ///< Trace
-extern const char * const QD_MA_TO;       ///< To-Override
-extern const char * const QD_MA_PHASE;    ///< Phase for override address
-extern const char * const QD_MA_CLASS;    ///< Message-Class
-extern const char * const QD_MA_STREAM;   ///< Indicate streaming message
-
-#define QD_MA_PREFIX_LEN  (9)
-#define QD_MA_INGRESS_LEN (16)
-#define QD_MA_TRACE_LEN   (14)
-#define QD_MA_TO_LEN      (11)
-#define QD_MA_PHASE_LEN   (14)
+extern const char * const    QD_MA_PREFIX;   /// key prefix
+extern const char * const    QD_MA_INGRESS;  ///< Ingress Router Id
+extern const uint8_t * const QD_MA_INGRESS_ENCODED;
+extern const char * const    QD_MA_TRACE;    ///< Trace list
+extern const uint8_t * const QD_MA_TRACE_ENCODED;
+extern const char * const    QD_MA_TO;       ///< To-Override
+extern const uint8_t * const QD_MA_TO_ENCODED;
+extern const char * const    QD_MA_PHASE;    ///< Phase for override address
+extern const uint8_t * const QD_MA_PHASE_ENCODED;
+extern const char * const    QD_MA_STREAM;   ///< Indicate streaming message
+extern const uint8_t * const QD_MA_STREAM_ENCODED;
+extern const char * const    QD_MA_CLASS;    ///< Message-Class (deprecated)
+
+#define QD_MA_PREFIX_LEN          (9)
+#define QD_MA_INGRESS_LEN         (16)
+#define QD_MA_INGRESS_ENCODED_LEN (2 + QD_MA_INGRESS_LEN)
+#define QD_MA_TRACE_LEN           (14)
+#define QD_MA_TRACE_ENCODED_LEN   (2 + QD_MA_TRACE_LEN)
+#define QD_MA_TO_LEN              (11)
+#define QD_MA_TO_ENCODED_LEN      (2 + QD_MA_TO_LEN)
+#define QD_MA_PHASE_LEN           (14)
+#define QD_MA_PHASE_ENCODED_LEN   (2 + QD_MA_PHASE_LEN)
+#define QD_MA_STREAM_LEN          (15)
+#define QD_MA_STREAM_ENCODED_LEN  (2 + QD_MA_STREAM_LEN)
 #define QD_MA_CLASS_LEN   (14)
-#define QD_MA_STREAM_LEN  (15)
-
-extern const int          QD_MA_MAX_KEY_LEN;  ///< strlen of longest key name
-extern const int          QD_MA_N_KEYS;       ///< number of router annotation keys
-extern const int          QD_MA_FILTER_LEN;   ///< size of annotation filter buffer
 /// @}
 
 /** @name Container Capabilities */
diff --git a/include/qpid/dispatch/compose.h b/include/qpid/dispatch/compose.h
index 95c99f0..85e4249 100644
--- a/include/qpid/dispatch/compose.h
+++ b/include/qpid/dispatch/compose.h
@@ -19,9 +19,12 @@
  * under the License.
  */
 
+#include "qpid/dispatch/amqp.h"
 #include "qpid/dispatch/buffer.h"
 #include "qpid/dispatch/iterator.h"
 
+#include <inttypes.h>
+
 /** A linked list of buffers composing a sequence of AMQP data objects. */
 typedef struct qd_composed_field_t qd_composed_field_t;
 
@@ -262,6 +265,92 @@ void qd_compose_insert_opaque_elements(qd_composed_field_t *field,
  */
 void qd_compose_insert_double(qd_composed_field_t *field, double value);
 
+/**
+ * Write a uint32 value in network order to buf.
+ *
+ * This is used throughout the code for writing the size and count components
+ * of variable-sized AMQP types.
+ *
+ * The caller must ensure buf references four contiguous octets in memory.
+ */
+static inline void qd_compose_uint32_encode(uint32_t value, uint8_t buf[])
+{
+  buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
+  buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
+  buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
+  buf[3] = (uint8_t) (value & 0x000000FF);
+}
+
+/**
+ * Compose the proper header for a map given entry count and data size.
+ *
+ * The caller must ensure hdr references nine contiguous octets in memory.
+ *
+ * @param size length of encoded map body (not including sizeof(count))
+ * @param count total number of elements in the map
+ * @return length of data in hdr in octets
+ */
+static inline int qd_compose_map_header(uint8_t hdr[], uint32_t size, uint32_t count)
+{
+    if (size + 1 <= UINT8_MAX) {  // + sizeof(count) field
+        hdr[0] = QD_AMQP_MAP8;
+        hdr[1] = size + 1;
+        hdr[2] = count;
+        return 3;
+    } else {
+        hdr[0] = QD_AMQP_MAP32;
+        qd_compose_uint32_encode(size + 4, &hdr[1]);
+        qd_compose_uint32_encode(count, &hdr[5]);
+        return 9;
+    }
+}
+
+
+/**
+ * Compose the proper header for a list given entry count and data size.
+ *
+ * The caller must ensure hdr references nine contiguous octets in memory.
+ *
+ * @param size length of encoded list body (not including sizeof(count))
+ * @param count total number of elements in the list
+ * @return length of data in hdr in octets
+ */
+static inline int qd_compose_list_header(uint8_t hdr[], uint32_t size, uint32_t count)
+{
+    if (size + 1 <= UINT8_MAX) {  // + sizeof(count) field
+        hdr[0] = QD_AMQP_LIST8;
+        hdr[1] = size + 1;
+        hdr[2] = count;
+        return 3;
+    } else {
+        hdr[0] = QD_AMQP_LIST32;
+        qd_compose_uint32_encode(size + 4, &hdr[1]);
+        qd_compose_uint32_encode(count, &hdr[5]);
+        return 9;
+    }
+}
+
+
+/**
+ * Compose the proper header for a string given its length in octets
+ *
+ * The caller must ensure hdr references five contiguous octets in memory.
+ *
+ * @param length of string in octets
+ * @return length of data in hdr in octets
+ */
+static inline int qd_compose_str_header(uint8_t hdr[], uint32_t length)
+{
+    if (length <= UINT8_MAX) {
+        hdr[0] = QD_AMQP_STR8_UTF8;
+        hdr[1] = length;
+        return 2;
+    } else {
+        hdr[0] = QD_AMQP_STR32_UTF8;
+        qd_compose_uint32_encode(length, &hdr[1]);
+        return 5;
+    }
+}
 
 ///@}
 
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 219e029..d39342f 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -148,44 +148,21 @@ qd_message_t *qd_message_copy(qd_message_t *msg);
 /**
  * Retrieve the message annotations from a message and place them in message storage.
  *
- * IMPORTANT: The pointer returned by this function remains owned by the message.
- *            The caller MUST NOT free the parsed field.
- *
  * @param msg Pointer to a received message.
+ * @return 0 on success, else an error message
  */
-void qd_message_message_annotations(qd_message_t *msg);
-
-/**
- * Set the value for the QD_MA_TRACE field in the outgoing message annotations
- * for the message.
- *
- * IMPORTANT: This method takes ownership of the trace_field - the calling
- * method must not reference it after this call.
- *
- * @param msg Pointer to an outgoing message.
- * @param trace_field Pointer to a composed field representing the list that
- * will be used as the value for the QD_MA_TRACE map entry.  If null, the
- * message will not have a QA_MA_TRACE message annotation field.  Ownership of
- * this field is transferred to the message.
- *
- */
-void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *trace_field);
+const char *qd_message_parse_annotations(qd_message_t *msg);
 
 /**
  * Set the value for the QD_MA_TO field in the outgoing message annotations for
  * the message.
  *
- * IMPORTANT: This method takes ownership of the to_field - the calling
- * method must not reference it after this call.
- *
  * @param msg Pointer to an outgoing message.
- * @param to_field Pointer to a composed field representing the to override
- * address that will be used as the value for the QD_MA_TO map entry.  If null,
- * the message will not have a QA_MA_TO message annotation field.  Ownership of
- * this field is transferred to the message.
- *
+ * @param to_field Pointer to a c string holding the to override address that
+ * will be used as the value for the outgoing QD_MA_TO annotations map entry.
+ * If null, the message will not have a QA_MA_TO message annotation field.
  */
-void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field);
+void qd_message_set_to_override_annotation(qd_message_t *msg, const char *to_field);
 
 /**
  * Set a phase for the phase annotation in the message.
@@ -198,13 +175,18 @@ void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
 int  qd_message_get_phase_annotation(const qd_message_t *msg);
 
 /**
- * Indicate whether message should be considered to be streaming.
+ * Classify the message as streaming.
+ *
+ * Marking a message as streaming will prevent downstream routers from manually
+ * determining if this message should be sent on an inter-router streaming
+ * link. Once a message is classified as streaming it retains the
+ * classification until it is delivered to an endpoint
  *
  * @param msg Pointer to an outgoing message.
- * @param stream true if the message is streaming
  *
  */
-void qd_message_set_stream_annotation(qd_message_t *msg, bool stream);
+void qd_message_set_streaming_annotation(qd_message_t *msg);
+
 /**
  * Test whether received message should be considered to be streaming.
  *
@@ -212,23 +194,16 @@ void qd_message_set_stream_annotation(qd_message_t *msg, bool stream);
  * @return true if the received message has the streaming annotation set, else false.
  *
  */
-int qd_message_is_streaming(qd_message_t *msg);
+int qd_message_is_streaming(const qd_message_t *msg);
 
 /**
- * Set the value for the QD_MA_INGRESS field in the outgoing message
- * annotations for the message.
- *
- * IMPORTANT: This method takes ownership of the ingress_field - the calling
- * method must not reference it after this call.
- *
- * @param msg Pointer to an outgoing message.
- * @param ingress_field Pointer to a composed field representing ingress router
- * that will be used as the value for the QD_MA_INGRESS map entry.  If null,
- * the message will not have a QA_MA_INGRESS message annotation field.
- * Ownership of this field is transferred to the message.
+ * Prevent the router from doing any transformations to the message annotations
+ * section of the message.
  *
+ * Used by link-routing to completely skip all MA handling, including parsing
+ * MA on receive and restoring/composing MA on send.
  */
-void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *ingress_field);
+void qd_message_disable_router_annotations(qd_message_t *in_msg);
 
 /**
  * Receive message data frame by frame via a delivery.  This function may be called more than once on the same
@@ -295,16 +270,27 @@ qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t f
 ssize_t qd_message_field_length(qd_message_t *msg, qd_message_field_t field);
 ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char *buffer, size_t *hdr_length);
 
+// Create a message using composed fields to supply content.
 //
-// Functions for composed messages
+// This message constructor will create a new message using each fields buffers
+// concatenated in order (f1 first, f2 second, etc). There is no need to
+// provide all three fields: concatenation stops at the first null fx pointer.
 //
+// Note well that while this constructor can support up to three separate
+// composed fields it is more efficent to chain as many message sections as
+// possible into as few separate composed fields as possible.  This means that
+// any passed composed field can contain several message sections.
+//
+// This constructor takes ownership of the composed fields - the caller must
+// not reference them after the call.
+//
+qd_message_t *qd_message_compose(qd_composed_field_t *f1,
+                                 qd_composed_field_t *f2,
+                                 qd_composed_field_t *f3,
+                                 bool receive_complete);
 
-// Convenience Functions
-void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers);
-void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool receive_complete);
-void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, bool receive_complete);
-void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3, bool receive_complete);
-void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete);
+// deprecated: use qd_message_compose() to create locally generated messages
+void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, bool receive_complete);
 
 /**
  * qd_message_extend
@@ -452,44 +438,66 @@ int qd_message_repr_len();
 qd_log_source_t* qd_message_log_source();
 
 /**
- * Accessor for message field ingress
- * 
+ * Accessor for incoming messages ingress router annotation
+ *
  * @param msg A pointer to the message
- * @return the parsed field
+ * @return the parsed field or 0 if no ingress present in msg
  */
-qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg);
+qd_parsed_field_t *qd_message_get_ingress_router(qd_message_t *msg);
 
 /**
- * Accessor for message field phase
- * 
+ * Discard received ingress router annotation.  This will cause the forwarded
+ * message to use the local router ID for the ingress router annotation.
+ *
+ * Used by exchange-bindings feature.
+ *
  * @param msg A pointer to the message
- * @return the parsed field
  */
-qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg);
+void qd_message_reset_ingress_router_annotation(qd_message_t *msg);
+
+/**
+ * Do not include a ingress router annotation in the forwarded message.
+ *
+ * Used by the edge-router to filter out the ingress router annotation.
+ *
+ * @param msg A pointer to the message
+ */
+void qd_message_disable_ingress_router_annotation(qd_message_t *msg);
 
 /**
  * Accessor for message field to_override
- * 
+ *
  * @param msg A pointer to the message
- * @return the parsed field
+ * @return the parsed field or 0 if no to_override present
  */
 qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
 
 /**
- * Accessor for message field trace
- * 
- * @param msg A pointer to the message
+ * Accessor for incoming messages trace annotation
+ *
+ * @param msg A pointer to the received message
  * @return the parsed field
  */
 qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg);
 
 /**
- * Accessor for message field phase
- * 
+ * Discard received trace annotations.  This will cause the forwarded message
+ * to re-start the trace list with the current router ID as the only entry.
+ *
+ * Used by exchange-bindings feature.
+ *
+ * @param msg A pointer to the message
+ */
+void qd_message_reset_trace_annotation(qd_message_t *msg);
+
+/**
+ * Do not include a trace list annotation in the forwarded message.
+ *
+ * Used by the edge-router to filter out the trace list.
+ *
  * @param msg A pointer to the message
- * @return the phase as an integer
  */
-int                qd_message_get_phase_val  (qd_message_t *msg);
+void qd_message_disable_trace_annotation(qd_message_t *msg);
 
 /**
  * Should the message be discarded.
diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h
index 5fe0552..9b1efa4 100644
--- a/include/qpid/dispatch/parse.h
+++ b/include/qpid/dispatch/parse.h
@@ -27,67 +27,20 @@
  *
  * @defgroup parse parse
  *
- * Parse data from qd_iterator_t into a tree structure represeniting
+ * Parse data from raw octets into a tree structure representing
  * an AMQP data type tree.
  *@{
  */
 
 typedef struct qd_parsed_field_t qd_parsed_field_t;
-typedef struct qd_parsed_turbo_t qd_parsed_turbo_t;
-
-DEQ_DECLARE(qd_parsed_turbo_t, qd_parsed_turbo_list_t);
-
-/**@file
- * Parse raw data fields into skeletal AMQP data trees.
- *
- * @defgroup parse parse
- *
- * Parse data from qd_iterator_t into a tree structure representing
- * an AMQP data type tree.
- *@{
- */
-struct qd_parsed_turbo_t {
-    DEQ_LINKS(qd_parsed_turbo_t);
-    qd_buffer_field_t bufptr;  // location/size of field in buffer
-    uint8_t           tag;
-    uint32_t          size;
-    uint32_t          count;
-    uint32_t          length_of_size;
-    uint32_t          length_of_count;
-};
 
 /**
- * Parse a field delimited by a field iterator.
+ * Parse a field delimited by a buffer field.
  *
- * @param iter Field iterator for the field being parsed
+ * @param bfield holds the data to be parsed
  * @return A pointer to the newly created field.
  */
-qd_parsed_field_t *qd_parse(qd_iterator_t *iter);
-
-/**
- * Parse message annotations map from a raw iterator
- * It's called 'turbo' because it is supposed to be fast.
- * Distinguish between user annotations and router annotations
- * Enumerate the user entries count and size.
- * Return the router entries in a list.
- *
- * This function knows a priori:
- *   * the iter is a message annotations map
- *   * the map key prefix is QD_MA_PREFIX
- *   * there are 4 router map annotations at most
- *   * the router annotations are at the end of the map
- *
- * @param iter Field iterator for the message annotations map
- * @param annos returned list of router annotations map entries
- * @param user_entries number of map user items
- * @param user_bytes number of map user item bytes
- * @return 0 if success else pointer to error string
- */
-const char * qd_parse_turbo(
-                       qd_iterator_t          *iter,
-                       qd_parsed_turbo_list_t *annos,
-                       uint32_t               *user_entries,
-                       uint32_t               *user_bytes);
+qd_parsed_field_t *qd_parse(const qd_iterator_t *iter);
 
 /**
  * Free the resources associated with a parsed field.
@@ -146,7 +99,6 @@ uint8_t qd_parse_tag(qd_parsed_field_t *field);
  */
 qd_iterator_t *qd_parse_raw(qd_parsed_field_t *field);
 
-
 /**
  * Return an iterator for the typed content of the field. Contains the type followed by the raw content.
  *
@@ -159,6 +111,21 @@ qd_iterator_t *qd_parse_raw(qd_parsed_field_t *field);
 qd_iterator_t *qd_parse_typed(qd_parsed_field_t *field);
 
 /**
+ * Return the location and length of the raw value of the parsed field.
+ *
+ * The returned value does not include the parsed fields header. If the field
+ * is a container (map/list) then the returned value is the content of the
+ * container, including each entries encoded header.
+ *
+ * IMPORTANT: The returned location remains valid for the lifetime of the
+ * parsed field.
+ *
+ * @param field The field pointer returned by qd_parse.
+ * @return The location in the buffer chain containing the field's raw content.
+ */
+qd_buffer_field_t qd_parse_value(const qd_parsed_field_t *field);
+
+/**
  * Return the raw content as an unsigned integer up to 32-bits.  This is
  * valid only for scalar fields of a fixed size of 4-octets or fewer.
  *
@@ -203,6 +170,15 @@ int64_t qd_parse_as_long(qd_parsed_field_t *field);
 bool qd_parse_as_bool(qd_parsed_field_t *field);
 
 /**
+ * Return the raw content as a c-string.  This is valid only for SYM* and STR*
+ * parsed types. The caller is responsible for freeing the returned string.
+ *
+ * @param field The field pointer returned by qd_parse.
+ * @return a C string containing the value or 0 if conversion failed.
+ */
+char *qd_parse_as_string(const qd_parsed_field_t *parsed_field);
+
+/**
  * Return the number of sub-fields in a compound field.  If the field is
  * a list or array, this is the number of items in the list/array.  If
  * the field is a map, this is the number of key/value pairs in the map
@@ -291,19 +267,19 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
  * @param ma_phase returned parsed field: phase
  * @param ma_to_override returned parsed field: override
  * @param ma_trace returned parsed field: trace
- * @param blob_pointer returned buffer pointer to user's annotation blob
+ * @param  returned buffer pointer to user's annotation blob
  * @param blob_item_count number of map entries referenced by blob_iterator
  */
-void qd_parse_annotations(
-    bool                strip_annotations_in,
-    qd_iterator_t      *ma_iter_in,
-    qd_parsed_field_t **ma_ingress,
-    qd_parsed_field_t **ma_phase,
-    qd_parsed_field_t **ma_to_override,
-    qd_parsed_field_t **ma_trace,
-    qd_parsed_field_t **ma_stream,
-    qd_buffer_field_t  *blob_pointer,
-    uint32_t           *blob_item_count);
+const char *qd_parse_annotations(
+    bool                   strip_annotations_in,
+    qd_iterator_t         *ma_iter_in,
+    qd_parsed_field_t    **ma_ingress,
+    qd_parsed_field_t    **ma_phase,
+    qd_parsed_field_t    **ma_to_override,
+    qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
+    qd_buffer_field_t     *user_annotations,
+    uint32_t              *user_count);
 
 /**
  * Identify which annotation is being parsed
@@ -317,6 +293,23 @@ typedef enum {
     QD_MAE_NONE
 } qd_ma_enum_t;
 
+
+/**
+ * Parse a 32 bit unsigned integer in network order to a native uint32 value.
+ *
+ * This is used throughout the code for decoding the size and count components
+ * of variable-sized AMQP types.
+ *
+ * The caller must ensure buf references four contiguous octets in memory.
+ */
+static inline uint32_t qd_parse_uint32_decode(const uint8_t buf[])
+{
+    return (((uint32_t) buf[0]) << 24)
+        |  (((uint32_t) buf[1]) << 16)
+        |  (((uint32_t) buf[2]) << 8)
+        |  ((uint32_t) buf[3]);
+}
+
 ///@}
 
 #endif
diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h
index a384d3e..00551c2 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -835,17 +835,6 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
 void qdr_link_delete(qdr_link_t *link);
 
 /**
- * qdr_new_message_annotate
- *
- * Annotate a newly originated message prior to transmission.  This function adds the annotations
- * for ORIGIN and TRACE as well as any other needed pre-send annotations.
- *
- * @param core Pointer to the core object.
- * @param msg Pointer to the message to be annotated.
- */
-void qdr_new_message_annotate(qdr_core_t *core, qd_message_t *msg);
-
-/**
  * qdr_link_deliver
  *
  * Deliver a message to the router core for forwarding.  This function is used in cases where
diff --git a/include/qpid/dispatch/router.h b/include/qpid/dispatch/router.h
index a69cf40..8527948 100644
--- a/include/qpid/dispatch/router.h
+++ b/include/qpid/dispatch/router.h
@@ -91,7 +91,8 @@ struct qd_router_forwarder_t {
 
 typedef void (*qd_router_message_cb_t)(void *context, qd_message_t *msg, int link_id);
 
-const char *qd_router_id(const qd_dispatch_t *qd);
+const char *qd_router_id(void);
+const uint8_t *qd_router_id_encoded(size_t *len);  // encoded as AMQP STR
 
 qdr_core_t *qd_router_core(qd_dispatch_t *qd);
 
diff --git a/src/amqp.c b/src/amqp.c
index fab6269..e2ef737 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -24,16 +24,18 @@
 #include <stdlib.h>
 #include <string.h>
 
-const char * const QD_MA_PREFIX  = "x-opt-qd.";
-const char * const QD_MA_INGRESS = "x-opt-qd.ingress";
-const char * const QD_MA_TRACE   = "x-opt-qd.trace";
-const char * const QD_MA_TO      = "x-opt-qd.to";
-const char * const QD_MA_PHASE   = "x-opt-qd.phase";
-const char * const QD_MA_CLASS   = "x-opt-qd.class";
-const char * const QD_MA_STREAM  = "x-opt-qd.stream";
-const int          QD_MA_MAX_KEY_LEN = 16;
-const int          QD_MA_N_KEYS      = 5;  // max number of router annotations to send/receive
-const int          QD_MA_FILTER_LEN  = 5;  // N tailing inbound entries to search for stripping
+const char * const QD_MA_PREFIX             = "x-opt-qd.";
+const char * const    QD_MA_INGRESS         = "x-opt-qd.ingress";
+const uint8_t * const QD_MA_INGRESS_ENCODED = (uint8_t*) "\xA3\x10""x-opt-qd.ingress";
+const char * const    QD_MA_TRACE           = "x-opt-qd.trace";
+const uint8_t * const QD_MA_TRACE_ENCODED   = (uint8_t*) "\xA3\x0E""x-opt-qd.trace";
+const char * const    QD_MA_TO              = "x-opt-qd.to";
+const uint8_t * const QD_MA_TO_ENCODED      = (uint8_t*) "\xA3\x0B""x-opt-qd.to";
+const char * const    QD_MA_PHASE           = "x-opt-qd.phase";
+const uint8_t * const QD_MA_PHASE_ENCODED   = (uint8_t*) "\xA3\x0E""x-opt-qd.phase";
+const char * const    QD_MA_STREAM          = "x-opt-qd.stream";
+const uint8_t * const QD_MA_STREAM_ENCODED  = (uint8_t*) "\xA3\x0F""x-opt-qd.stream";
+const char * const QD_MA_CLASS              = "x-opt-qd.class";  // deprecated
 
 const char * const QD_CAPABILITY_ROUTER_CONTROL   = "qd.router";
 const char * const QD_CAPABILITY_ROUTER_DATA      = "qd.router-data";
diff --git a/src/buffer_field_api.h b/src/buffer_field_api.h
index a2ac8cb..7c60ac5 100644
--- a/src/buffer_field_api.h
+++ b/src/buffer_field_api.h
@@ -26,6 +26,7 @@
  */
 
 #include "qpid/dispatch/buffer_field.h"
+#include "qpid/dispatch/parse.h"
 #include <stdbool.h>
 
 /* qd_buffer_field_normalize
@@ -187,10 +188,7 @@ static inline bool qd_buffer_field_uint32(qd_buffer_field_t *bfield, uint32_t *v
     if (bfield->remaining >= 4) {
         uint8_t buf[4];
         qd_buffer_field_ncopy(bfield, buf, 4);
-        *value = (((uint32_t) buf[0]) << 24)
-            | (((uint32_t) buf[1]) << 16)
-            | (((uint32_t) buf[2]) << 8)
-            | ((uint32_t) buf[3]);
+        *value = qd_parse_uint32_decode(buf);
         return true;
     }
     return false;
diff --git a/src/message.c b/src/message.c
index 1a79fe2..69827cb 100644
--- a/src/message.c
+++ b/src/message.c
@@ -24,9 +24,9 @@
 #include "connection_manager_private.h"
 #include "message_private.h"
 #include "policy.h"
+#include "buffer_field_api.h"
 
 #include "qpid/dispatch/amqp.h"
-#include "qpid/dispatch/buffer.h"
 #include "qpid/dispatch/ctools.h"
 #include "qpid/dispatch/error.h"
 #include "qpid/dispatch/iterator.h"
@@ -42,7 +42,6 @@
 #include <string.h>
 #include <time.h>
 
- #define CHECK_Q2(blist) assert(DEQ_SIZE(blist) <= QD_QLIMIT_Q2_LOWER)
 
 #define LOCK   sys_mutex_lock
 #define UNLOCK sys_mutex_unlock
@@ -433,9 +432,12 @@ static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
  * Advance cursor through buffer chain by 'consume' bytes.
  * Cursor and buffer args are advanced to point to new position in buffer chain.
  * Buffer content that is consumed is optionally passed to handler.
- *  - if the number of bytes in the buffer chain is less than or equal to 
+ *  - if the number of bytes in the buffer chain is less than or equal to
  *    the consume number then return the last buffer in the chain
- *    and a cursor pointing t the first unused byte in the buffer.
+ *    and a cursor pointing to the first unused byte in the buffer.
+ *  - if the number of bytes in the buffer chain is greater than the consume
+ *    number the returned buffer/cursor will point to the next available
+ *    octet of data.
  *  - the original buffer chain is not changed or freed.
  *
  * @param cursor pointer into current buffer content
@@ -444,12 +446,12 @@ static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume)
  * @param handler pointer to processor function
  * @param context opaque argument for handler
  */
-static void advance_guarded(unsigned char **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
+static void advance_guarded(const uint8_t **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
 {
-    unsigned char *local_cursor = *cursor;
+    const uint8_t *local_cursor = *cursor;
     qd_buffer_t   *local_buffer = *buffer;
 
-    int remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer));
+    int remaining = qd_buffer_cursor(local_buffer) - local_cursor;
     while (consume > 0) {
         if (consume < remaining) {
             if (handler)
@@ -460,13 +462,13 @@ static void advance_guarded(unsigned char **cursor, qd_buffer_t **buffer, int co
             if (handler)
                 handler(context, local_cursor, remaining);
             consume -= remaining;
-            if (!local_buffer->next) {
-                local_cursor = qd_buffer_base(local_buffer) + qd_buffer_size(local_buffer);
+            if (!DEQ_NEXT(local_buffer)) {
+                local_cursor = qd_buffer_cursor(local_buffer);
                 break;
             }
-            local_buffer = local_buffer->next;
+            local_buffer = DEQ_NEXT(local_buffer);
             local_cursor = qd_buffer_base(local_buffer);
-            remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer));
+            remaining = qd_buffer_size(local_buffer);
         }
     }
 
@@ -1025,7 +1027,6 @@ qd_message_t *qd_message()
     msg->content->lock = sys_mutex();
     sys_atomic_init(&msg->content->aborted, 0);
     sys_atomic_init(&msg->content->discard, 0);
-    sys_atomic_init(&msg->content->ma_stream, 0);
     sys_atomic_init(&msg->content->no_body, 0);
     sys_atomic_init(&msg->content->oversize, 0);
     sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY);
@@ -1044,9 +1045,7 @@ void qd_message_free(qd_message_t *in_msg)
     qd_message_pvt_t          *msg        = (qd_message_pvt_t*) in_msg;
     qd_message_q2_unblocker_t  q2_unblock = {0};
 
-    qd_buffer_list_free_buffers(&msg->ma_to_override);
-    qd_buffer_list_free_buffers(&msg->ma_trace);
-    qd_buffer_list_free_buffers(&msg->ma_ingress);
+    free(msg->ma_to_override);
 
     sys_atomic_destroy(&msg->send_complete);
 
@@ -1100,8 +1099,6 @@ void qd_message_free(qd_message_t *in_msg)
             qd_iterator_free(content->ma_field_iter_in);
         if (content->ma_pf_ingress)
             qd_parse_free(content->ma_pf_ingress);
-        if (content->ma_pf_phase)
-            qd_parse_free(content->ma_pf_phase);
         if (content->ma_pf_to_override)
             qd_parse_free(content->ma_pf_to_override);
         if (content->ma_pf_trace)
@@ -1115,7 +1112,6 @@ void qd_message_free(qd_message_t *in_msg)
         sys_mutex_free(content->lock);
         sys_atomic_destroy(&content->aborted);
         sys_atomic_destroy(&content->discard);
-        sys_atomic_destroy(&content->ma_stream);
         sys_atomic_destroy(&content->no_body);
         sys_atomic_destroy(&content->oversize);
         sys_atomic_destroy(&content->priority);
@@ -1140,10 +1136,6 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
 
     ZERO(copy);
 
-    qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override);
-    qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
-    qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
-    copy->ma_phase = msg->ma_phase;
     copy->strip_annotations_in  = msg->strip_annotations_in;
 
     copy->content = content;
@@ -1155,78 +1147,73 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     copy->tag_sent      = false;
     copy->is_fanout     = false;
 
-    qd_message_message_annotations((qd_message_t*) copy);
+    if (!content->ma_disabled) {
+        if (msg->ma_to_override)
+            copy->ma_to_override = qd_strdup(msg->ma_to_override);
+        copy->ma_filter_trace   = msg->ma_filter_trace;
+        copy->ma_filter_ingress = msg->ma_filter_ingress;
+        copy->ma_reset_trace    = msg->ma_reset_trace;
+        copy->ma_reset_ingress  = msg->ma_reset_ingress;
+        copy->ma_phase          = msg->ma_phase;
+        copy->ma_streaming      = msg->ma_streaming;
+    }
 
     sys_atomic_inc(&content->ref_count);
 
     return (qd_message_t*) copy;
 }
 
-void qd_message_message_annotations(qd_message_t *in_msg)
+const char *qd_message_parse_annotations(qd_message_t *in_msg)
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
     qd_message_content_t *content = msg->content;
 
+    assert(!content->ma_disabled);  // should not be called when skipping MA processing
     if (content->ma_parsed)
-        return ;
+        return 0;
     content->ma_parsed = true;
 
     content->ma_field_iter_in = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
     if (content->ma_field_iter_in == 0)
-        return;
+        return 0;
 
     qd_parsed_field_t *ma_pf_stream = 0;
-    qd_parse_annotations(
-        msg->strip_annotations_in,
-        content->ma_field_iter_in,
-        &content->ma_pf_ingress,
-        &content->ma_pf_phase,
-        &content->ma_pf_to_override,
-        &content->ma_pf_trace,
-        &ma_pf_stream,
-        &content->ma_user_annotation_blob,
-        &content->ma_count);
-
-    // Construct pseudo-field location of user annotations blob
-    // This holds all annotations if no router-specific annotations are present
-    if (content->ma_count > 0) {
-        qd_field_location_t *cf  = &content->field_user_annotations;
-        qd_buffer_field_t   *uab = &content->ma_user_annotation_blob;
-        cf->buffer = uab->buffer;
-        cf->offset = uab->cursor - qd_buffer_base(uab->buffer);
-        cf->length = uab->remaining;
-        cf->parsed = true;
-    }
-
-    // extract phase
-    if (content->ma_pf_phase) {
-        content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase);
+    qd_parsed_field_t *ma_pf_phase = 0;
+    const char *err = qd_parse_annotations(msg->strip_annotations_in,
+                                           content->ma_field_iter_in,
+                                           &content->ma_pf_ingress,
+                                           &ma_pf_phase,
+                                           &content->ma_pf_to_override,
+                                           &content->ma_pf_trace,
+                                           &ma_pf_stream,
+                                           &content->ma_user_annotations,
+                                           &content->ma_user_count);
+    if (err)
+        return(err);
+
+    // cache incoming values into the message
+
+    if (ma_pf_phase) {
+        msg->ma_phase = qd_parse_as_int(ma_pf_phase);
+        qd_parse_free(ma_pf_phase);
     }
 
     if (ma_pf_stream) {
-        SET_ATOMIC_BOOL(&content->ma_stream, qd_parse_as_int(ma_pf_stream));
+        msg->ma_streaming = true;
         qd_parse_free(ma_pf_stream);
     }
 
-    return;
+    return 0;
 }
 
 
-void qd_message_set_trace_annotation(qd_message_t *in_msg, qd_composed_field_t *trace_field)
+void qd_message_set_to_override_annotation(qd_message_t *in_msg, const char *to_field)
 {
     qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    qd_buffer_list_free_buffers(&msg->ma_trace);
-    qd_compose_take_buffers(trace_field, &msg->ma_trace);
-    qd_compose_free(trace_field);
+    free(msg->ma_to_override);
+    msg->ma_to_override = to_field ? qd_strdup(to_field) : 0;
 }
 
-void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_field_t *to_field)
-{
-    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    qd_buffer_list_free_buffers(&msg->ma_to_override);
-    qd_compose_take_buffers(to_field, &msg->ma_to_override);
-    qd_compose_free(to_field);
-}
 
 void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase)
 {
@@ -1240,20 +1227,21 @@ int qd_message_get_phase_annotation(const qd_message_t *in_msg)
     return msg->ma_phase;
 }
 
-void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream)
+void qd_message_set_streaming_annotation(qd_message_t *in_msg)
 {
     qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    SET_ATOMIC_BOOL(&msg->content->ma_stream, stream);
+    msg->ma_streaming = true;
 }
 
-void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
+
+void qd_message_disable_router_annotations(qd_message_t *msg)
 {
-    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    qd_buffer_list_free_buffers(&msg->ma_ingress);
-    qd_compose_take_buffers(ingress_field, &msg->ma_ingress);
-    qd_compose_free(ingress_field);
+    qd_message_content_t *content = ((qd_message_pvt_t *)msg)->content;
+    content->ma_disabled = true;
+    content->ma_parsed = true;
 }
 
+
 bool qd_message_is_discard(qd_message_t *msg)
 {
     if (!msg)
@@ -1670,133 +1658,185 @@ static void send_handler(void *context, const unsigned char *start, int length)
 }
 
 
-static void compose_message_annotations_v0(qd_message_pvt_t *msg, qd_buffer_list_t *out)
+// Restore MA to the original user-supplied MA values. This merely sets up the
+// annotations section and map header to hold only the user supplied
+// annotations.
+//
+// @return the length of the ma_header field in octets
+//
+static int restore_user_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header)
 {
-    if (msg->content->ma_count > 0) {
-        qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
-
-        qd_compose_start_map(out_ma);
+    qd_message_content_t *content = msg->content;
+    if (content->ma_user_count) {
 
-        // Bump the map size and count to reflect user's blob.
-        // Note that the blob is not inserted here. This code adjusts the
-        // size/count of the map that is under construction and the content
-        // is inserted by router-node
-        qd_compose_insert_opaque_elements(out_ma, msg->content->ma_count,
-                                          msg->content->field_user_annotations.length);
-        qd_compose_end_map(out_ma);
-        qd_compose_take_buffers(out_ma, out);
+        // setup the MA descriptor:
+        ma_header[0] = 0;
+        ma_header[1] = QD_AMQP_SMALLULONG;
+        ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS;
 
-        qd_compose_free(out_ma);
+        // setup the MA MAP header.  The type of header (MAP32/8) depends on
+        // the size of the map contents.
+        const int map_hdr_len = qd_compose_map_header(&ma_header[3],
+                                                      content->ma_user_annotations.remaining,
+                                                      content->ma_user_count);
+        return map_hdr_len + 3;
     }
+    return 0;
 }
 
 
-static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list_t *out,
-                                           qd_buffer_list_t *out_trailer)
+// Generate the MA section header and the router annotations. Any user
+// annotations will be sent after ma_header and before ma_trailer.
+//
+// @return the length of the ma_header field in octets
+//
+static int compose_router_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header,
+                                              qd_buffer_list_t *ma_trailer)
 {
-    qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+    qd_message_content_t *content = msg->content;
 
-    bool map_started = false;
+    // account for any user annotations to be sent before the router annotations
+    //
+    uint32_t mcount = content->ma_user_count;
+    uint32_t msize  = content->ma_user_annotations.remaining;
 
-    int field_count = 0;
-    qd_composed_field_t *field = qd_compose_subfield(0);
-    if (!field)
-        return;
+    if (msg->ma_phase) {
+        assert(msg->ma_phase < 128); // smallint
+        mcount += 2;
 
-    // add dispatch router specific annotations if any are defined
-    if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
-        !DEQ_IS_EMPTY(msg->ma_trace) ||
-        !DEQ_IS_EMPTY(msg->ma_ingress) ||
-        msg->ma_phase != 0 ||
-        IS_ATOMIC_FLAG_SET(&msg->content->ma_stream)) {
+        // key:
+        msize += QD_MA_PHASE_ENCODED_LEN;
+        qd_buffer_list_append(ma_trailer, QD_MA_PHASE_ENCODED, QD_MA_PHASE_ENCODED_LEN);
 
-        if (!map_started) {
-            qd_compose_start_map(out_ma);
-            map_started = true;
-        }
+        // value:
+        msize += 2;  // tag + 1 byte value
+        uint8_t ma_phase[2];
+        ma_phase[0] = QD_AMQP_SMALLINT;
+        ma_phase[1] = msg->ma_phase;
+        qd_buffer_list_append(ma_trailer, ma_phase, 2);
+    }
 
-        if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
-            qd_compose_insert_symbol(field, QD_MA_TO);
-            qd_compose_insert_buffers(field, &msg->ma_to_override);
-            field_count++;
-        }
+    if (msg->ma_streaming) {
+        mcount += 2;
 
-        if (!DEQ_IS_EMPTY(msg->ma_trace)) {
-            qd_compose_insert_symbol(field, QD_MA_TRACE);
-            qd_compose_insert_buffers(field, &msg->ma_trace);
-            field_count++;
-        }
+        // key:
+        msize += QD_MA_STREAM_ENCODED_LEN;
+        qd_buffer_list_append(ma_trailer, QD_MA_STREAM_ENCODED, QD_MA_STREAM_ENCODED_LEN);
 
-        if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
-            qd_compose_insert_symbol(field, QD_MA_INGRESS);
-            qd_compose_insert_buffers(field, &msg->ma_ingress);
-            field_count++;
-        }
+        // value: historically sent as int value 1:
+        msize += 2;
+        const uint8_t streaming[2] = {QD_AMQP_SMALLINT, 1};
+        qd_buffer_list_append(ma_trailer, streaming, 2);
+    }
 
-        if (msg->ma_phase != 0) {
-            qd_compose_insert_symbol(field, QD_MA_PHASE);
-            qd_compose_insert_int(field, msg->ma_phase);
-            field_count++;
-        }
+    if (msg->ma_to_override || content->ma_pf_to_override) {
+        mcount += 2;
 
-        if (IS_ATOMIC_FLAG_SET(&msg->content->ma_stream)) {
-            qd_compose_insert_symbol(field, QD_MA_STREAM);
-            qd_compose_insert_int(field, 1);
-            field_count++;
-        }
-        // pad out to N fields
-        for  (; field_count < QD_MA_N_KEYS; field_count++) {
-            qd_compose_insert_symbol(field, QD_MA_PREFIX);
-            qd_compose_insert_string(field, "X");
+        // key:
+        msize += QD_MA_TO_ENCODED_LEN;
+        qd_buffer_list_append(ma_trailer, QD_MA_TO_ENCODED, QD_MA_TO_ENCODED_LEN);
+
+        // value: message specific value takes precedence over value in
+        // original received message to allow overriding the to-override
+        uint8_t hdr[5];  // max length of encoded str8/32 header
+        if (msg->ma_to_override) {
+            const size_t str_len = strlen(msg->ma_to_override);
+            const int hdr_len = qd_compose_str_header(hdr, str_len);
+
+            msize += hdr_len;
+            qd_buffer_list_append(ma_trailer, hdr, hdr_len);
+
+            msize += str_len;
+            qd_buffer_list_append(ma_trailer, (uint8_t*) msg->ma_to_override, str_len);
+
+        } else {
+            qd_buffer_field_t to = qd_parse_value(content->ma_pf_to_override);
+            const int hdr_len = qd_compose_str_header(hdr, to.remaining);
+
+            msize += hdr_len;
+            qd_buffer_list_append(ma_trailer, hdr, hdr_len);
+
+            msize += to.remaining;
+            qd_buffer_list_append_field(ma_trailer, &to);
         }
     }
 
-    if (msg->content->ma_count > 0) {
-        // insert the incoming message user blob
-        if (!map_started) {
-            qd_compose_start_map(out_ma);
-            map_started = true;
-        }
+    if (!msg->ma_filter_ingress) {
+        mcount += 2;
+
+        // key
+        msize += QD_MA_INGRESS_ENCODED_LEN;
+        qd_buffer_list_append(ma_trailer, QD_MA_INGRESS_ENCODED, QD_MA_INGRESS_ENCODED_LEN);
+
+        // value: use original value if present, else the local node is the
+        // ingress
+        if (content->ma_pf_ingress && !msg->ma_reset_ingress) {
+            uint8_t hdr[5];   // max size str8/32 header
+            qd_buffer_field_t ingress = qd_parse_value(content->ma_pf_ingress);
+            const int hdr_len = qd_compose_str_header(hdr, ingress.remaining);
 
-        // Bump the map size and count to reflect user's blob.
-        // Note that the blob is not inserted here. This code adjusts the
-        // size/count of the map that is under construction and the content
-        // is inserted by router-node
-        qd_compose_insert_opaque_elements(out_ma, msg->content->ma_count,
-                                          msg->content->field_user_annotations.length);
+            msize += hdr_len;
+            qd_buffer_list_append(ma_trailer, hdr, hdr_len);
+
+            msize += ingress.remaining;
+            qd_buffer_list_append_field(ma_trailer, &ingress);
+
+        } else {
+            size_t node_id_len;
+            const uint8_t *node_id = qd_router_id_encoded(&node_id_len);
+            msize += node_id_len;
+            qd_buffer_list_append(ma_trailer, node_id, node_id_len);
+        }
     }
 
-    if (field_count > 0) {
-        if (!map_started) {
-            qd_compose_start_map(out_ma);
-            map_started = true;
+    if (!msg->ma_filter_trace) {
+        mcount += 2;
+        size_t node_id_len;
+        const uint8_t *node_id = qd_router_id_encoded(&node_id_len);
+        uint32_t trace_count = 1;  // local node
+        uint32_t trace_len = node_id_len;
+        const bool use_incoming = content->ma_pf_trace && !msg->ma_reset_trace;
+
+        // key
+        msize += QD_MA_TRACE_ENCODED_LEN;
+        qd_buffer_list_append(ma_trailer, QD_MA_TRACE_ENCODED, QD_MA_TRACE_ENCODED_LEN);
+
+        // value: first compute trace list size and count since the list header
+        // must be written first
+        qd_buffer_field_t in_trace;
+        if (use_incoming) {
+            in_trace = qd_parse_value(content->ma_pf_trace);
+            trace_len += in_trace.remaining;
+            trace_count += qd_parse_sub_count(content->ma_pf_trace);
         }
-        qd_compose_insert_opaque_elements(out_ma, field_count * 2,
-                                          qd_buffer_list_length(&field->buffers));
 
-    }
+        uint8_t list_hdr[9];  // max len encoded list header
+        const int hdr_len = qd_compose_list_header(list_hdr, trace_len, trace_count);
 
-    if (map_started) {
-        qd_compose_end_map(out_ma);
-        qd_compose_take_buffers(out_ma, out);
-        qd_compose_take_buffers(field, out_trailer);
-    }
+        msize += hdr_len;
+        qd_buffer_list_append(ma_trailer, list_hdr, hdr_len);
 
-    qd_compose_free(out_ma);
-    qd_compose_free(field);
-}
+        if (use_incoming) {
+            msize += in_trace.remaining;
+            qd_buffer_list_append_field(ma_trailer, &in_trace);
+        }
 
+        msize += node_id_len;
+        qd_buffer_list_append(ma_trailer, node_id, node_id_len);
+    }
 
-// create a buffer chain holding the outgoing message annotations section
-static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out,
-                                        qd_buffer_list_t *out_trailer,
-                                        bool strip_annotations)
-{
-    if (strip_annotations) {
-        compose_message_annotations_v0(msg, out);
-    } else {
-        compose_message_annotations_v1(msg, out, out_trailer);
+    if (msize) {
+        // setup the MA section descriptor:
+        ma_header[0] = 0;
+        ma_header[1] = QD_AMQP_SMALLULONG;
+        ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS;
+
+        // setup the MA MAP header
+        const int hdr_size = qd_compose_map_header(&ma_header[3], msize, mcount);
+        return hdr_size + 3;
     }
+
+    return 0;
 }
 
 
@@ -1807,7 +1847,6 @@ void qd_message_send(qd_message_t *in_msg,
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
     qd_message_content_t *content = msg->content;
-    qd_buffer_t          *buf     = 0;
     pn_link_t            *pnl     = qd_link_pn(link);
 
     *q3_stalled                   = false;
@@ -1825,97 +1864,90 @@ void qd_message_send(qd_message_t *in_msg,
             return;
         }
 
-        qd_buffer_list_t new_ma;
-        qd_buffer_list_t new_ma_trailer;
-        DEQ_INIT(new_ma);
-        DEQ_INIT(new_ma_trailer);
+        msg->cursor.buffer = DEQ_HEAD(content->buffers);
+        msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer);
 
-        // Process  the message annotations if any
-        compose_message_annotations(msg, &new_ma, &new_ma_trailer, strip_annotations);
-
-        //
-        // Start with the very first buffer;
-        //
-        buf = DEQ_HEAD(content->buffers);
+        // Since link-routed messages do not set router annotations they will
+        // skip the following (content->ma_disabled will be true) and unconditionally
+        // start sending from the first octet of the content.
 
+        if (!content->ma_disabled) {
+            //
+            // Send header if present
+            //
+            const int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
+            if (header_consume > 0) {
+                assert(msg->cursor.cursor == content->section_message_header.offset + qd_buffer_base(msg->cursor.buffer));
+                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, header_consume, send_handler, (void*) pnl);
+            }
 
-        //
-        // Send header if present
-        //
-        unsigned char *cursor = qd_buffer_base(buf);
-        int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
-        if (content->section_message_header.length > 0) {
-            buf    = content->section_message_header.buffer;
-            cursor = content->section_message_header.offset + qd_buffer_base(buf);
-            advance_guarded(&cursor, &buf, header_consume, send_handler, (void*) pnl);
-        }
+            //
+            // Send delivery annotation if present
+            //
+            const int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
+            if (da_consume > 0) {
+                assert(msg->cursor.cursor == content->section_delivery_annotation.offset + qd_buffer_base(msg->cursor.buffer));
+                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, da_consume, send_handler, (void*) pnl);
+            }
 
-        //
-        // Send delivery annotation if present
-        //
-        int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
-        if (content->section_delivery_annotation.length > 0) {
-            buf    = content->section_delivery_annotation.buffer;
-            cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf);
-            advance_guarded(&cursor, &buf, da_consume, send_handler, (void*) pnl);
-        }
+            //
+            // Send the message annotations section
+            //
 
-        //
-        // Send new message annotations map start if any
-        //
-        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
-        while (da_buf) {
-            char *to_send = (char*) qd_buffer_base(da_buf);
-            pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
-            da_buf = DEQ_NEXT(da_buf);
-        }
-        qd_buffer_list_free_buffers(&new_ma);
+            uint8_t ma_header[12];  // max length for MA section and map header
+            int ma_header_len;      // size of ma_header content
+            qd_buffer_list_t ma_trailer = DEQ_EMPTY;
 
-        //
-        // Annotations possibly include an opaque blob of user annotations
-        //
-        if (content->field_user_annotations.length > 0) {
-            qd_buffer_t *buf2      = content->field_user_annotations.buffer;
-            unsigned char *cursor2 = content->field_user_annotations.offset + qd_buffer_base(buf);
-            advance_guarded(&cursor2, &buf2,
-                            content->field_user_annotations.length,
-                            send_handler, (void*) pnl);
-        }
-
-        //
-        // Annotations may include the v1 new_ma_trailer
-        //
-        qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer);
-        while (ta_buf) {
-            char *to_send = (char*) qd_buffer_base(ta_buf);
-            pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
-            ta_buf = DEQ_NEXT(ta_buf);
-        }
-        qd_buffer_list_free_buffers(&new_ma_trailer);
+            if (strip_annotations) {
+                // send the original user message annotations only (if present)
+                ma_header_len = restore_user_message_annotations(msg, ma_header);
+            } else {
+                ma_header_len = compose_router_message_annotations(msg, ma_header, &ma_trailer);
+            }
 
+            if (ma_header_len) {
+                //
+                // send annotation section and map header
+                //
+                pn_link_send(pnl, (char*) ma_header, ma_header_len);
 
-        //
-        // Skip over replaced message annotations
-        //
-        int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
-        if (content->section_message_annotation.length > 0)
-            advance_guarded(&cursor, &buf, ma_consume, 0, 0);
+                //
+                // Now send any annotation set by the original endpoint
+                //
+                if (content->ma_user_annotations.remaining) {
+                    qd_buffer_t *buf2      = content->ma_user_annotations.buffer;
+                    const uint8_t *cursor2 = content->ma_user_annotations.cursor;
+                    advance_guarded(&cursor2, &buf2,
+                                    content->ma_user_annotations.remaining,
+                                    send_handler, (void*) pnl);
+                }
 
-        msg->cursor.buffer = buf;
+                //
+                // Next send router annotations
+                //
+                qd_buffer_t *ta_buf = DEQ_HEAD(ma_trailer);
+                while (ta_buf) {
+                    char *to_send = (char*) qd_buffer_base(ta_buf);
+                    pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
+                    ta_buf = DEQ_NEXT(ta_buf);
+                }
+                qd_buffer_list_free_buffers(&ma_trailer);
+            }
 
-        //
-        // If this message has no header and no delivery annotations and no message annotations, set the offset to 0.
-        //
-        if (header_consume == 0 && da_consume == 0 && ma_consume ==0)
-            msg->cursor.cursor = qd_buffer_base(buf);
-        else
-            msg->cursor.cursor = cursor;
+            //
+            // Skip over replaced message annotations
+            //
+            const int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
+            if (ma_consume > 0) {
+                assert(msg->cursor.cursor == content->section_message_annotation.offset + qd_buffer_base(msg->cursor.buffer));
+                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, ma_consume, 0, 0);
+            }
+        }
 
         msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
-
     }
 
-    buf = msg->cursor.buffer;
+    qd_buffer_t *buf = msg->cursor.buffer;
 
     qd_message_q2_unblocker_t  q2_unblock = {0};
     pn_session_t              *pns        = pn_link_session(pnl);
@@ -2310,71 +2342,13 @@ ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char
 }
 
 
-void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers)
-{
-    qd_composed_field_t  *field   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
-    qd_message_content_t *content = MSG_CONTENT(msg);
-    SET_ATOMIC_FLAG(&content->receive_complete);
-
-    qd_compose_start_list(field);
-    qd_compose_insert_bool(field, 0);     // durable
-    qd_compose_insert_null(field);        // priority
-    //qd_compose_insert_null(field);        // ttl
-    //qd_compose_insert_boolean(field, 0);  // first-acquirer
-    //qd_compose_insert_uint(field, 0);     // delivery-count
-    qd_compose_end_list(field);
-
-    qd_buffer_list_t out_ma;
-    qd_buffer_list_t out_ma_trailer;
-    DEQ_INIT(out_ma);
-    DEQ_INIT(out_ma_trailer);
-    compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, &out_ma_trailer, false);
-    qd_compose_insert_buffers(field, &out_ma);
-    // TODO: user annotation blob goes here
-    qd_compose_insert_buffers(field, &out_ma_trailer);
-
-    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
-    qd_compose_start_list(field);
-    qd_compose_insert_null(field);          // message-id
-    qd_compose_insert_null(field);          // user-id
-    qd_compose_insert_string(field, to);    // to
-    //qd_compose_insert_null(field);          // subject
-    //qd_compose_insert_null(field);          // reply-to
-    //qd_compose_insert_null(field);          // correlation-id
-    //qd_compose_insert_null(field);          // content-type
-    //qd_compose_insert_null(field);          // content-encoding
-    //qd_compose_insert_timestamp(field, 0);  // absolute-expiry-time
-    //qd_compose_insert_timestamp(field, 0);  // creation-time
-    //qd_compose_insert_null(field);          // group-id
-    //qd_compose_insert_uint(field, 0);       // group-sequence
-    //qd_compose_insert_null(field);          // reply-to-group-id
-    qd_compose_end_list(field);
-
-    if (buffers) {
-        field = qd_compose(QD_PERFORMATIVE_BODY_DATA, field);
-        qd_compose_insert_binary_buffers(field, buffers);
-    }
-
-    qd_compose_take_buffers(field, &content->buffers);
-    qd_compose_free(field);
-}
-
-
-void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field, bool complete)
-{
-    qd_message_content_t *content       = MSG_CONTENT(msg);
-    qd_buffer_list_t     *field_buffers = qd_compose_buffers(field);
-
-    content->buffers          = *field_buffers;
-    SET_ATOMIC_BOOL(&content->receive_complete, complete);
-
-    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
-}
-
-
+// deprecated - use qd_message_compose() for creating locally generated messages
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, bool receive_complete)
 {
     qd_message_content_t *content        = MSG_CONTENT(msg);
+
+    LOCK(content->lock);
+
     SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
@@ -2382,44 +2356,49 @@ void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_com
     content->buffers = *field1_buffers;
     DEQ_INIT(*field1_buffers);
     DEQ_APPEND(content->buffers, (*field2_buffers));
-}
 
+    // initialize the Q2 flag:
+    if (_Q2_holdoff_should_block_LH(content))
+        content->q2_input_holdoff = true;
 
-void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, bool receive_complete)
-{
-    qd_message_content_t *content        = MSG_CONTENT(msg);
-    SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
-    qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
-    CHECK_Q2(*field1_buffers);
-    qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
-    CHECK_Q2(*field2_buffers);
-    qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
-    CHECK_Q2(*field3_buffers);
+    UNLOCK(content->lock);
 
-    content->buffers = *field1_buffers;
-    DEQ_INIT(*field1_buffers);
-    DEQ_APPEND(content->buffers, (*field2_buffers));
-    DEQ_APPEND(content->buffers, (*field3_buffers));
+    // set up the locations of the message headers sent prior to the message
+    // annotations section.  This is used when composing outgoing router
+    // annotations:
+    qd_message_parse_annotations(msg);
 }
 
-void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2, qd_composed_field_t *field3, qd_composed_field_t *field4, bool receive_complete)
+
+qd_message_t *qd_message_compose(qd_composed_field_t *f1,
+                                 qd_composed_field_t *f2,
+                                 qd_composed_field_t *f3,
+                                 bool receive_complete)
 {
-    qd_message_content_t *content        = MSG_CONTENT(msg);
+    qd_message_t *msg = qd_message();
+    if (!msg)
+        return 0;
+
+    qd_composed_field_t *fields[4] = {f1, f2, f3, 0};
+    qd_message_content_t *content = MSG_CONTENT(msg);
     SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
-    qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
-    CHECK_Q2(*field1_buffers);
-    qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
-    CHECK_Q2(*field2_buffers);
-    qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
-    CHECK_Q2(*field3_buffers);
-    qd_buffer_list_t     *field4_buffers = qd_compose_buffers(field4);
-    CHECK_Q2(*field4_buffers);
-    content->buffers = *field1_buffers;
-    DEQ_INIT(*field1_buffers);
-    DEQ_APPEND(content->buffers, (*field2_buffers));
-    DEQ_APPEND(content->buffers, (*field3_buffers));
-    DEQ_APPEND(content->buffers, (*field4_buffers));
 
+    for (int idx = 0; fields[idx] != 0; ++idx) {
+        qd_buffer_list_t *bufs = qd_compose_buffers(fields[idx]);
+        DEQ_APPEND(content->buffers, (*bufs));
+        qd_compose_free(fields[idx]);
+    }
+
+    // initialize the Q2 flag:
+    if (_Q2_holdoff_should_block_LH(content))
+        content->q2_input_holdoff = true;
+
+    // set up the locations of the message headers sent prior to the message
+    // annotations section.  This is used when composing outgoing router
+    // annotations:
+    qd_message_parse_annotations(msg);
+
+    return msg;
 }
 
 
@@ -2838,15 +2817,23 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg
 }
 
 
-qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg)
+qd_parsed_field_t *qd_message_get_ingress_router(qd_message_t *msg)
 {
     return ((qd_message_pvt_t*) msg)->content->ma_pf_ingress;
 }
 
 
-qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg)
+// used by exchange bindings to erase original ingress node id
+void qd_message_reset_ingress_router_annotation(qd_message_t *in_msg)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    msg->ma_reset_ingress = true;
+}
+
+
+void qd_message_disable_ingress_router_annotation(qd_message_t *msg)
 {
-    return ((qd_message_pvt_t*) msg)->content->ma_pf_phase;
+    ((qd_message_pvt_t*) msg)->ma_filter_ingress = true;
 }
 
 
@@ -2862,15 +2849,24 @@ qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg)
 }
 
 
-int qd_message_get_phase_val(qd_message_t *msg)
+void qd_message_disable_trace_annotation(qd_message_t *msg)
+{
+    ((qd_message_pvt_t*) msg)->ma_filter_trace = true;
+}
+
+
+// used by exchange bindings to erase old message trace list
+void qd_message_reset_trace_annotation(qd_message_t *in_msg)
 {
-    return ((qd_message_pvt_t*) msg)->content->ma_int_phase;
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    msg->ma_reset_trace = true;
 }
 
-int qd_message_is_streaming(qd_message_t *msg)
+
+int qd_message_is_streaming(const qd_message_t *msg)
 {
-    qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg;
-    return IS_ATOMIC_FLAG_SET(&msg_pvt->content->ma_stream);
+    const qd_message_pvt_t *msg_pvt = (const qd_message_pvt_t *)msg;
+    return msg_pvt->ma_streaming;
 }
 
 
diff --git a/src/message_private.h b/src/message_private.h
index fff1b17..be43954 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -99,7 +99,6 @@ typedef struct {
     qd_field_location_t  section_application_properties;  // The application properties list
     qd_field_location_t  section_body;                    // The message body: Data
     qd_field_location_t  section_footer;                  // The footer
-    qd_field_location_t  field_user_annotations;          // Opaque user message annotations, not a real field.
 
     qd_field_location_t  field_message_id;                // The string value of the message-id
     qd_field_location_t  field_user_id;                   // The string value of the user-id
@@ -120,16 +119,21 @@ typedef struct {
     qd_message_depth_t   parse_depth;                     // Depth to which message content has been parsed
     qd_iterator_t       *ma_field_iter_in;                // Iter for msg.FIELD_MESSAGE_ANNOTATION
 
-    qd_buffer_field_t    ma_user_annotation_blob;        // Original user annotations
-                                                          //  with router annotations stripped
-    uint32_t             ma_count;                        // Number of map elements in blob
-                                                          //  after router fields stripped
+    // Original user-supplied message annotations: this is the location in the
+    // received message of all annotation key/value pairs provided by the
+    // origin endpoint.  Router-specific message annotations appear after these
+    // user values.
+    qd_buffer_field_t    ma_user_annotations;
+    uint32_t             ma_user_count;   // total # of user map entries
+
+    // Locations in the received message for the ingress-router ID, the
+    // to-override address, and the router trace list.  These fields are only
+    // present if the message has arrived from another router (not a client
+    // endpoint).
     qd_parsed_field_t   *ma_pf_ingress;
-    qd_parsed_field_t   *ma_pf_phase;
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
-    int                  ma_int_phase;
-    sys_atomic_t         ma_stream;                      // Message is streaming
+
     uint64_t             max_message_size;               // Configured max; 0 if no max to enforce
     uint64_t             bytes_received;                 // Bytes returned by pn_link_recv()
                                                          //  when enforcing max_message_size
@@ -139,11 +143,13 @@ typedef struct {
 
     qd_message_q2_unblocker_t q2_unblocker;              // Callback and context to signal Q2 unblocked to receiver
 
+    bool                 ma_disabled;                    // true: link routing - no MA handling needed.
     bool                 ma_parsed;                      // Have parsed incoming message annotations message
-    sys_atomic_t         discard;                        // Message is being discarded
-    sys_atomic_t         receive_complete;               // Message has been completely received
     bool                 q2_input_holdoff;               // Q2 state: hold off calling pn_link_recv
     bool                 disable_q2_holdoff;             // Disable Q2 flow control
+
+    sys_atomic_t         discard;                        // Message is being discarded
+    sys_atomic_t         receive_complete;               // Message has been completely received
     sys_atomic_t         priority_parsed;                // Message priority has been parsed
     sys_atomic_t         oversize;                       // Policy oversize-message handling in effect
     sys_atomic_t         no_body;                        // HTTP2 request has no body
@@ -157,10 +163,13 @@ struct qd_message_pvt_t {
     qd_message_depth_t             sent_depth;      // Depth of outgoing sent message
     qd_message_content_t          *content;         // Singleton content shared by reference between
                                                     //  incoming and all outgoing copies
-    qd_buffer_list_t               ma_to_override;  // To field in outgoing message annotations.
-    qd_buffer_list_t               ma_trace;        // Trace list in outgoing message annotations
-    qd_buffer_list_t               ma_ingress;      // Ingress field in outgoing message annotations
+    char                          *ma_to_override;  // new outgoing value for to-override MA
     int                            ma_phase;        // Phase for override address
+    bool                           ma_streaming;    // Do not attempt to wait for entire msg to arrive.
+    bool                           ma_filter_trace; // Do not add trace list to outbound msg (sending to edge-router)
+    bool                           ma_filter_ingress;  // Do not add ingress router to outbound msg (sending to edge-router)
+    bool                           ma_reset_trace;     // exchange-bindings: discard incoming trace, replace with local node id
+    bool                           ma_reset_ingress;   // exchange-bindings: discard incoming ingress, replace with local node id
     qd_message_stream_data_list_t  stream_data_list;// Stream data parse structure
                                                     // TODO - move this to the content for one-time parsing (TLR)
     unsigned char                 *body_cursor;     // Stream: tracks the point in the content buffer chain
diff --git a/src/parse.c b/src/parse.c
index 3f43e5f..511d8d6 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -24,27 +24,37 @@
 #include "qpid/dispatch/ctools.h"
 #include "buffer_field_api.h"
 
+#include "buffer_field_api.h"
+
 #include <assert.h>
 #include <inttypes.h>
 #include <stdio.h>
 
 DEQ_DECLARE(qd_parsed_field_t, qd_parsed_field_list_t);
 
+
+typedef struct qd_amqp_field_t {
+    uint8_t           tag;
+    uint32_t          size;   // includes length of count!
+    uint32_t          count;
+    qd_buffer_field_t value;  // the raw (encoded) value
+} qd_amqp_field_t;
+
+
 struct qd_parsed_field_t {
     DEQ_LINKS(qd_parsed_field_t);
     const qd_parsed_field_t *parent;
     qd_parsed_field_list_t   children;
-    uint8_t                  tag;
-    qd_iterator_t           *raw_iter;
-    qd_iterator_t           *typed_iter;
+    qd_iterator_t           *typed_iter;  // iterator over the full field (header and value)
+    qd_iterator_t           *raw_iter;    // iterator over just the value
     const char              *parse_error;
+    qd_buffer_field_t        full_field;  // contains encoded AMQP type header and value
+    qd_amqp_field_t          amqp;        // decoded header and raw value
 };
 
 ALLOC_DECLARE(qd_parsed_field_t);
 ALLOC_DEFINE(qd_parsed_field_t);
 
-ALLOC_DECLARE(qd_parsed_turbo_t);
-ALLOC_DEFINE(qd_parsed_turbo_t);
 
 qd_parsed_field_t* qd_field_first_child(qd_parsed_field_t *field)
 {
@@ -56,118 +66,145 @@ qd_parsed_field_t* qd_field_next_child(qd_parsed_field_t *field)
     return DEQ_NEXT(field);
 }
 
+
+// length of size and count of AMQP data fields can be determined by the value
+// of the top 4 bits of the tag octet.  See AMQP 1.0 Part 1 Types.
+//
+static inline int tag_get_size_length(uint8_t tag)
+{
+    tag &= 0xF0;
+    if (tag < 0xA0) return 0;
+    if ((tag & 0x10) == 0) return 1;
+    return 4;
+}
+
+
+static inline int tag_get_count_length(uint8_t tag)
+{
+    tag &= 0xF0;
+    if (tag < 0xC0) return 0;
+    if ((tag & 0x10) == 0) return 1;
+    return 4;
+}
+
+
 /**
- * size = the number of bytes following tag:size (payload, including the count)
- * count = the number of elements. Applies only to compound structures
+ * Extract an AMQP value from the encoded data held in *bfield and store it in *value.
+ * bfield is expected to point to the tag octet and will be advanced past the decoded value.
+ * Returns 0 on success, else an error message.
  */
-static char *get_type_info(qd_iterator_t *iter, uint8_t *tag, uint32_t *size, uint32_t *count, uint32_t *length_of_size, uint32_t *length_of_count)
+static inline char *parse_amqp_field(qd_buffer_field_t *bfield, qd_amqp_field_t *value)
 {
-    if (qd_iterator_end(iter))
-        return "Insufficient Data to Determine Tag";
+    ZERO(value);
 
-    *tag             = qd_iterator_octet(iter);
-    *count           = 0;
-    *size            = 0;
-    *length_of_count = 0;
-    *length_of_size  = 0;
+    if (!qd_buffer_field_octet(bfield, &value->tag))
+        return "Insufficient Data to Determine Tag";
 
+    uint32_t length_of_count = tag_get_count_length(value->tag);
+    uint32_t length_of_size  = tag_get_size_length(value->tag);
 
-    switch (*tag & 0xF0) {
+    // extract size and content (optional)
+    switch (value->tag & 0xF0) {
     case 0x40:
-        *size = 0;
         break;
     case 0x50:
-        *size = 1;
+        value->size = 1;
         break;
     case 0x60:
-        *size = 2;
+        value->size = 2;
         break;
     case 0x70:
-        *size = 4;
+        value->size = 4;
         break;
     case 0x80:
-        *size = 8;
+        value->size = 8;
         break;
     case 0x90:
-        *size = 16;
+        value->size = 16;
         break;
     case 0xB0:
     case 0xD0:
     case 0xF0:
-        *size += ((unsigned int) qd_iterator_octet(iter)) << 24;
-        *size += ((unsigned int) qd_iterator_octet(iter)) << 16;
-        *size += ((unsigned int) qd_iterator_octet(iter)) << 8;
-        *length_of_size = 3;
-        // fall through to the next case
-
+    {
+        (void) length_of_size; // ignore unused var error
+        assert(length_of_size == 4);
+        if (!qd_buffer_field_uint32(bfield, &value->size)) {
+            return "Insufficient Data to Determine Length";
+        }
+        if (length_of_count) {
+            assert(length_of_count == 4);
+            if (!qd_buffer_field_uint32(bfield, &value->count)) {
+                return "Insufficient Data to Determine Count";
+            }
+        }
+    }
+        break;
     case 0xA0:
     case 0xC0:
     case 0xE0:
-        if (qd_iterator_end(iter))
+    {
+        uint8_t octet;
+        assert(length_of_size == 1);
+        if (!qd_buffer_field_octet(bfield, &octet)) {
             return "Insufficient Data to Determine Length";
-        *size += (unsigned int) qd_iterator_octet(iter);
-        *length_of_size += 1;
+        }
+        value->size = octet;
+        if (length_of_count) {
+            assert(length_of_count == 1);
+            if (!qd_buffer_field_octet(bfield, &octet)) {
+                return "Insufficient Data to Determine Count";
+            }
+            value->count = octet;
+        }
         break;
+    }
 
     default:
         return "Invalid Tag - No Length Information";
     }
 
-    switch (*tag & 0xF0) {
-    case 0xD0:
-    case 0xF0:
-        *count += ((unsigned int) qd_iterator_octet(iter)) << 24;
-        *count += ((unsigned int) qd_iterator_octet(iter)) << 16;
-        *count += ((unsigned int) qd_iterator_octet(iter)) << 8;
-        *length_of_count = 3;
-        // fall through to the next case
-
-    case 0xC0:
-    case 0xE0:
-        if (qd_iterator_end(iter))
-            return "Insufficient Data to Determine Count";
-        *count += (unsigned int) qd_iterator_octet(iter);
-        *length_of_count += 1;
-        break;
-    }
-
-    if ((*tag == QD_AMQP_MAP8 || *tag == QD_AMQP_MAP32) && (*count & 1))
+    if ((value->tag == QD_AMQP_MAP8 || value->tag == QD_AMQP_MAP32) && (value->count & 1))
         return "Odd Number of Elements in a Map";
 
-    if (*length_of_count > *size)
+    if (length_of_count > value->size)
         return "Insufficient Length to Determine Count";
 
+    value->value = *bfield;
+    value->value.remaining = value->size - length_of_count;
+    size_t moved = qd_buffer_field_advance(bfield, value->value.remaining);
+    if (moved != value->value.remaining)
+        return "Truncated field";
+
     return 0;
 }
 
-static qd_parsed_field_t *qd_parse_internal(qd_iterator_t *iter, qd_parsed_field_t *p)
+
+
+// bfield contains the encoded AMQP data to be parsed.  bfield starts at the
+// type tag octet and should be long enough to hold the entire AMQP data type.
+// On return bfield has been advanced past the encoded AMQP data.
+//
+static qd_parsed_field_t *qd_parse_internal(qd_buffer_field_t *bfield, qd_parsed_field_t *p)
 {
     qd_parsed_field_t *field = new_qd_parsed_field_t();
     if (!field)
         return 0;
-
+    ZERO(field);
     DEQ_ITEM_INIT(field);
     DEQ_INIT(field->children);
-    field->parent   = p;
-    field->raw_iter = 0;
-    field->typed_iter = qd_iterator_dup(iter);
-
-    uint32_t size            = 0;
-    uint32_t count           = 0;
-    uint32_t length_of_count = 0;
-    uint32_t length_of_size  = 0;
-
-    field->parse_error = get_type_info(iter, &field->tag, &size, &count, &length_of_size, &length_of_count);
+    field->parent     = p;
+    field->full_field = *bfield;
 
+    field->parse_error = parse_amqp_field(bfield, &field->amqp);
     if (!field->parse_error) {
-        qd_iterator_trim_view(field->typed_iter, size + length_of_size + 1); // + 1 accounts for the tag length
-
-        field->raw_iter = qd_iterator_sub(iter, size - length_of_count);
-
-        qd_iterator_advance(iter, size - length_of_count);
-
-        for (uint32_t idx = 0; idx < count; idx++) {
-            qd_parsed_field_t *child = qd_parse_internal(field->raw_iter, field);
+        // truncate full_field in case bfield holds multiple values.
+        // since bfield has advanced past the parsed field we just subtract it.
+        field->full_field.remaining -= bfield->remaining;
+
+        // now parse out the content of any contained types:
+        qd_buffer_field_t children = field->amqp.value;
+        for (uint32_t idx = 0; idx < field->amqp.count; idx++) {
+            qd_parsed_field_t *child = qd_parse_internal(&children, field);
             DEQ_INSERT_TAIL(field->children, child);
             if (!qd_parse_ok(child)) {
                 field->parse_error = child->parse_error;
@@ -180,106 +217,13 @@ static qd_parsed_field_t *qd_parse_internal(qd_iterator_t *iter, qd_parsed_field
 }
 
 
-qd_parsed_field_t *qd_parse(qd_iterator_t *iter)
+qd_parsed_field_t *qd_parse(const qd_iterator_t *iter)
 {
     if (!iter)
         return 0;
-    return qd_parse_internal(iter, 0);
-}
-
-
-const char *qd_parse_turbo(qd_iterator_t          *iter,
-                           qd_parsed_turbo_list_t *annos,
-                           uint32_t               *user_entries,
-                           uint32_t               *user_bytes)
-{
-    if (!iter || !annos || !user_entries || !user_bytes)
-        return  "missing argument";
-
-    DEQ_INIT(*annos);
-    *user_entries = 0;
-    *user_bytes = 0;
-
-    // The iter is addressing the message-annotations map.
-    // Open the field describing the map's items
-    uint8_t  tag             = 0;
-    uint32_t size            = 0;
-    uint32_t count           = 0;
-    uint32_t length_of_count = 0;
-    uint32_t length_of_size  = 0;
-    const char * parse_error = get_type_info(iter, &tag, &size, &count, &length_of_size, &length_of_count);
-
-    if (parse_error)
-        return parse_error;
-
-    if (count == 0)
-        return 0;
-
-    int n_allocs = 0;
-
-    // Do skeletal parse of each map element
-    for (uint32_t idx = 0; idx < count; idx++) {
-        qd_parsed_turbo_t *turbo;
-        if (n_allocs < QD_MA_FILTER_LEN * 2) {
-            turbo = new_qd_parsed_turbo_t();
-            n_allocs++;
-
-        } else {
-            // Retire an existing element.
-            // If there are this many in the list then this one cannot be a
-            // router annotation and must be a user annotation.
-            turbo = DEQ_HEAD(*annos);
-            *user_entries += 1;
-            *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
-            DEQ_REMOVE_HEAD(*annos);
-        }
-        if (!turbo)
-            return "failed to allocate qd_parsed_turbo_t";
-        ZERO(turbo);
-
-        // Get the buffer pointers for the map element
-        turbo->bufptr = qd_iterator_get_view_cursor(iter);
-
-        // Get description of the map element
-        parse_error = get_type_info(iter, &turbo->tag, &turbo->size, &turbo->count,
-                                    &turbo->length_of_size, &turbo->length_of_count);
-        if (parse_error) {
-            free_qd_parsed_turbo_t(turbo);
-            return parse_error;
-        }
-
-        // Save parsed element
-        DEQ_INSERT_TAIL(*annos, turbo);
-
-        // Advance map iterator to next map element
-        qd_iterator_advance(iter, turbo->size - turbo->length_of_count);
-    }
 
-    // remove leading annos in the queue if their prefix is not a match and
-    // return them as part of the user annotations
-    for (int idx=0; idx < n_allocs; idx += 2) {
-        qd_parsed_turbo_t *turbo = DEQ_HEAD(*annos);
-        assert(turbo);
-        qd_buffer_field_t key = turbo->bufptr;
-        qd_buffer_field_advance(&key, turbo->length_of_size + 1);
-        if (qd_buffer_field_equal(&key, (const uint8_t*) QD_MA_PREFIX, QD_MA_PREFIX_LEN))
-            break;
-
-        // leading anno is a user annotation map key
-        // remove the key and value from the list and accumulate them as user items
-        *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
-        DEQ_REMOVE_HEAD(*annos);
-        free_qd_parsed_turbo_t(turbo);
-
-        turbo = DEQ_HEAD(*annos);
-        assert(turbo);
-        *user_bytes += sizeof(turbo->tag) + turbo->size + turbo->length_of_size;
-        DEQ_REMOVE_HEAD(*annos);
-        free_qd_parsed_turbo_t(turbo);
-
-        *user_entries += 2;
-    }
-    return parse_error;
+    qd_buffer_field_t bfield = qd_iterator_get_view_cursor(iter);
+    return qd_parse_internal(&bfield, 0);
 }
 
 
@@ -316,11 +260,11 @@ static qd_parsed_field_t *qd_parse_dup_internal(const qd_parsed_field_t *field,
         return 0;
 
     ZERO(dup);
-    dup->parent      = parent;
-    dup->tag         = field->tag;
-    dup->raw_iter    = qd_iterator_dup(field->raw_iter);
-    dup->typed_iter  = qd_iterator_dup(field->typed_iter);
-    dup->parse_error = field->parse_error;
+    dup->parent     = parent;
+    dup->raw_iter   = qd_iterator_dup(field->raw_iter);
+    dup->typed_iter = qd_iterator_dup(field->typed_iter);
+    dup->amqp       = field->amqp;
+    dup->full_field = field->full_field;
 
     qd_parsed_field_t *child = DEQ_HEAD(field->children);
     while (child) {
@@ -341,37 +285,57 @@ qd_parsed_field_t *qd_parse_dup(const qd_parsed_field_t *field)
 
 int qd_parse_ok(qd_parsed_field_t *field)
 {
-    return field->parse_error == 0;
+    return field && field->parse_error == 0;
 }
 
 
 const char *qd_parse_error(qd_parsed_field_t *field)
 {
-    return field->parse_error;
+    return field ? field->parse_error : "No field";
 }
 
 
 uint8_t qd_parse_tag(qd_parsed_field_t *field)
 {
-    return field->tag;
+    assert(field);
+    return field->amqp.tag;
 }
 
 
+// just the data (no header/tag)
 qd_iterator_t *qd_parse_raw(qd_parsed_field_t *field)
 {
     if (!field)
         return 0;
+    if (!field->raw_iter) {
+        field->raw_iter = qd_iterator_buffer_field(&field->amqp.value,
+                                                   ITER_VIEW_ALL);
+    }
 
     return field->raw_iter;
 }
 
 
+// includes type header, tag and data
 qd_iterator_t *qd_parse_typed(qd_parsed_field_t *field)
 {
+    if (!field)
+        return 0;
+    if (!field->typed_iter) {
+        field->typed_iter = qd_iterator_buffer_field(&field->full_field,
+                                                     ITER_VIEW_ALL);
+    }
     return field->typed_iter;
 }
 
 
+qd_buffer_field_t qd_parse_value(const qd_parsed_field_t *field)
+{
+    assert(field && !field->parse_error);
+    return field->amqp.value;
+}
+
+
 uint32_t qd_parse_as_uint(qd_parsed_field_t *field)
 {
     uint32_t result = 0;
@@ -388,35 +352,39 @@ uint32_t qd_parse_as_uint(qd_parsed_field_t *field)
 }
 
 
-uint64_t qd_parse_as_ulong(qd_parsed_field_t *field)
+uint64_t qd_parse_as_ulong(qd_parsed_field_t *parsed_field)
 {
     uint64_t result = 0;
+    uint32_t tmp32 = 0;
+    uint8_t  octet = 0;
 
-    qd_iterator_reset(field->raw_iter);
+    qd_buffer_field_t field = parsed_field->amqp.value;
 
-    switch (field->tag) {
+    switch (parsed_field->amqp.tag) {
     case QD_AMQP_ULONG:
     case QD_AMQP_TIMESTAMP:
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 56;
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 48;
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 40;
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 32;
-        // Fall Through...
+        qd_buffer_field_uint32(&field, &tmp32);
+        result = ((uint64_t) tmp32) << 32;
+        qd_buffer_field_uint32(&field, &tmp32);
+        result |= ((uint64_t) tmp32);
+        break;
 
     case QD_AMQP_UINT:
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 24;
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 16;
-        // Fall Through...
+        qd_buffer_field_uint32(&field, &tmp32);
+        result = tmp32;
+        break;
 
     case QD_AMQP_USHORT:
-        result |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 8;
+        qd_buffer_field_octet(&field, &octet);
+        result = ((uint64_t) octet) << 8;
         // Fall Through...
 
     case QD_AMQP_BOOLEAN:
     case QD_AMQP_UBYTE:
     case QD_AMQP_SMALLUINT:
     case QD_AMQP_SMALLULONG:
-        result |= (uint64_t) qd_iterator_octet(field->raw_iter);
+        qd_buffer_field_octet(&field, &octet);
+        result |= (uint64_t) octet;
         break;
 
     case QD_AMQP_TRUE:
@@ -435,11 +403,10 @@ uint64_t qd_parse_as_ulong(qd_parsed_field_t *field)
     case QD_AMQP_SYM32:
         {
             // conversion from string to 64 bit unsigned integer:
-            // the maximum unsigned 64 bit value would need 20 characters.
-            char buf[64];
-            qd_iterator_strncpy(field->raw_iter, buf, sizeof(buf));
-            if (sscanf(buf, "%"SCNu64, &result) != 1)
-                field->parse_error = "Cannot convert string to unsigned long";
+            char *value = qd_buffer_field_strdup(&field);
+            if (sscanf(value, "%"SCNu64, &result) != 1)
+                parsed_field->parse_error = "Cannot convert string to unsigned long";
+            free(value);
         }
         break;
 
@@ -451,12 +418,12 @@ uint64_t qd_parse_as_ulong(qd_parsed_field_t *field)
     case QD_AMQP_SMALLLONG:
     {
         // if a signed integer is positive, accept it
-        int64_t ltmp = qd_parse_as_long(field);
-        if (qd_parse_ok(field)) {
+        int64_t ltmp = qd_parse_as_long(parsed_field);
+        if (qd_parse_ok(parsed_field)) {
             if (ltmp >= 0) {
                 result = (uint64_t)ltmp;
             } else {
-                field->parse_error = "Unable to parse negative integer as unsigned";
+                parsed_field->parse_error = "Unable to parse negative integer as unsigned";
             }
         }
     }
@@ -464,7 +431,7 @@ uint64_t qd_parse_as_ulong(qd_parsed_field_t *field)
 
 
     default:
-        field->parse_error = "Unable to parse as an unsigned integer";
+        parsed_field->parse_error = "Unable to parse as an unsigned integer";
         // catch any missing types during development
         assert(false);
     }
@@ -489,48 +456,51 @@ int32_t qd_parse_as_int(qd_parsed_field_t *field)
 }
 
 
-int64_t qd_parse_as_long(qd_parsed_field_t *field)
+int64_t qd_parse_as_long(qd_parsed_field_t *parsed_field)
 {
     int64_t result = 0;
 
-    qd_iterator_reset(field->raw_iter);
+    qd_buffer_field_t field = parsed_field->amqp.value;
 
-    switch (field->tag) {
+    switch (parsed_field->amqp.tag) {
     case QD_AMQP_LONG: {
-        uint64_t tmp = ((uint64_t) qd_iterator_octet(field->raw_iter)) << 56;
-        tmp |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 48;
-        tmp |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 40;
-        tmp |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 32;
-        tmp |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 24;
-        tmp |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 16;
-        tmp |= ((uint64_t) qd_iterator_octet(field->raw_iter)) << 8;
-        tmp |= (uint64_t) qd_iterator_octet(field->raw_iter);
-        result = (int64_t) tmp;
+        uint64_t convert;
+        uint32_t tmp32 = 0;
+        qd_buffer_field_uint32(&field, &tmp32);
+        convert = ((uint64_t) tmp32) << 32;
+        qd_buffer_field_uint32(&field, &tmp32);
+        convert |= (uint64_t) tmp32;
+        result = (int64_t) convert;
         break;
     }
 
     case QD_AMQP_INT: {
-        uint32_t tmp = ((uint32_t) qd_iterator_octet(field->raw_iter)) << 24;
-        tmp |= ((uint32_t) qd_iterator_octet(field->raw_iter)) << 16;
-        tmp |= ((uint32_t) qd_iterator_octet(field->raw_iter)) << 8;
-        tmp |= ((uint32_t) qd_iterator_octet(field->raw_iter));
+        uint32_t tmp = 0;
+        qd_buffer_field_uint32(&field, &tmp);
         result = (int32_t) tmp;
         break;
     }
 
     case QD_AMQP_SHORT: {
-        uint16_t tmp = ((uint16_t) qd_iterator_octet(field->raw_iter)) << 8;
-        tmp |= ((uint16_t) qd_iterator_octet(field->raw_iter));
-        result = (int16_t) tmp;
+        uint16_t convert;
+        uint8_t octet = 0;
+        qd_buffer_field_octet(&field, &octet);
+        convert = ((uint16_t) octet) << 8;
+        qd_buffer_field_octet(&field, &octet);
+        convert |= ((uint16_t) octet);
+        result = (int16_t) convert;
         break;
     }
 
     case QD_AMQP_BYTE:
     case QD_AMQP_BOOLEAN:
     case QD_AMQP_SMALLLONG:
-    case QD_AMQP_SMALLINT:
-        result = (int8_t) qd_iterator_octet(field->raw_iter);
+    case QD_AMQP_SMALLINT: {
+        uint8_t octet = 0;
+        qd_buffer_field_octet(&field, &octet);
+        result = (int8_t) octet;
         break;
+    }
 
     case QD_AMQP_TRUE:
         result = 1;
@@ -548,11 +518,10 @@ int64_t qd_parse_as_long(qd_parsed_field_t *field)
     case QD_AMQP_SYM32:
         {
             // conversion from string to 64 bit integer:
-            // the maximum 64 bit value would need 20 characters.
-            char buf[64];
-            qd_iterator_strncpy(field->raw_iter, buf, sizeof(buf));
-            if (sscanf(buf, "%"SCNi64, &result) != 1)
-                field->parse_error = "Cannot convert string to long";
+            char *value = qd_buffer_field_strdup(&field);
+            if (sscanf(value, "%"SCNi64, &result) != 1)
+                parsed_field->parse_error = "Cannot convert string to long";
+            free(value);
         }
         break;
 
@@ -564,10 +533,10 @@ int64_t qd_parse_as_long(qd_parsed_field_t *field)
     case QD_AMQP_ULONG:
     {
         // if an unsigned integer "fits" accept it
-        uint64_t utmp = qd_parse_as_ulong(field);
-        if (qd_parse_ok(field)) {
+        uint64_t utmp = qd_parse_as_ulong(parsed_field);
+        if (qd_parse_ok(parsed_field)) {
             uint64_t max = INT8_MAX;
-            switch (field->tag) {
+            switch (parsed_field->amqp.tag) {
             case QD_AMQP_USHORT:
                 max = INT16_MAX;
                 break;
@@ -581,14 +550,14 @@ int64_t qd_parse_as_long(qd_parsed_field_t *field)
             if (utmp <= max) {
                 result = (int64_t)utmp;
             } else {
-                field->parse_error = "Unable to parse unsigned integer as a signed integer";
+                parsed_field->parse_error = "Unable to parse unsigned integer as a signed integer";
             }
         }
     }
     break;
 
     default:
-        field->parse_error = "Unable to parse as a signed integer";
+        parsed_field->parse_error = "Unable to parse as a signed integer";
         // catch any missing types during development
         assert(false);
     }
@@ -597,17 +566,20 @@ int64_t qd_parse_as_long(qd_parsed_field_t *field)
 }
 
 
-bool qd_parse_as_bool(qd_parsed_field_t *field)
+bool qd_parse_as_bool(qd_parsed_field_t *parsed_field)
 {
     bool result = false;
 
-    qd_iterator_reset(field->raw_iter);
+    qd_buffer_field_t field = parsed_field->amqp.value;
 
-    switch (field->tag) {
+    switch (parsed_field->amqp.tag) {
     case QD_AMQP_BYTE:
-    case QD_AMQP_BOOLEAN:
-        result = !!qd_iterator_octet(field->raw_iter);
+    case QD_AMQP_BOOLEAN: {
+        uint8_t octet = 0;
+        qd_buffer_field_octet(&field, &octet);
+        result = !!octet;
         break;
+    }
 
     case QD_AMQP_TRUE:
         result = true;
@@ -618,11 +590,31 @@ bool qd_parse_as_bool(qd_parsed_field_t *field)
 }
 
 
+char *qd_parse_as_string(const qd_parsed_field_t *parsed_field)
+{
+    char *str = 0;
+    switch (parsed_field->amqp.tag) {
+    case QD_AMQP_STR8_UTF8:
+    case QD_AMQP_SYM8:
+    case QD_AMQP_STR32_UTF8:
+    case QD_AMQP_SYM32: {
+        qd_buffer_field_t tmp = parsed_field->amqp.value;
+        str = qd_buffer_field_strdup(&tmp);
+        break;
+    }
+    default:
+        break;
+    }
+
+    return str;
+}
+
+
 uint32_t qd_parse_sub_count(qd_parsed_field_t *field)
 {
     uint32_t count = DEQ_SIZE(field->children);
 
-    if (field->tag == QD_AMQP_MAP8 || field->tag == QD_AMQP_MAP32)
+    if (field->amqp.tag == QD_AMQP_MAP8 || field->amqp.tag == QD_AMQP_MAP32)
         count = count >> 1;
 
     return count;
@@ -631,7 +623,7 @@ uint32_t qd_parse_sub_count(qd_parsed_field_t *field)
 
 qd_parsed_field_t *qd_parse_sub_key(qd_parsed_field_t *field, uint32_t idx)
 {
-    if (field->tag != QD_AMQP_MAP8 && field->tag != QD_AMQP_MAP32)
+    if (field->amqp.tag != QD_AMQP_MAP8 && field->amqp.tag != QD_AMQP_MAP32)
         return 0;
 
     idx = idx << 1;
@@ -647,7 +639,7 @@ qd_parsed_field_t *qd_parse_sub_key(qd_parsed_field_t *field, uint32_t idx)
 
 qd_parsed_field_t *qd_parse_sub_value(qd_parsed_field_t *field, uint32_t idx)
 {
-    if (field->tag == QD_AMQP_MAP8 || field->tag == QD_AMQP_MAP32)
+    if (field->amqp.tag == QD_AMQP_MAP8 || field->amqp.tag == QD_AMQP_MAP32)
         idx = (idx << 1) + 1;
 
     qd_parsed_field_t *key = DEQ_HEAD(field->children);
@@ -671,7 +663,7 @@ int qd_parse_is_map(qd_parsed_field_t *field)
     if (!field)
         return 0;
 
-    return is_tag_a_map(field->tag);
+    return is_tag_a_map(field->amqp.tag);
 }
 
 
@@ -680,9 +672,9 @@ int qd_parse_is_list(qd_parsed_field_t *field)
     if (!field)
         return 0;
 
-    return field->tag == QD_AMQP_LIST8
-        || field->tag == QD_AMQP_LIST32
-        || field->tag == QD_AMQP_LIST0;
+    return field->amqp.tag == QD_AMQP_LIST8
+        || field->amqp.tag == QD_AMQP_LIST32
+        || field->amqp.tag == QD_AMQP_LIST0;
 }
 
 
@@ -692,6 +684,13 @@ int qd_parse_is_scalar(qd_parsed_field_t *field)
 }
 
 
+static inline bool qd_parse_is_string(const qd_parsed_field_t *field)
+{
+    return field->amqp.tag == QD_AMQP_STR8_UTF8
+        || field->amqp.tag == QD_AMQP_STR32_UTF8;
+}
+
+
 qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *key)
 {
     if (!key)
@@ -704,11 +703,10 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
         if (!sub)
             return 0;
 
-        qd_iterator_t *iter = qd_parse_raw(sub);
-        if (!iter)
-            return 0;
+        qd_buffer_field_t value = sub->amqp.value;
+        size_t len = strlen(key);
 
-        if (qd_iterator_equal(iter, (const unsigned char*) key)) {
+        if (qd_buffer_field_equal(&value, (const uint8_t*) key, len)) {
             return qd_parse_sub_value(field, idx);
         }
     }
@@ -717,206 +715,184 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k
 }
 
 
-const char *qd_parse_annotations_v1(
-    bool                strip_anno_in,
-    qd_iterator_t      *ma_iter_in,
-    qd_parsed_field_t **ma_ingress,
-    qd_parsed_field_t **ma_phase,
-    qd_parsed_field_t **ma_to_override,
-    qd_parsed_field_t **ma_trace,
-    qd_parsed_field_t **ma_stream,
-    qd_buffer_field_t  *blob_pointer,
-    uint32_t           *blob_item_count)
+const char *qd_parse_annotations(
+    bool                   strip_annotations_in,
+    qd_iterator_t         *ma_iter_in,
+    qd_parsed_field_t    **ma_ingress,
+    qd_parsed_field_t    **ma_phase,
+    qd_parsed_field_t    **ma_to_override,
+    qd_parsed_field_t    **ma_trace,
+    qd_parsed_field_t    **ma_stream,
+    qd_buffer_field_t     *user_annotations,
+    uint32_t              *user_count)
 {
-    // Do full parse
-    qd_iterator_reset(ma_iter_in);
+    *ma_ingress             = 0;
+    *ma_phase               = 0;
+    *ma_to_override         = 0;
+    *ma_trace               = 0;
+    ZERO(user_annotations);
+    *user_count        = 0;
 
-    qd_parsed_turbo_list_t annos;
-    uint32_t               user_entries;
-    uint32_t               user_bytes;
-    const char * parse_error = qd_parse_turbo(ma_iter_in, &annos, &user_entries, &user_bytes);
-    if (parse_error) {
-        return parse_error;
-    }
+    if (!ma_iter_in)
+        return 0;  // ok - MA not present
 
-    // define a shorthand name for the qd message annotation key prefix length
-#define QMPL QD_MA_PREFIX_LEN
-
-    // trace, phase, and class keys are all the same length
-    assert(QD_MA_TRACE_LEN == QD_MA_PHASE_LEN);
-    assert(QD_MA_TRACE_LEN == QD_MA_CLASS_LEN);
-    
-    qd_parsed_turbo_t *anno;
-    if (!strip_anno_in) {
-        anno = DEQ_HEAD(annos);
-        while (anno) {
-            const uint8_t *dp;                // pointer to key name in raw buf or extract buf
-            char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary
-            int key_len = anno->size;
-
-            const int avail = qd_buffer_cursor(anno->bufptr.buffer) - anno->bufptr.cursor;
-            if (avail >= anno->size + anno->length_of_size + 1) {
-                // The best case: key name is completely in current raw buffer
-                dp = anno->bufptr.cursor + anno->length_of_size + 1;
-            } else {
-                // Pull the key name from multiple buffers
-                qd_buffer_field_t wbuf = anno->bufptr;    // scratch buf pointers for getting key
-                qd_buffer_field_advance(&wbuf, anno->length_of_size + 1);
-                int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total
-                key_len = qd_buffer_field_ncopy(&wbuf, (uint8_t*) key_name, t_size);
+    const char *error = 0;
+    qd_buffer_field_t bfield = qd_iterator_get_view_cursor(ma_iter_in);
 
-                dp = (const uint8_t *)key_name;
-            }
+    qd_amqp_field_t ma_map;
+    error = parse_amqp_field(&bfield, &ma_map);
+    if (error)
+        return error;
 
-            // Verify that the key starts with the prefix.
-            // Once a key with the routing prefix is observed in the annotation
-            // stream then the remainder of the keys must be routing keys.
-            // Padding keys are not real routing annotations but they have
-            // the routing prefix.
-            assert(key_len >= QMPL && memcmp(QD_MA_PREFIX, dp, QMPL) == 0);
-
-            // Advance pointer to data beyond the common prefix
-            dp += QMPL;
-
-            qd_ma_enum_t ma_type = QD_MAE_NONE;
-            switch (key_len) {
-                case QD_MA_TO_LEN:
-                    if (memcmp(QD_MA_TO + QMPL,      dp, QD_MA_TO_LEN - QMPL) == 0) {
-                        ma_type = QD_MAE_TO;
-                    }
-                    break;
-                case QD_MA_TRACE_LEN:
-                    if (memcmp(QD_MA_TRACE + QMPL,  dp, QD_MA_TRACE_LEN - QMPL) == 0) {
-                        ma_type = QD_MAE_TRACE;
-                    } else
-                    if (memcmp(QD_MA_PHASE + QMPL,  dp, QD_MA_PHASE_LEN - QMPL) == 0) {
-                        ma_type = QD_MAE_PHASE;
-                    }
-                    break;
-                case QD_MA_INGRESS_LEN:
-                    if (memcmp(QD_MA_INGRESS + QMPL, dp, QD_MA_INGRESS_LEN - QMPL) == 0) {
-                        ma_type = QD_MAE_INGRESS;
-                    }
-                    break;
-                case QD_MA_STREAM_LEN:
-                    if (memcmp(QD_MA_STREAM + QMPL, dp, QD_MA_STREAM_LEN - QMPL) == 0) {
-                        ma_type = QD_MAE_STREAM;
-                    }
-                    break;
-                default:
-                    // padding annotations are ignored here
-                    break;
-            }
+    if (ma_map.tag != QD_AMQP_MAP8 && ma_map.tag != QD_AMQP_MAP32)
+        return "Invalid message annotations section - missing map type";
 
-            // Process the data field
-            anno = DEQ_NEXT(anno);
-            assert(anno);
-
-            if (ma_type != QD_MAE_NONE) {
-                // produce a parsed_field for the data
-                qd_iterator_t *val_iter =
-                    qd_iterator_buffer(anno->bufptr.buffer,
-                                    anno->bufptr.cursor - qd_buffer_base(anno->bufptr.buffer),
-                                    anno->size + anno->length_of_size,
-                                    ITER_VIEW_ALL);
-                assert(val_iter);
-
-                qd_parsed_field_t *val_field = qd_parse(val_iter);
-                assert(val_field);
-
-                // transfer ownership of the extracted value to the message
-                switch (ma_type) {
-                    case QD_MAE_INGRESS:
-                        *ma_ingress = val_field;
-                        break;
-                    case QD_MAE_TRACE:
-                        *ma_trace = val_field;
-                        break;
-                    case QD_MAE_TO:
-                        *ma_to_override = val_field;
-                        break;
-                    case QD_MAE_PHASE:
-                        *ma_phase = val_field;
-                        break;
-                    case QD_MAE_STREAM:
-                        *ma_stream = val_field;
-                        break;
-                    case QD_MAE_NONE:
-                        assert(false);
-                        break;
-                }
+    if (ma_map.count & 0x01)
+        return "Invalid MA map count (odd number of fields)";
 
-                qd_iterator_free(val_iter);
-            }
-            anno = DEQ_NEXT(anno);
-        }
-    }
+    if (ma_map.count == 0)
+        return 0;  // empty map, ignore
 
-    anno = DEQ_HEAD(annos);
-    while (anno) {
-        DEQ_REMOVE_HEAD(annos);
-        free_qd_parsed_turbo_t(anno);
-        anno = DEQ_HEAD(annos);
-    }
 
-    // Adjust size of user annotation blob by the size of the router
-    // annotations
-    blob_pointer->remaining = user_bytes;
-    assert(blob_pointer->remaining >= 0);
+    // ma_map.value now holds all of the key/value fields in the map and points
+    // to the first key/value pair.  The router-specific map entries always
+    // come after any user-supplied MA data. Snapshot the current location for
+    // the start of user data
 
-    *blob_item_count = user_entries;
-    return 0;
-}
+    user_annotations->buffer = ma_map.value.buffer;
+    user_annotations->cursor = ma_map.value.cursor;
 
+    bool user_anno = true;       // assume first annotations are non-router
+    size_t user_annos_size = 0;
+    uint32_t user_annos_count = 0;
 
-void qd_parse_annotations(
-    bool                strip_annotations_in,
-    qd_iterator_t      *ma_iter_in,
-    qd_parsed_field_t **ma_ingress,
-    qd_parsed_field_t **ma_phase,
-    qd_parsed_field_t **ma_to_override,
-    qd_parsed_field_t **ma_trace,
-    qd_parsed_field_t **ma_stream,
-    qd_buffer_field_t  *blob_pointer,
-    uint32_t           *blob_item_count)
-{
-    *ma_ingress             = 0;
-    *ma_phase               = 0;
-    *ma_to_override         = 0;
-    *ma_trace               = 0;
-    ZERO(blob_pointer);
-    *blob_item_count        = 0;
+    // Now iterate over each key looking for router-specific map entires
+    qd_buffer_field_t ma_fields = ma_map.value;
 
-    if (!ma_iter_in)
-        return;
+    int kv_count = ma_map.count / 2;  // pairs of key,value fields
+    while (kv_count--) {
+        qd_amqp_field_t key;
 
-    uint8_t  tag             = 0;
-    uint32_t size            = 0;
-    uint32_t length_of_count = 0;
-    uint32_t length_of_size  = 0;
+        // extract key, advance ma_fields to the value field
+        error = parse_amqp_field(&ma_fields, &key);
+        if (error)
+            return error;
 
-    const char *parse_error = get_type_info(ma_iter_in, &tag,
-                                            &size, blob_item_count, &length_of_size,
-                                            &length_of_count);
-    if (parse_error)
-        return;
+        if (key.tag == QD_AMQP_SYM8 || key.tag == QD_AMQP_SYM32) {
 
-    if (!is_tag_a_map(tag)) {
-        return;
-    }
+            switch (key.value.remaining) {
+            case QD_MA_PREFIX_LEN:
+                if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_PREFIX, QD_MA_PREFIX_LEN)) {
+                    qd_amqp_field_t skip;
+                    user_anno = false;
+                    // empty router annotation - ignore it
+                    error = parse_amqp_field(&ma_fields, &skip);
+                    if (error)
+                        return error;
+                }
+                break;
+            case QD_MA_TO_LEN:
+                if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_TO, QD_MA_TO_LEN)) {
+                    user_anno = false;
+                    if (!strip_annotations_in) {
+                        (*ma_to_override) = qd_parse_internal(&ma_fields, 0);
+                        if (!qd_parse_ok((*ma_to_override)))
+                            return (*ma_to_override)->parse_error;
+                        if (!qd_parse_is_string(*ma_to_override))
+                            return "to-override not a valid string type";
+                    }
+                }
+                break;
+            case QD_MA_TRACE_LEN:
+                // Same length as QD_MA_PHASE_LEN and QD_MA_CLASS_LEN:
+                assert(QD_MA_TRACE_LEN == QD_MA_PHASE_LEN);
+                assert(QD_MA_PHASE_LEN == QD_MA_CLASS_LEN);
+                if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_TRACE, QD_MA_TRACE_LEN)) {
+                    user_anno = false;
+                    if (!strip_annotations_in) {
+                        (*ma_trace) = qd_parse_internal(&ma_fields, 0);
+                        if (!qd_parse_ok((*ma_trace)))
+                            return (*ma_trace)->parse_error;
+                        if (!qd_parse_is_list((*ma_trace)))
+                            return "trace annotation is not a list";
+                        bool all_str = true;
+                        for (qd_parsed_field_t *node = DEQ_HEAD((*ma_trace)->children);
+                             node && all_str;
+                             node = DEQ_NEXT(node)) {
+                            all_str = qd_parse_is_string(node);
+                        }
+                        if (!all_str)
+                            return "trace list contains non-string entries";
+                    }
 
-    // Initial snapshot on size/content of annotation payload
-    qd_iterator_t *raw_iter = qd_iterator_sub(ma_iter_in, (size - length_of_count));
+                } else if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_PHASE, QD_MA_PHASE_LEN)) {
+                    user_anno = false;
+                    // always encoded as an int, may be small:
+                    if (!strip_annotations_in) {
+                        (*ma_phase) = qd_parse_internal(&ma_fields, 0);
+                        if (!qd_parse_ok((*ma_phase)))
+                            return (*ma_phase)->parse_error;
+                    }
+
+                } else if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_CLASS, QD_MA_CLASS_LEN)) {
+                    // no longer used - skip it
+                    qd_amqp_field_t skip;
+                    user_anno = false;
+                    error = parse_amqp_field(&ma_fields, &skip);
+                    if (error)
+                        return error;
+                }
+                break;
+            case QD_MA_STREAM_LEN:
+                if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_STREAM, QD_MA_STREAM_LEN)) {
+                    user_anno = false;
+                    if (!strip_annotations_in) {
+                        (*ma_stream) = qd_parse_internal(&ma_fields, 0);
+                        if (!qd_parse_ok((*ma_stream)))
+                            return (*ma_stream)->parse_error;
+                    }
+                }
+                break;
+            case QD_MA_INGRESS_LEN:
+                if (qd_buffer_field_equal(&key.value, (uint8_t*) QD_MA_INGRESS, QD_MA_INGRESS_LEN)) {
+                    user_anno = false;
+                    if (!strip_annotations_in) {
+                        (*ma_ingress) = qd_parse_internal(&ma_fields, 0);
+                        if (!qd_parse_ok((*ma_ingress)))
+                            return (*ma_ingress)->parse_error;
+                        if (!qd_parse_is_string(*ma_ingress))
+                            return "ingress router not a string type";
+                    }
+                }
+                break;
 
-    // If there are no router annotations then all annotations
-    // are the user's opaque blob.
-    *blob_pointer = qd_iterator_get_view_cursor(raw_iter);
+            default:  // user value
+                break;
+            }
+        }
+
+        if (user_anno) {
+            qd_amqp_field_t user;
+
+            // move past the value:
+            error = parse_amqp_field(&ma_fields, &user);
+            if (error)
+                return error;
 
-    qd_iterator_free(raw_iter);
+            size_t key_len = 1 + tag_get_size_length(key.tag) + key.size;
+            size_t value_len = 1 + tag_get_size_length(user.tag) + user.size;
+            user_annos_size += key_len + value_len;
+            user_annos_count += 2;
 
-    (void) qd_parse_annotations_v1(strip_annotations_in, ma_iter_in, ma_ingress, ma_phase,
-                                    ma_to_override, ma_trace, ma_stream,
-                                    blob_pointer, blob_item_count);
+        } else if (strip_annotations_in) {
+            // hit the first non-user key - stop
+            break;
+        }
+     }
+
+    user_annotations->remaining = user_annos_size;
+    *user_count = user_annos_count;
 
-    return;
+    return 0;
 }
+
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 833df4a..79d0cc1 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -776,19 +776,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
         return 0;
 
     if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) {
-        qd_message_t *msg = qd_message();
-        qd_message_compose_2(msg, field, true);
-
-        qd_composed_field_t *ingress = qd_compose_subfield(0);
-        qd_compose_insert_string(ingress, qd_router_id(ioa->qd));
-
-        qd_composed_field_t *trace = qd_compose_subfield(0);
-        qd_compose_start_list(trace);
-        qd_compose_insert_string(trace, qd_router_id(ioa->qd));
-        qd_compose_end_list(trace);
-
-        qd_message_set_ingress_annotation(msg, ingress);
-        qd_message_set_trace_annotation(msg, trace);
+        qd_message_t *msg = qd_message_compose(field, 0, 0, true);
 
         PyObject *address = PyObject_GetAttrString(message, "address");
         if (address) {
@@ -802,7 +790,6 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args)
             }
             Py_DECREF(address);
         }
-        qd_compose_free(field);
         qd_message_free(msg);
         Py_RETURN_NONE;
     }
diff --git a/src/router_config.c b/src/router_config.c
index a564523..35e5990 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -34,6 +34,7 @@ static void qdi_router_configure_body(qdr_core_t              *core,
 
     qd_iterator_t *iter = qd_iterator_buffer(DEQ_HEAD(buffers), 0, qd_buffer_list_length(&buffers), ITER_VIEW_ALL);
     qd_parsed_field_t   *in_body = qd_parse(iter);
+    assert(qd_parse_ok(in_body));
     qd_iterator_free(iter);
 
     qd_iterator_t *name_iter = 0;
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 78df91f..c292eed 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -679,20 +679,13 @@ static qd_message_t *_create_message_CT(qdrc_client_t *client,
         qd_compose_end_list(fld);
     }
 
-    qd_message_t *message = qd_message();
-    if (req->app_properties && req->body) {
-        qd_message_compose_4(message, fld, req->app_properties, req->body, true);
-    } else if (req->body) {
-        qd_message_compose_3(message, fld, req->body, true);
-    } else if (req->app_properties) {
-        qd_message_compose_3(message, fld, req->app_properties, true);
+    qd_message_t *message = 0;
+    if (req->app_properties) {
+        message = qd_message_compose(fld, req->app_properties, req->body, true);
     } else {
-        qd_message_compose_2(message, fld, true);
+        message = qd_message_compose(fld, req->body, 0, true);
     }
-    qd_compose_free(fld);
-    qd_compose_free(req->body);
     req->body = 0;
-    qd_compose_free(req->app_properties);
     req->app_properties = 0;
 
     return message;
diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c
index a1d4d7f..4ae2381 100644
--- a/src/router_core/exchange_bindings.c
+++ b/src/router_core/exchange_bindings.c
@@ -220,16 +220,8 @@ int qdr_forward_exchange_CT(qdr_core_t     *core,
             in_delivery->link_exclusion = 0;
         }
 
-        const char *node_id = qd_router_id(core->qd);
-        qd_composed_field_t *trace_field = qd_compose_subfield(0);
-        qd_compose_start_list(trace_field);
-        qd_compose_insert_string(trace_field, node_id);
-        qd_compose_end_list(trace_field);
-        qd_message_set_trace_annotation(msg, trace_field);
-
-        qd_composed_field_t *ingress_field = qd_compose_subfield(0);
-        qd_compose_insert_string(ingress_field, node_id);
-        qd_message_set_ingress_annotation(msg, ingress_field);
+        qd_message_reset_trace_annotation(msg);
+        qd_message_reset_ingress_router_annotation(msg);
     }
 
     next_hop_t *next_hop = DEQ_HEAD(transmit_list);
@@ -322,9 +314,7 @@ static int send_message(qdr_core_t     *core,
            next_hop->exchange->name, next_hop->next_hop);
 
     // set "to override" and "phase" message annotations based on the next hop
-    qd_composed_field_t *to_field = qd_compose_subfield(0);
-    qd_compose_insert_string(to_field, (char *)next_hop->next_hop);
-    qd_message_set_to_override_annotation(copy, to_field);  // frees to_field
+    qd_message_set_to_override_annotation(copy, (const char*) next_hop->next_hop);
     qd_message_set_phase_annotation(copy, next_hop->phase);
 
     count = qdr_forward_message_CT(core, next_hop->qdr_addr, copy, in_delivery, exclude_inprocess, control);
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index c8abef0..fd76da3 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -80,7 +80,6 @@ typedef enum {
 
 
 typedef struct qd_management_context_t {
-    qd_message_t               *msg;
     qd_message_t               *source;
     qd_composed_field_t        *field;
     qdr_query_t                *query;
@@ -96,8 +95,7 @@ ALLOC_DEFINE(qd_management_context_t);
 /**
  * Convenience function to create and initialize context (qd_management_context_t)
  */
-static qd_management_context_t* qd_management_context(qd_message_t               *msg,
-                                                      qd_message_t               *source,
+static qd_management_context_t* qd_management_context(qd_message_t               *source,
                                                       qd_composed_field_t        *field,
                                                       qdr_query_t                *query,
                                                       qdr_core_t                 *core,
@@ -107,7 +105,6 @@ static qd_management_context_t* qd_management_context(qd_message_t
     qd_management_context_t *ctx = new_qd_management_context_t();
     ctx->count  = count;
     ctx->field  = field;
-    ctx->msg    = msg;
     ctx->source = qd_message_copy(source);
     ctx->query  = query;
     ctx->current_count = 0;
@@ -218,19 +215,15 @@ static void qd_manage_response_handler(void *context, const qd_amqp_error_t *sta
     qd_set_response_status(status, &fld);
 
     // Finally, compose and send the message.
-    qd_message_compose_3(ctx->msg, fld, ctx->field, true);
-
-    qdr_send_to1(ctx->core, ctx->msg, reply_to, true, false);
+    qd_message_t *msg = qd_message_compose(fld, ctx->field, 0, true);
+    qdr_send_to1(ctx->core, msg, reply_to, true, false);
+    qd_message_free(msg);
 
     // We have come to the very end. Free the appropriate memory.
     // Just go over this with Ted to see if I freed everything.
 
     qd_iterator_free(reply_to);
-    qd_compose_free(fld);
-
-    qd_message_free(ctx->msg);
     qd_message_free(ctx->source);
-    qd_compose_free(ctx->field);
 
     if (need_free) {
         qdr_query_free(ctx->query);
@@ -259,7 +252,7 @@ static void qd_core_agent_query_handler(qdr_core_t                 *core,
     qd_compose_insert_string(field, ATTRIBUTE_NAMES);
 
     // Call local function that creates and returns a local qd_management_context_t object containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, core, operation_type, (*count));
+    qd_management_context_t *ctx = qd_management_context(msg, field, 0, core, operation_type, (*count));
 
     // Grab the attribute names from the incoming message body. The attribute names will be used later on in the response.
     qd_parsed_field_t *attribute_names_parsed_field = 0;
@@ -304,7 +297,7 @@ static void qd_core_agent_read_handler(qdr_core_t                 *core,
     qdr_manage_handler(core, qd_manage_response_handler);
 
     // Call local function that creates and returns a qd_management_context_t containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
+    qd_management_context_t *ctx = qd_management_context(msg, body, 0, core, operation_type, 0);
 
     //Call the read API function
     qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body, in_conn);
@@ -327,7 +320,7 @@ static void qd_core_agent_create_handler(qdr_core_t                 *core,
     qdr_manage_handler(core, qd_manage_response_handler);
 
     // Call local function that creates and returns a qd_management_context_t containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0);
+    qd_management_context_t *ctx = qd_management_context(msg, out_body, 0, core, operation_type, 0);
 
     qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
 
@@ -353,7 +346,7 @@ static void qd_core_agent_update_handler(qdr_core_t                 *core,
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
 
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0);
+    qd_management_context_t *ctx = qd_management_context(msg, out_body, 0, core, operation_type, 0);
 
     qd_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
     qd_parsed_field_t *in_body= qd_parse(iter);
@@ -381,7 +374,7 @@ static void qd_core_agent_delete_handler(qdr_core_t                 *core,
     qdr_manage_handler(core, qd_manage_response_handler);
 
     // Call local function that creates and returns a qd_management_context_t containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0);
+    qd_management_context_t *ctx = qd_management_context(msg, body, 0, core, operation_type, 0);
 
     qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter, in_conn);
 }
diff --git a/src/router_core/modules/address_lookup_server/address_lookup_server.c b/src/router_core/modules/address_lookup_server/address_lookup_server.c
index e2bd3b2..1ea4fbd 100644
--- a/src/router_core/modules/address_lookup_server/address_lookup_server.c
+++ b/src/router_core/modules/address_lookup_server/address_lookup_server.c
@@ -73,6 +73,7 @@ static uint64_t _send_reply(_endpoint_ref_t             *epr,
                "Link route address reply failed - invalid request message properties"
                " (container=%s, endpoint=%p)",
                epr->container_id, (void *)epr->endpoint);
+        qd_compose_free(body);
         return PN_REJECTED;
     }
 
@@ -101,12 +102,9 @@ static uint64_t _send_reply(_endpoint_ref_t             *epr,
     qd_compose_insert_uint(fld,   status);
     qd_compose_end_map(fld);
 
-    qd_message_t *msg = qd_message();
-
-    qd_message_compose_3(msg, fld, body, true);
+    qd_message_t *msg = qd_message_compose(fld, body, 0, true);
     qdr_in_process_send_to_CT(_server_state.core, reply_to, msg, true, false);
     qd_message_free(msg);
-    qd_compose_free(fld);
 
     return PN_ACCEPTED;
 }
@@ -161,7 +159,6 @@ static uint64_t _do_link_route_lookup(_endpoint_ref_t   *epr,
                               cid,
                               reply_to,
                               out_body);
-    qd_compose_free(out_body);
 
     if (qd_log_enabled(_server_state.core->log, QD_LOG_TRACE)) {
         char *as = (char *)qd_iterator_copy(addr_i);
diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
index 56fad32..65f5b04 100644
--- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
@@ -51,8 +51,6 @@ struct  qdr_addr_tracking_module_context_t {
 
 static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t   *addr, bool insert_addr)
 {
-    qd_message_t *msg = qd_message();
-
     //
     // Start header
     //
@@ -71,13 +69,9 @@ static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_
     qd_compose_insert_bool(body, insert_addr);
     qd_compose_end_list(body);
 
-    // Finally, compose and retuen the message so it can be sent out.
-    qd_message_compose_3(msg, fld, body, true);
-
-    qd_compose_free(body);
-    qd_compose_free(fld);
+    // Finally, compose and return the message so it can be sent out.
 
-    return msg;
+    return qd_message_compose(fld, body, 0, true);
 }
 
 static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t  endpoint_state_list, qdr_connection_t *conn)
diff --git a/src/router_core/modules/heartbeat_edge/heartbeat_edge.c b/src/router_core/modules/heartbeat_edge/heartbeat_edge.c
index 19c0e69..cd7e131 100644
--- a/src/router_core/modules/heartbeat_edge/heartbeat_edge.c
+++ b/src/router_core/modules/heartbeat_edge/heartbeat_edge.c
@@ -138,10 +138,7 @@ static void on_timer(qdr_core_t *core, void *context)
         qd_compose_insert_int(body, client->next_msg_id);
         client->next_msg_id++;
 
-        qd_message_t *msg = qd_message();
-        qd_message_compose_2(msg, body, true);
-
-        qd_compose_free(body);
+        qd_message_t *msg = qd_message_compose(body, 0, 0, true);
 
         qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(client->core, client->endpoint, msg);
         qdrc_endpoint_send_CT(client->core, client->endpoint, dlv, true);
diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c
index 8a9816d..0fde683 100644
--- a/src/router_core/modules/mobile_sync/mobile.c
+++ b/src/router_core/modules/mobile_sync/mobile.c
@@ -228,26 +228,10 @@ static void qcm_mobile_sync_compose_diff_hint_list(qdrm_mobile_sync_t *msync, qd
 
 static qd_message_t *qcm_mobile_sync_compose_differential_mau(qdrm_mobile_sync_t *msync, const char *address)
 {
-    qd_message_t        *msg     = qd_message();
     qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU);
     qd_composed_field_t *body    = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
 
     //
-    // Add the ingress and trace annotations to the message to prevent this multicast from bouncing
-    // back to us.
-    //
-    qd_composed_field_t *ingress = qd_compose_subfield(0);
-    qd_compose_insert_string(ingress, qd_router_id(msync->core->qd));
-
-    qd_composed_field_t *trace = qd_compose_subfield(0);
-    qd_compose_start_list(trace);
-    qd_compose_insert_string(trace, qd_router_id(msync->core->qd));
-    qd_compose_end_list(trace);
-
-    qd_message_set_ingress_annotation(msg, ingress);
-    qd_message_set_trace_annotation(msg, trace);
-
-    //
     // Generate the message body
     //
     qd_compose_start_map(body);
@@ -274,16 +258,14 @@ static qd_message_t *qcm_mobile_sync_compose_differential_mau(qdrm_mobile_sync_t
 
     qd_compose_end_map(body);
 
-    qd_message_compose_3(msg, headers, body, true);
-    qd_compose_free(headers);
-    qd_compose_free(body);
+    qd_message_t *msg = qd_message_compose(headers, body, 0, true);
+
     return msg;
 }
 
 
 static qd_message_t *qcm_mobile_sync_compose_absolute_mau(qdrm_mobile_sync_t *msync, const char *address)
 {
-    qd_message_t        *msg     = qd_message();
     qd_composed_field_t *headers = qcm_mobile_sync_message_headers(address, MAU);
     qd_composed_field_t *body    = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
 
@@ -342,16 +324,13 @@ static qd_message_t *qcm_mobile_sync_compose_absolute_mau(qdrm_mobile_sync_t *ms
     }
     qd_compose_end_list(body);
     qd_compose_end_map(body);
-    qd_message_compose_3(msg, headers, body, true);
-    qd_compose_free(headers);
-    qd_compose_free(body);
-    return msg;
+
+    return qd_message_compose(headers, body, 0, true);
 }
 
 
 static qd_message_t *qcm_mobile_sync_compose_mar(qdrm_mobile_sync_t *msync, qdr_node_t *router)
 {
-    qd_message_t        *msg     = qd_message();
     qd_composed_field_t *headers = qcm_mobile_sync_message_headers(router->wire_address_ma, MAR);
     qd_composed_field_t *body    = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
 
@@ -370,10 +349,7 @@ static qd_message_t *qcm_mobile_sync_compose_mar(qdrm_mobile_sync_t *msync, qdr_
 
     qd_compose_end_map(body);
 
-    qd_message_compose_3(msg, headers, body, true);
-    qd_compose_free(headers);
-    qd_compose_free(body);
-    return msg;
+    return qd_message_compose(headers, body, 0, true);
 }
 
 
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index 850e1dd..d6cba2d 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -86,7 +86,6 @@ static void source_send(test_endpoint_t *ep, bool presettled)
     static uint32_t      sequence = 0;
     static char          stringbuf[100];
     qdr_delivery_t      *dlv;
-    qd_message_t        *msg   = qd_message();
     qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
 
     sprintf(stringbuf, "Sequence: %"PRIu32, sequence);
@@ -109,9 +108,8 @@ static void source_send(test_endpoint_t *ep, bool presettled)
     field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
     qd_compose_insert_string(field, stringbuf);
 
+    qd_message_t *msg = qd_message_compose(field, 0, 0, true);
     dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg);
-    qd_message_compose_2(msg, field, true);
-    qd_compose_free(field);
     qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled);
 
     if (--ep->credit > 0) {
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 6c05462..fc14ee9 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -38,23 +38,6 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 // Interface Functions
 //==================================================================================
 
-void qdr_new_message_annotate(qdr_core_t *core, qd_message_t *msg)
-{
-    if (core->router_mode != QD_ROUTER_MODE_EDGE) {
-        qd_composed_field_t *ingress = qd_compose_subfield(0);
-        qd_compose_insert_string(ingress, qd_router_id(core->qd));
-
-        qd_composed_field_t *trace = qd_compose_subfield(0);
-        qd_compose_start_list(trace);
-        qd_compose_insert_string(trace, qd_router_id(core->qd));
-        qd_compose_end_list(trace);
-
-        qd_message_set_ingress_annotation(msg, ingress);
-        qd_message_set_trace_annotation(msg, trace);
-    }
-}
-
-
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterator_t *ingress,
                                  bool settled, qd_bitmask_t *link_exclusion, int ingress_index,
                                  uint64_t remote_disposition,
@@ -144,6 +127,9 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
     dlv->link_id            = link->identity;
     dlv->conn_id            = link->conn_id;
     dlv->dispo_lock         = sys_mutex();
+
+    qd_message_disable_router_annotations(msg);  // routed links do not use router annotations
+
     qd_log(link->core->log, QD_LOG_DEBUG, DLV_FMT" Delivery created qdr_link_deliver_to_routed_link", DLV_ARGS(dlv));
 
     qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list");
@@ -664,10 +650,8 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery
     // itself associated with a fallback destination.
     //
     if (fanout == 0 && !!addr && !!addr->fallback && !link->fallback) {
-        const char          *key      = (const char*) qd_hash_key_by_handle(addr->fallback->hash_handle);
-        qd_composed_field_t *to_field = qd_compose_subfield(0);
-        qd_compose_insert_string(to_field, key + 2);
-        qd_message_set_to_override_annotation(dlv->msg, to_field);
+        const char *key = (const char*) qd_hash_key_by_handle(addr->fallback->hash_handle);
+        qd_message_set_to_override_annotation(dlv->msg, key + 2);
         qd_message_set_phase_annotation(dlv->msg, key[1] - '0');
         fanout = qdr_forward_message_CT(core, addr->fallback, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
         if (fanout > 0) {
diff --git a/src/router_node.c b/src/router_node.c
index aa38395..a9fa048 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -45,6 +45,8 @@ static char *container_role = "route-container";
 static char *edge_role      = "edge";
 static char *direct_prefix;
 static char *node_id;
+static uint8_t *encoded_node_id;
+static size_t encoded_node_id_len;
 
 static void deferred_AMQP_rx_handler(void *context, bool discard);
 static bool parse_failover_property_list(qd_router_t *router, qd_connection_t *conn, pn_data_t *props);
@@ -320,106 +322,49 @@ static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn,
 }
 
 
-static qd_iterator_t *router_annotate_message(qd_router_t   *router,
-                                              qd_message_t  *msg,
-                                              qd_bitmask_t **link_exclusions,
-                                              uint32_t      *distance,
-                                              int           *ingress_index)
+static qd_iterator_t *process_router_annotations(qd_router_t   *router,
+                                                 qd_message_t  *msg,
+                                                 qd_bitmask_t **link_exclusions,
+                                                 uint32_t      *distance,
+                                                 int           *ingress_index)
 {
     qd_iterator_t *ingress_iter = 0;
     bool           edge_mode    = router->router_mode == QD_ROUTER_MODE_EDGE;
 
     *link_exclusions = 0;
     *distance        = 0;
+    *ingress_index   = 0;
 
-    qd_parsed_field_t *trace   = qd_message_get_trace(msg);
-    qd_parsed_field_t *ingress = qd_message_get_ingress(msg);
-    qd_parsed_field_t *to      = qd_message_get_to_override(msg);
-    qd_parsed_field_t *phase   = qd_message_get_phase(msg);
-
-    //
-    // QD_MA_TRACE:
-    // If there is a trace field, append this router's ID to the trace.
-    // If the router ID is already in the trace the msg has looped.
-    // This code does not check for the loop condition.
-    //
-    // Edge routers do not add their IDs to the trace list.
-    //
-    if (!edge_mode) {
-        qd_composed_field_t *trace_field = qd_compose_subfield(0);
-        qd_compose_start_list(trace_field);
-        if (trace) {
-            if (qd_parse_is_list(trace)) {
-                //
-                // Return the distance in hops that this delivery has traveled.
-                //
-                *distance = qd_parse_sub_count(trace);
-
-                //
-                // Create a link-exclusion map for the items in the trace.  This map will
-                // contain a one-bit for each link that leads to a neighbor router that
-                // the message has already passed through.
-                //
-                *link_exclusions = qd_tracemask_create(router->tracemask, trace, ingress_index);
+    if (!edge_mode) { // Edge routers do not use trace or ingress meta-data
+        qd_parsed_field_t *trace = qd_message_get_trace(msg);
+        if (trace && qd_parse_is_list(trace)) {
+            //
+            // Return the distance in hops that this delivery has traveled.
+            //
+            *distance = qd_parse_sub_count(trace);
 
-                //
-                // Append this router's ID to the trace.
-                //
-                uint32_t idx = 0;
-                qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
-                while (trace_item) {
-                    qd_iterator_t *iter = qd_parse_raw(trace_item);
-                    qd_iterator_reset_view(iter, ITER_VIEW_ALL);
-                    qd_compose_insert_string_iterator(trace_field, iter);
-                    idx++;
-                    trace_item = qd_parse_sub_value(trace, idx);
-                }
-            }
+            //
+            // Create a link-exclusion map for the items in the trace.  This map will
+            // contain a one-bit for each link that leads to a neighbor router that
+            // the message has already passed through.
+            //
+            *link_exclusions = qd_tracemask_create(router->tracemask, trace, ingress_index);
         }
 
-        qd_compose_insert_string(trace_field, node_id);
-        qd_compose_end_list(trace_field);
-        qd_message_set_trace_annotation(msg, trace_field);
-    }
-
-    //
-    // QD_MA_TO:
-    // Preserve the existing value.
-    //
-    if (to) {
-        qd_composed_field_t *to_field = qd_compose_subfield(0);
-        qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
-        qd_message_set_to_override_annotation(msg, to_field);
-    }
-
-    //
-    // QD_MA_PHASE:
-    // Preserve the existing value.
-    //
-    if (phase) {
-        qd_message_set_phase_annotation(msg, qd_message_get_phase_val(msg));
-    }
-
-    //
-    // QD_MA_INGRESS:
-    // If there is no ingress field, annotate the ingress as
-    // this router else keep the original field.
-    //
-    // Edge routers do not annotate the ingress field.
-    //
-    if (!edge_mode) {
-        qd_composed_field_t *ingress_field = qd_compose_subfield(0);
+        qd_parsed_field_t *ingress = qd_message_get_ingress_router(msg);
         if (ingress && qd_parse_is_scalar(ingress)) {
             ingress_iter = qd_parse_raw(ingress);
-            qd_compose_insert_string_iterator(ingress_field, ingress_iter);
-        } else
-            qd_compose_insert_string(ingress_field, node_id);
-        qd_message_set_ingress_annotation(msg, ingress_field);
+        }
+
+    } else {
+        // Edge routers do not propagate trace or ingress
+        qd_message_disable_trace_annotation(msg);
+        qd_message_disable_ingress_router_annotation(msg);
     }
 
     //
     // Return the iterator to the ingress field _if_ it was present.
-    // If we added the ingress, return NULL.
+    // Otherwise this router is the ingress - return NULL.
     //
     return ingress_iter;
 }
@@ -717,7 +662,24 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         }
     }
 
-    qd_message_message_annotations(msg);
+    const char *ma_error = qd_message_parse_annotations(msg);
+    if (ma_error) {
+        qd_log(router->log_source, QD_LOG_WARNING,
+               "[C%"PRIu64"][L%"PRIu64"] Message rejected - invalid MA section: %s",
+               conn->connection_id, qd_link_link_id(link), ma_error);
+
+        pn_condition_t *condition = pn_disposition_condition(pn_delivery_local(pnd));
+        pn_condition_set_name(condition, "amqp:invalid-field");
+        pn_condition_set_description(condition, ma_error);
+        pn_delivery_update(pnd, PN_REJECTED);
+        qd_message_set_discard(msg, true);
+        pn_link_flow(pn_link, 1);
+        if (receive_complete) {
+            pn_delivery_settle(pnd);
+            qd_message_free(msg);
+        }
+        return next_delivery;
+    }
 
     //
     // Head of line blocking avoidance (DISPATCH-1545)
@@ -754,7 +716,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     uint32_t       distance = 0;
     int            ingress_index = 0; // Default to _this_ router
     qd_bitmask_t  *link_exclusions = 0;
-    qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index);
+    qd_iterator_t *ingress_iter = process_router_annotations(router, msg, &link_exclusions, &distance, &ingress_index);
 
     //
     // If this delivery has traveled further than the known radius of the network topology (plus 1),
@@ -803,9 +765,9 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
                 if (tenant_space) {
                     qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
                     qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_length);
-                    qd_composed_field_t *to_override = qd_compose_subfield(0);
-                    qd_compose_insert_string_iterator(to_override, addr_iter);
-                    qd_message_set_to_override_annotation(msg, to_override);
+                    char *iter_str = (char *)qd_iterator_copy(addr_iter);
+                    qd_message_set_to_override_annotation(msg, iter_str);
+                    free(iter_str);
                 }
             }
         }
@@ -858,17 +820,18 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         }
 
         if (term_addr) {
-            qd_composed_field_t *to_override = qd_compose_subfield(0);
             int tenant_space_length;
             const char *tenant_space = _get_tenant_space(conn, &tenant_space_length);
             if (tenant_space) {
                 qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
                 qd_iterator_annotate_space(aiter, tenant_space, tenant_space_length);
-                qd_compose_insert_string_iterator(to_override, aiter);
+                char *iter_str = (char *) qd_iterator_copy(aiter);
+                qd_message_set_to_override_annotation(msg, iter_str);
+                free(iter_str);
                 qd_iterator_free(aiter);
             } else
-                qd_compose_insert_string(to_override, term_addr);
-            qd_message_set_to_override_annotation(msg, to_override);
+                qd_message_set_to_override_annotation(msg, term_addr);
+
             int phase = qdr_link_phase(rlink);
             if (phase != 0)
                 qd_message_set_phase_annotation(msg, phase);
@@ -1643,16 +1606,46 @@ static qd_node_type_t router_node = {"router", 0, 0,
                                      AMQP_outbound_opened_handler,
                                      AMQP_closed_handler};
 
-qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
-{
-    qd_container_register_node_type(qd, &router_node);
 
-    size_t dplen = 9 + strlen(area) + strlen(id);
+// not api, but needed by unit tests
+void qd_router_id_initialize(const char *area, const char *id)
+{
+    size_t dplen = 2 + strlen(area) + strlen(id);
     node_id = (char*) qd_malloc(dplen);
     strcpy(node_id, area);
     strcat(node_id, "/");
     strcat(node_id, id);
 
+    // Node ID as an AMQP encoded str value.  Used when composing trace list
+    // and ingress message annotations into the outgoing message
+
+    const uint32_t id_len = strlen(node_id);
+    const uint32_t extra = 5;  // 5 octets = max AMQP STRx header length
+    encoded_node_id = (uint8_t*) qd_malloc(id_len + extra + 1); // 1 = string terminator
+    const int hdr_len = qd_compose_str_header(encoded_node_id, id_len);
+    assert(hdr_len <= extra);
+    strcpy((char*) &encoded_node_id[hdr_len], node_id);
+    encoded_node_id_len = hdr_len + id_len;
+}
+
+
+// not api, but needed by unit tests
+void qd_router_id_finalize(void)
+{
+    free(node_id);
+    node_id = 0;
+    free(encoded_node_id);
+    encoded_node_id = 0;
+    encoded_node_id_len = 0;
+}
+
+
+qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id)
+{
+    qd_container_register_node_type(qd, &router_node);
+
+    qd_router_id_initialize(area, id);
+
     qd_router_t *router = NEW(qd_router_t);
     ZERO(router);
 
@@ -2184,17 +2177,26 @@ void qd_router_free(qd_router_t *router)
     qd_router_python_free(router);
 
     free(router);
-    free(node_id);
+    qd_router_id_finalize();
     free(direct_prefix);
 }
 
 
-const char *qd_router_id(const qd_dispatch_t *qd)
+const char *qd_router_id(void)
 {
+    assert(node_id);
     return node_id;
 }
 
 
+const uint8_t *qd_router_id_encoded(size_t *len)
+{
+    assert(encoded_node_id && encoded_node_id_len);
+    *len = encoded_node_id_len;
+    return encoded_node_id;
+}
+
+
 qdr_core_t *qd_router_core(qd_dispatch_t *qd)
 {
     return qd->router->router_core;
diff --git a/tests/lsan.supp b/tests/lsan.supp
index a130fa7..39174f8 100644
--- a/tests/lsan.supp
+++ b/tests/lsan.supp
@@ -35,6 +35,10 @@ leak:^pni_init_default_logger$
 # DISPATCH-1844 - shutdown leak
 leak:sys_mutex
 
+# leak of qd_message_t (DISPATCH-1699)
+leak:^qd_message_copy$
+leak:^qd_message_set_to_override_annotation$
+
 # Ubuntu 16.04 (Xenial)
 #
 leak:_ctypes_alloc_format_string
diff --git a/tests/message_test.c b/tests/message_test.c
index 997205a..475ca51 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -32,15 +32,19 @@
 #define FLAT_BUF_SIZE (100000)
 static unsigned char buffer[FLAT_BUF_SIZE];
 
-static size_t flatten_bufs(qd_message_content_t *content)
+// given a buffer list, copy the data it contains into a single contiguous
+// buffer.
+static size_t flatten_bufs(const qd_buffer_list_t *content)
 {
     unsigned char *cursor = buffer;
-    qd_buffer_t *buf      = DEQ_HEAD(content->buffers);
+    qd_buffer_t *buf      = DEQ_HEAD((*content));
 
     while (buf) {
+        // if this asserts you need a bigger buffer!
+        assert(((size_t) (cursor - buffer)) + qd_buffer_size(buf) < FLAT_BUF_SIZE);
         memcpy(cursor, qd_buffer_base(buf), qd_buffer_size(buf));
         cursor += qd_buffer_size(buf);
-        buf = buf->next;
+        buf = DEQ_NEXT(buf);
     }
 
     return (size_t) (cursor - buffer);
@@ -49,20 +53,10 @@ static size_t flatten_bufs(qd_message_content_t *content)
 
 static void set_content(qd_message_content_t *content, unsigned char *buffer, size_t len)
 {
-    unsigned char        *cursor = buffer;
-    qd_buffer_t *buf;
+    qd_buffer_list_t blist = DEQ_EMPTY;
 
-    while (len > (size_t) (cursor - buffer)) {
-        buf = qd_buffer();
-        size_t segment   = qd_buffer_capacity(buf);
-        size_t remaining = len - (size_t) (cursor - buffer);
-        if (segment > remaining)
-            segment = remaining;
-        memcpy(qd_buffer_base(buf), cursor, segment);
-        cursor += segment;
-        qd_buffer_insert(buf, segment);
-        DEQ_INSERT_TAIL(content->buffers, buf);
-    }
+    qd_buffer_list_append(&blist, buffer, len);
+    DEQ_APPEND(content->buffers, blist);
     SET_ATOMIC_FLAG(&content->receive_complete);
 }
 
@@ -82,7 +76,23 @@ static char* test_send_to_messenger(void *context)
 {
     qd_message_t         *msg     = qd_message();
     qd_message_content_t *content = MSG_CONTENT(msg);
-    qd_message_compose_1(msg, "test_addr_0", 0);
+
+    qd_composed_field_t *header = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(header);
+    qd_compose_insert_bool(header, true);  // durable
+    qd_compose_end_list(header);
+
+    qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+    qd_compose_start_list(props);
+    qd_compose_insert_null(props);          // message-id
+    qd_compose_insert_null(props);          // user-id
+    qd_compose_insert_string(props, "test_addr_0");    // to
+    qd_compose_end_list(props);
+
+    qd_message_compose_3(msg, header, props, true);
+    qd_compose_free(header);
+    qd_compose_free(props);
+
     qd_buffer_t *buf = DEQ_HEAD(content->buffers);
     if (buf == 0) {
         qd_message_free(msg);
@@ -90,7 +100,7 @@ static char* test_send_to_messenger(void *context)
     }
 
     pn_message_t *pn_msg = pn_message();
-    size_t len = flatten_bufs(content);
+    size_t len = flatten_bufs(&content->buffers);
     int result = pn_message_decode(pn_msg, (char *)buffer, len);
     if (result != 0) {
         pn_message_free(pn_msg);
@@ -98,6 +108,12 @@ static char* test_send_to_messenger(void *context)
         return "Error in pn_message_decode";
     }
 
+    if (!pn_message_is_durable(pn_msg)) {
+        pn_message_free(pn_msg);
+        qd_message_free(msg);
+        return "Durable flag not set";
+    }
+
     if (strcmp(pn_message_get_address(pn_msg), "test_addr_0") != 0) {
         pn_message_free(pn_msg);
         qd_message_free(msg);
@@ -361,115 +377,114 @@ static char* test_check_multiple(void *context)
 }
 
 
-static char* test_send_message_annotations(void *context)
+// Create a proton message containing router-specific annotations.
+// Ensure the annotations are properly parsed.
+//
+static char* test_parse_message_annotations(void *context)
 {
+    char *error = 0;
+
+    pn_message_t *pn_msg = pn_message();
+    pn_message_set_durable(pn_msg, true);
+    pn_message_set_address(pn_msg, "test_addr_0");
+    pn_data_t *pn_ma = pn_message_annotations(pn_msg);
+    pn_data_clear(pn_ma);
+    pn_data_put_map(pn_ma);
+    pn_data_enter(pn_ma);
+
+    pn_data_put_symbol(pn_ma, pn_bytes(strlen(QD_MA_INGRESS), QD_MA_INGRESS));
+    pn_data_put_string(pn_ma, pn_bytes(strlen("distress"), "distress"));
+
+    pn_data_put_symbol(pn_ma, pn_bytes(strlen(QD_MA_TRACE), QD_MA_TRACE));
+    pn_data_put_list(pn_ma);
+    pn_data_enter(pn_ma);
+    pn_data_put_string(pn_ma, pn_bytes(strlen("Node1"), "Node1"));
+    pn_data_put_string(pn_ma, pn_bytes(strlen("Node2"), "Node2"));
+    pn_data_exit(pn_ma);
+
+    pn_data_put_symbol(pn_ma, pn_bytes(strlen(QD_MA_TO), QD_MA_TO));
+    pn_data_put_string(pn_ma, pn_bytes(strlen("to/address"), "to/address"));
+
+    pn_data_put_symbol(pn_ma, pn_bytes(strlen(QD_MA_PHASE), QD_MA_PHASE));
+    pn_data_put_int(pn_ma, 9);
+
+    pn_data_put_symbol(pn_ma, pn_bytes(strlen(QD_MA_STREAM), QD_MA_STREAM));
+    pn_data_put_int(pn_ma, 1);
+    pn_data_exit(pn_ma);
+
+    // convert the proton message to a dispatch message
+
     qd_message_t         *msg     = qd_message();
     qd_message_content_t *content = MSG_CONTENT(msg);
-    char *error = 0;
+    size_t                    len = FLAT_BUF_SIZE;
 
-    qd_composed_field_t *trace = qd_compose_subfield(0);
-    qd_compose_start_list(trace);
-    qd_compose_insert_string(trace, "Node1");
-    qd_compose_insert_string(trace, "Node2");
-    qd_compose_end_list(trace);
-    qd_message_set_trace_annotation(msg, trace);
+    pn_message_encode(pn_msg, (char*) buffer, &len);
+    qd_buffer_list_t blist = DEQ_EMPTY;
+    qd_buffer_list_append(&blist, buffer, len);
+    DEQ_MOVE(blist, content->buffers);
 
-    qd_composed_field_t *to_override = qd_compose_subfield(0);
-    qd_compose_insert_string(to_override, "to/address");
-    qd_message_set_to_override_annotation(msg, to_override);
+    // now parse the sections:
+    if (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) != QD_MESSAGE_DEPTH_OK) {
+        error = "Failed to validate message";
+        goto exit;
+    }
 
-    qd_composed_field_t *ingress = qd_compose_subfield(0);
-    qd_compose_insert_string(ingress, "distress");
-    qd_message_set_ingress_annotation(msg, ingress);
+    error = (char*) qd_message_parse_annotations(msg);
+    if (error) {
+        goto exit;
+    }
 
-    qd_message_compose_1(msg, "test_addr_0", 0);
-    qd_buffer_t *buf = DEQ_HEAD(content->buffers);
-    if (buf == 0) {
-        qd_message_free(msg);
-        return "Expected a buffer in the test message";
+    // validate sections parsed correctly:
+
+    qd_parsed_field_t *pf_trace = qd_message_get_trace(msg);
+    if (!pf_trace) {
+        error = "TRACE not found!";
+        goto exit;
+    }
+    if (qd_parse_sub_count(pf_trace) != 2
+        || !qd_iterator_equal(qd_parse_raw(qd_parse_sub_value(pf_trace, 0)),
+                              (const unsigned char*) "Node1")
+        || !qd_iterator_equal(qd_parse_raw(qd_parse_sub_value(pf_trace, 1)),
+                              (const unsigned char*) "Node2")) {
+        error = "Invalid trace list";
+        goto exit;
     }
 
-    pn_message_t *pn_msg = pn_message();
-    size_t len = flatten_bufs(content);
-    int result = pn_message_decode(pn_msg, (char *)buffer, len);
-    if (result != 0) {
-        error = "Error in pn_message_decode";
+    qd_parsed_field_t *pf_to = qd_message_get_to_override(msg);
+    if (!pf_to) {
+        error = "TO override not found!";
+        goto exit;
+    }
+    if (!qd_iterator_equal(qd_parse_raw(pf_to), (const unsigned char*) "to/address")) {
+        error = "Invalid TO override!";
         goto exit;
     }
 
-    pn_data_t *ma = pn_message_annotations(pn_msg);
-    if (!ma) {
-        error = "Missing message annotations";
+    qd_parsed_field_t *pf_ingress = qd_message_get_ingress_router(msg);
+    if (!pf_ingress) {
+        error = "INGRESS not found!";
         goto exit;
     }
-    pn_data_rewind(ma);
-    pn_data_next(ma);
-    if (pn_data_type(ma) != PN_MAP) {
-        error = "Invalid message annotation type";
+    if (!qd_iterator_equal(qd_parse_raw(pf_ingress), (const unsigned char*) "distress")) {
+        error = "Invalid ingress override!";
         goto exit;
     }
-    if (pn_data_get_map(ma) != QD_MA_N_KEYS * 2) {
-        error = "Invalid map length";
+
+    if (!qd_message_is_streaming(msg)) {
+        error = "streaming flag not parsed!";
         goto exit;
     }
 
-    pn_data_enter(ma);
-    for (int i = 0; i < QD_MA_N_KEYS; i++) {
-        pn_data_next(ma);
-        if (pn_data_type(ma) != PN_SYMBOL) {
-            error = "Bad map index";
-            goto exit;
-        }
-        pn_bytes_t sym = pn_data_get_symbol(ma);
-        if (!strncmp(QD_MA_PREFIX, sym.start, sym.size)) {
-            pn_data_next(ma);
-            sym = pn_data_get_string(ma);
-        } else if (!strncmp(QD_MA_INGRESS, sym.start, sym.size)) {
-            pn_data_next(ma);
-            sym = pn_data_get_string(ma);
-            if (strncmp("distress", sym.start, sym.size)) {
-                error = "Bad ingress";
-                goto exit;
-            }
-            //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
-        } else if (!strncmp(QD_MA_TO, sym.start, sym.size)) {
-            pn_data_next(ma);
-            sym = pn_data_get_string(ma);
-            if (strncmp("to/address", sym.start, sym.size)) {
-                error = "Bad to override";
-                goto exit;
-            }
-            //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
-        } else if (!strncmp(QD_MA_TRACE, sym.start, sym.size)) {
-            pn_data_next(ma);
-            if (pn_data_type(ma) != PN_LIST) {
-                error = "List not found";
-                goto exit;
-            }
-            pn_data_enter(ma);
-            pn_data_next(ma);
-            sym = pn_data_get_string(ma);
-            if (strncmp("Node1", sym.start, sym.size)) {
-                error = "Bad trace entry";
-                goto exit;
-            }
-            //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
-            pn_data_next(ma);
-            sym = pn_data_get_string(ma);
-            if (strncmp("Node2", sym.start, sym.size)) {
-                error = "Bad trace entry";
-                goto exit;
-            }
-            //fprintf(stderr, "[%.*s]\n", (int)sym.size, sym.start);
-            pn_data_exit(ma);
-        } else error = "Unexpected map key";
+    if (qd_message_get_phase_annotation(msg) != 9) {
+        error = "phase not parsed!";
+        goto exit;
     }
 
+
 exit:
 
     pn_message_free(pn_msg);
     qd_message_free(msg);
-
     return error;
 }
 
@@ -712,7 +727,7 @@ exit:
 // Testing protocol adapter 'stream_data' interfaces
 //
 
-static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks)
+static qd_message_t *stream_data_generate_message(char *s_chunk_size, char *s_n_chunks, bool flatten)
 {
     // Fill a message with n_chunks of vbin chunk_size body data.
 
@@ -720,34 +735,55 @@ static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size,
     int   n_chunks   = atoi(s_n_chunks);
 
     // Add message headers
-    qd_message_compose_1(msg, "whom-it-may-concern", 0);
 
-    // Add the chunks. This creates the test state for not-flattened buffers.
-    for (int j=0; j<n_chunks; j++) {
-        // Create 'buf2' as a linear buffer of the raw data to be sent.
-        // Buffer filled with chunk index + 1.
-        unsigned char *buf2 = (unsigned char *)malloc(chunk_size);
-        memset(buf2, j+1, chunk_size);
+    qd_composed_field_t *header = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(header);
+    qd_compose_insert_bool(header, 0);     // durable
+    qd_compose_insert_null(header);        // priority
+    qd_compose_end_list(header);
+
+    qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+    qd_compose_start_list(props);
+    qd_compose_insert_null(props);          // message-id
+    qd_compose_insert_null(props);          // user-id
+    qd_compose_insert_string(props, "whom-it-may-concern");    // to
+    qd_compose_end_list(props);
 
-        // Use 'set_content' to convert raw buffer 'buf2'
-        // into a buffer list in message 'mule'.
-        qd_message_t *mule = qd_message();
-        qd_message_content_t *mule_content = MSG_CONTENT(mule);
-        set_content(mule_content, buf2, chunk_size);
+    qd_message_t *msg = qd_message();
+    qd_message_compose_3(msg, header, props, false);
+    qd_compose_free(header);
+    qd_compose_free(props);
 
-        // Extend message 'msg' with the buffer list in 'mule'
-        // and wrap the addition in a BODY_DATA performative.
-        // After this the content buffer list in 'mule' is empty and
-        // the buffers in 'field' are inserted into message 'msg'.
+    // Generate the chunks. Each chunk is wrapped in a BODY_DATA section. Each
+    // body section resides in its own buffer list.  This creates a sparse body
+    // buffer chain which will exercise buffer boundary checking.
+
+    unsigned char *buf2 = (unsigned char *)malloc(chunk_size);
+    qd_buffer_list_t body = DEQ_EMPTY;
+
+    for (int j=0; j<n_chunks; j++) {
+        qd_buffer_list_t     tmp = DEQ_EMPTY;
         qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-        qd_compose_insert_binary_buffers(field, &mule_content->buffers);
-        qd_message_extend(msg, field, 0);
 
-        // Clean up temporary resources
-        free(buf2);
+        memset(buf2, j+1, chunk_size);
+        qd_compose_insert_binary(field, (const uint8_t*) buf2, chunk_size);
+        qd_compose_take_buffers(field, &tmp);
+        DEQ_APPEND(body, tmp);
         qd_compose_free(field);
-        qd_message_free(mule);
     }
+
+    if (!flatten) {
+        DEQ_APPEND(MSG_CONTENT(msg)->buffers, body);
+    } else {
+        // compact the separate body buffer chains into the smallest buffer
+        // chain possible
+        size_t flat_size = flatten_bufs(&body);
+        qd_buffer_list_append(&(MSG_CONTENT(msg)->buffers), buffer, flat_size);
+        qd_buffer_list_free_buffers(&body);
+    }
+
+    free(buf2);
+    return msg;
 }
 
 static void free_stream_data_list(qd_message_t *msg_in)
@@ -783,36 +819,15 @@ static char *check_stream_data(char *s_chunk_size, char *s_n_chunks, bool flatte
     char *result     = 0;
     int   received;     // got this much of chunk_size chunk
 
-    // Messages for setting/sensing body data
-    qd_message_t         *msg     = qd_message();
-    qd_message_t         *copy    = qd_message_copy(msg);
-    qd_message_pvt_t     *msg_pvt = (qd_message_pvt_t *)msg;
-
     // Set the original message content
-    stream_data_generate_message(msg, s_chunk_size, s_n_chunks);
-
-    // flatten if required
-    if (flatten) {
-        // check that the flatten buffer is big enough
-        assert(FLAT_BUF_SIZE > (n_chunks * (chunk_size
-                                            // per-chunk vbin descriptor overhead:
-                                            + (chunk_size > 511 ? 8 : 5))
-                                + 100));  // leave plenty of allocaton for header
-
-        // compress message into flatten buffer
-        size_t flat_size = flatten_bufs(MSG_CONTENT(msg));
 
-        // erase buffer list in msg and copy
-        qd_buffer_list_free_buffers(&msg_pvt->content->buffers);
-
-        // reconstruct buffer list from flat buffer
-        qd_buffer_list_append(&msg_pvt->content->buffers, buffer, flat_size);
-    }
+    qd_message_t *msg = stream_data_generate_message(s_chunk_size, s_n_chunks, flatten);
 
     // check the chunks
     // Define the number of raw buffers to be extracted on each loop
 #define N_PN_RAW_BUFFS (2)
 
+    qd_message_t *copy = qd_message_copy(msg);
     qd_message_stream_data_t *stream_data;
 
     for (int j=0; j<n_chunks; j++) {
@@ -970,18 +985,13 @@ static char *test_check_stream_data_append(void * context)
     }
 
     // simulate building a message as an adaptor would:
-    msg = qd_message();
-
-    qd_alloc_safe_ptr_t unblock_arg = {0};
-    unblock_arg.ptr = (void*) &unblock_called;
-
-    qd_message_set_q2_unblocked_handler(msg, q2_unblocked_handler, unblock_arg);
 
     qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
     qd_compose_start_list(field);
     qd_compose_insert_bool(field, 0);     // durable
     qd_compose_insert_null(field);        // priority
     qd_compose_end_list(field);
+
     field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
     qd_compose_start_list(field);
     qd_compose_insert_ulong(field, 666);    // message-id
@@ -991,8 +1001,11 @@ static char *test_check_stream_data_append(void * context)
     qd_compose_insert_string(field, "/reply-to");   // reply-to
     qd_compose_end_list(field);
 
-    qd_message_compose_2(msg, field, false);
-    qd_compose_free(field);
+    msg = qd_message_compose(field, 0, 0, false);
+
+    qd_alloc_safe_ptr_t unblock_arg = {0};
+    unblock_arg.ptr = (void*) &unblock_called;
+    qd_message_set_q2_unblocked_handler(msg, q2_unblocked_handler, unblock_arg);
 
     // snapshot the message buffer count to use as a baseline
     const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(msg)->buffers);
@@ -1116,23 +1129,23 @@ static char *test_check_stream_data_fanout(void *context)
     qd_message_t *out_msg2 = 0;
 
     // simulate building a message as an adaptor would:
-    in_msg = qd_message();
-    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
-    qd_compose_start_list(field);
-    qd_compose_insert_bool(field, 0);     // durable
-    qd_compose_insert_null(field);        // priority
-    qd_compose_end_list(field);
-    field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
-    qd_compose_start_list(field);
-    qd_compose_insert_ulong(field, 666);    // message-id
-    qd_compose_insert_null(field);                 // user-id
-    qd_compose_insert_string(field, "/whereevah"); // to
-    qd_compose_insert_string(field, "my-subject");  // subject
-    qd_compose_insert_string(field, "/reply-to");   // reply-to
-    qd_compose_end_list(field);
 
-    qd_message_compose_2(in_msg, field, false);
-    qd_compose_free(field);
+    qd_composed_field_t *header = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(header);
+    qd_compose_insert_bool(header, 0);     // durable
+    qd_compose_insert_null(header);        // priority
+    qd_compose_end_list(header);
+
+    qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+    qd_compose_start_list(props);
+    qd_compose_insert_ulong(props, 666);    // message-id
+    qd_compose_insert_null(props);                 // user-id
+    qd_compose_insert_string(props, "/whereevah"); // to
+    qd_compose_insert_string(props, "my-subject");  // subject
+    qd_compose_insert_string(props, "/reply-to");   // reply-to
+    qd_compose_end_list(props);
+
+    in_msg = qd_message_compose(header, props, 0, false);
 
     // snapshot the message buffer count to use as a baseline
     const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers);
@@ -1140,7 +1153,7 @@ static char *test_check_stream_data_fanout(void *context)
     // construct a couple of body data sections, cheek-to-jowl in a buffer
     // chain
 #define sd_count  5
-    field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
     memset(buffer, '1', 99);
     qd_compose_insert_binary(field, buffer, 99);
 
@@ -1247,7 +1260,7 @@ static char *test_check_stream_data_footer(void *context)
     qd_message_t *out_msg2 = 0;
 
     // simulate building a message as an adaptor would:
-    in_msg = qd_message();
+
     qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
     qd_compose_start_list(field);
     qd_compose_insert_bool(field, 0);     // durable
@@ -1262,8 +1275,7 @@ static char *test_check_stream_data_footer(void *context)
     qd_compose_insert_string(field, "/reply-to");   // reply-to
     qd_compose_end_list(field);
 
-    qd_message_compose_2(in_msg, field, false);
-    qd_compose_free(field);
+    in_msg = qd_message_compose(field, 0, 0, false);
 
     // snapshot the message buffer count to use as a baseline
     const size_t base_bufct = DEQ_SIZE(MSG_CONTENT(in_msg)->buffers);
@@ -1367,11 +1379,6 @@ static char *test_q2_callback_on_disable(void *context)
 
     // first test: ensure calling disable without being in Q2 does not invoke the
     // handler:
-    msg = qd_message();
-
-    qd_alloc_safe_ptr_t unblock_arg = {0};
-    unblock_arg.ptr = (void*) &unblock_called;
-    qd_message_set_q2_unblocked_handler(msg, q2_unblocked_handler, unblock_arg);
 
     qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
     qd_compose_start_list(field);
@@ -1387,8 +1394,12 @@ static char *test_q2_callback_on_disable(void *context)
     qd_compose_insert_string(field, "/reply-to");   // reply-to
     qd_compose_end_list(field);
 
-    qd_message_compose_2(msg, field, false);
-    qd_compose_free(field);
+    msg = qd_message_compose(field, 0, 0, false);
+
+    qd_alloc_safe_ptr_t unblock_arg = {0};
+    unblock_arg.ptr = (void*) &unblock_called;
+    qd_message_set_q2_unblocked_handler(msg, q2_unblocked_handler, unblock_arg);
+
 
     qd_message_Q2_holdoff_disable(msg);
 
@@ -1401,11 +1412,6 @@ static char *test_q2_callback_on_disable(void *context)
 
     // now try it again with a message with Q2 active
 
-    msg = qd_message();
-
-    unblock_arg.ptr = (void*) &unblock_called;
-    qd_message_set_q2_unblocked_handler(msg, q2_unblocked_handler, unblock_arg);
-
     field = qd_compose(QD_PERFORMATIVE_HEADER, 0);
     qd_compose_start_list(field);
     qd_compose_insert_bool(field, 0);     // durable
@@ -1420,8 +1426,9 @@ static char *test_q2_callback_on_disable(void *context)
     qd_compose_insert_string(field, "/reply-to");   // reply-to
     qd_compose_end_list(field);
 
-    qd_message_compose_2(msg, field, false);
-    qd_compose_free(field);
+    msg = qd_message_compose(field, 0, 0, false);
+    unblock_arg.ptr = (void*) &unblock_called;
+    qd_message_set_q2_unblocked_handler(msg, q2_unblocked_handler, unblock_arg);
 
     // grow message until Q2 activates
 
@@ -1533,6 +1540,123 @@ exit:
 }
 
 
+// verify that a locally generated message containing message annotations can
+// be correctly parsed
+static char *test_local_message_compose(void * context)
+{
+    char *result = 0;
+    qd_composed_field_t *header = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(header);
+    qd_compose_insert_bool(header, true);  // durable
+    qd_compose_end_list(header);
+
+    qd_composed_field_t *da = qd_compose(QD_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+    qd_compose_start_map(da);
+    qd_compose_insert_symbol(da, "key1");
+    qd_compose_insert_string(da, "value1");
+    qd_compose_end_map(da);
+
+    qd_composed_field_t *ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+    qd_compose_start_map(ma);
+
+    qd_compose_insert_symbol(ma, "User Key 1");
+    qd_compose_insert_string(ma, "User Value 1");
+
+    qd_compose_insert_symbol(ma, "User Key 2");
+    qd_compose_insert_string(ma, "User Value 2");
+
+    qd_compose_insert_symbol(ma, QD_MA_INGRESS);
+    qd_compose_insert_string(ma, "0/InRouter");
+
+    qd_compose_insert_symbol(ma, QD_MA_TRACE);
+    qd_compose_start_list(ma);
+    qd_compose_insert_string(ma, "1/Router");
+    qd_compose_insert_string(ma, "2/Router");
+    qd_compose_end_list(ma);
+
+    qd_compose_insert_symbol(ma, QD_MA_TO);
+    qd_compose_insert_string(ma, "address1");
+
+    qd_compose_insert_symbol(ma, QD_MA_PHASE);
+    qd_compose_insert_int(ma, 7);
+
+    qd_compose_insert_symbol(ma, QD_MA_STREAM);
+    qd_compose_insert_int(ma, 1);
+
+    qd_compose_end_map(ma);
+
+    qd_message_t *msg = qd_message_compose(header, da, ma, true);
+    qd_message_content_t *content = MSG_CONTENT(msg);
+
+    // verify that the internals of the content have been properly initialized.
+    // It should appear as if the message has arrived from proton like any
+    // other message.
+
+    if (!content->section_message_header.parsed) {
+        result = "Header section not parsed";
+        goto exit;
+    }
+
+    if (!content->section_delivery_annotation.parsed) {
+        result = "Delivery Annotation section not parsed";
+        goto exit;
+    }
+
+    if (!content->section_message_annotation.parsed || !content->ma_parsed) {
+        result = "Message Annotation section not parsed";
+        goto exit;
+    }
+
+    if (content->ma_user_count != 4) {
+        result = "failed to find user message annotations";
+        goto exit;
+    }
+
+    if (content->ma_user_annotations.remaining != 52) {
+        result = "wrong length of user annotations";
+        goto exit;
+    }
+
+    if (!content->ma_pf_ingress
+        || !qd_iterator_equal(qd_parse_raw(content->ma_pf_ingress),
+                              (const unsigned char*) "0/InRouter")) {
+        result = "ingress MA not correct";
+        goto exit;
+    }
+
+    if (!content->ma_pf_to_override
+        || !qd_iterator_equal(qd_parse_raw(content->ma_pf_to_override),
+                              (const unsigned char*) "address1")) {
+        result = "to-override MA not correct";
+        goto exit;
+    }
+
+    if (!content->ma_pf_trace
+        || qd_parse_sub_count(content->ma_pf_trace) != 2
+        || !qd_iterator_equal(qd_parse_raw(qd_parse_sub_value(content->ma_pf_trace, 0)),
+                              (const unsigned char*) "1/Router")
+        || !qd_iterator_equal(qd_parse_raw(qd_parse_sub_value(content->ma_pf_trace, 1)),
+                              (const unsigned char*) "2/Router")) {
+        result = "Invalid trace list";
+        goto exit;
+    }
+
+    if (((qd_message_pvt_t *)msg)->ma_phase != 7) {
+        result = "incorrect phase MA";
+        goto exit;
+    }
+
+    if (!((qd_message_pvt_t *)msg)->ma_streaming) {
+        result = "incorrect streaming MA";
+        goto exit;
+    }
+
+exit:
+
+    qd_message_free(msg);
+    return result;
+}
+
 int message_tests(void)
 {
     int result = 0;
@@ -1542,7 +1666,7 @@ int message_tests(void)
     TEST_CASE(test_receive_from_messenger, 0);
     TEST_CASE(test_message_properties, 0);
     TEST_CASE(test_check_multiple, 0);
-    TEST_CASE(test_send_message_annotations, 0);
+    TEST_CASE(test_parse_message_annotations, 0);
     TEST_CASE(test_q2_input_holdoff_sensing, 0);
     TEST_CASE(test_incomplete_annotations, 0);
     TEST_CASE(test_check_weird_messages, 0);
@@ -1552,6 +1676,7 @@ int message_tests(void)
     TEST_CASE(test_check_stream_data_footer, 0);
     TEST_CASE(test_q2_callback_on_disable, 0);
     TEST_CASE(test_q2_ignore_headers, 0);
+    TEST_CASE(test_local_message_compose, 0);
 
     return result;
 }
diff --git a/tests/parse_test.c b/tests/parse_test.c
index 67ecff0..ba075cd 100644
--- a/tests/parse_test.c
+++ b/tests/parse_test.c
@@ -102,12 +102,13 @@ static char *test_parser_fixed_scalars(void *context)
     qd_iterator_t *field = NULL;
     qd_parsed_field_t *parsed = NULL;
     static char error[1024];
+    qd_buffer_list_t buflist = DEQ_EMPTY;
 
     error[0] = 0;
 
     while (fs_vectors[idx].data) {
-        field = qd_iterator_binary(fs_vectors[idx].data,
-                                   fs_vectors[idx].length, ITER_VIEW_ALL);
+        qd_buffer_list_append(&buflist, (const uint8_t *)fs_vectors[idx].data, fs_vectors[idx].length);
+        field = qd_iterator_buffer(DEQ_HEAD(buflist), 0, fs_vectors[idx].length, ITER_VIEW_ALL);
         parsed = qd_parse(field);
 
         qd_iterator_t *typed_iter = qd_parse_typed(parsed);
@@ -157,10 +158,12 @@ static char *test_parser_fixed_scalars(void *context)
         field = 0;
         qd_parse_free(parsed);
         parsed = 0;
+        qd_buffer_list_free_buffers(&buflist);
     }
 
     qd_iterator_free(field);
     qd_parse_free(parsed);
+    qd_buffer_list_free_buffers(&buflist);
     return *error ? error : 0;
 }
 
@@ -220,13 +223,16 @@ static char *test_integer_conversion(void *context)
         {"\x53\x80",                             2, QD_AMQP_INT,   true, 0, 0},
         {"\x52\x80",                             2, QD_AMQP_INT,   true, 0, 0},
         {"\x50\x80",                             2, QD_AMQP_LONG,  true, 0, 0},
-        {"\x60\x80",                             2, QD_AMQP_LONG,  true, 0, 0},
+        {"\x60\x80\x00",                         3, QD_AMQP_LONG,  true, 0, 0},
         {NULL},
     };
 
+    qd_buffer_list_t buflist = DEQ_EMPTY;
     char *error = NULL;
+
     for (int i = 0; fs_vectors[i].data && !error; ++i) {
-        qd_iterator_t     *data_iter = qd_iterator_binary(fs_vectors[i].data, fs_vectors[i].length, ITER_VIEW_ALL);
+        qd_buffer_list_append(&buflist, (const uint8_t *)fs_vectors[i].data, fs_vectors[i].length);
+        qd_iterator_t *data_iter = qd_iterator_buffer(DEQ_HEAD(buflist), 0, fs_vectors[i].length, ITER_VIEW_ALL);
         qd_parsed_field_t *field = qd_parse(data_iter);
 
         if (!qd_parse_ok(field)) {
@@ -277,107 +283,181 @@ static char *test_integer_conversion(void *context)
 
         qd_iterator_free(data_iter);
         qd_parse_free(field);
+        qd_buffer_list_free_buffers(&buflist);
     }
 
+    qd_buffer_list_free_buffers(&buflist);
     return error;
 }
 
 
 static char *test_map(void *context)
 {
-    static char error[1000];
-    const char *data =
-        "\xd1\x00\x00\x00\x2d\x00\x00\x00\x06"    // map32, 6 items
+    qd_iterator_t     *val_iter = 0;
+    qd_iterator_t     *typed_iter = 0;
+    qd_iterator_t     *key_iter = 0;
+    static char error[1000] = "";
+    const uint8_t data[] =
+        "\xd1\x00\x00\x00\x43\x00\x00\x00\x08"    // map32, 8 items
         "\xa3\x05\x66irst\xa1\x0evalue_of_first"  // (23) "first":"value_of_first"
         "\xa3\x06second\x52\x20"                  // (10) "second":32
-        "\xa3\x05third\x41";                      // (8)  "third":true
-    int data_len = 50;
+        "\xa3\x05third\x41"                       // (8)  "third":true
+        "\x80\x00\x00\x00\x00\x00\x00\x80\x00"    // (9) 32768:
+        "\xb0\x00\x00\x00\x08"                    // (5+8) vbin "!+!+!+!+"
+        "\x21\x2b\x21\x2b\x21\x2b\x21\x2b";
+
+    qd_buffer_list_t buflist = DEQ_EMPTY;
+    qd_buffer_list_append(&buflist, data, sizeof(data));
 
-    qd_iterator_t     *data_iter = qd_iterator_binary(data, data_len, ITER_VIEW_ALL);
+    qd_iterator_t     *data_iter = qd_iterator_buffer(DEQ_HEAD(buflist), 0, sizeof(data), ITER_VIEW_ALL);
     qd_parsed_field_t *field     = qd_parse(data_iter);
+    qd_iterator_free(data_iter);
 
     if (!qd_parse_ok(field)) {
         snprintf(error, 1000, "Parse failed: %s", qd_parse_error(field));
-        qd_iterator_free(data_iter);
-        qd_parse_free(field);
-        return error;
+        goto exit;
     }
 
     if (!qd_parse_is_map(field)) {
-        qd_iterator_free(data_iter);
-        qd_parse_free(field);
-        return "Expected field to be a map";
+        snprintf(error, sizeof(error), "Expected field to be a map");
+        goto exit;
     }
 
     uint32_t count = qd_parse_sub_count(field);
-    if (count != 3) {
-        snprintf(error, 1000, "Expected sub-count==3, got %"PRIu32, count);
-        qd_iterator_free(data_iter);
-        qd_parse_free(field);
-        return error;
+    if (count != 4) {
+        snprintf(error, 1000, "Expected sub-count==4, got %"PRIu32, count);
+        goto exit;
     }
 
+    // Validate "first":"value_of_first"
+
     qd_parsed_field_t *key_field  = qd_parse_sub_key(field, 0);
-    qd_iterator_t     *key_iter   = qd_parse_raw(key_field);
-    qd_iterator_t     *typed_iter = qd_parse_typed(key_field);
+    key_iter   = qd_parse_raw(key_field);
+    typed_iter = qd_parse_typed(key_field);
     if (!qd_iterator_equal(key_iter, (unsigned char*) "first")) {
         unsigned char     *result   = qd_iterator_copy(key_iter);
         snprintf(error, 1000, "First key: expected 'first', got '%s'", result);
         free (result);
-        qd_iterator_free(data_iter);
-        qd_parse_free(field);
-        return error;
+        goto exit;
     }
 
     if (!qd_iterator_equal(typed_iter, (unsigned char*) "\xa3\x05\x66irst"))
         return "Incorrect typed iterator on first-key";
 
     qd_parsed_field_t *val_field = qd_parse_sub_value(field, 0);
-    qd_iterator_t     *val_iter  = qd_parse_raw(val_field);
+    val_iter  = qd_parse_raw(val_field);
     typed_iter = qd_parse_typed(val_field);
     if (!qd_iterator_equal(val_iter, (unsigned char*) "value_of_first")) {
         unsigned char     *result   = qd_iterator_copy(val_iter);
         snprintf(error, 1000, "First value: expected 'value_of_first', got '%s'", result);
         free (result);
-        return error;
+        goto exit;
     }
 
     if (!qd_iterator_equal(typed_iter, (unsigned char*) "\xa1\x0evalue_of_first"))
         return "Incorrect typed iterator on first-key";
 
+    // Validate "second:32"
+
     key_field = qd_parse_sub_key(field, 1);
     key_iter  = qd_parse_raw(key_field);
     if (!qd_iterator_equal(key_iter, (unsigned char*) "second")) {
         unsigned char     *result   = qd_iterator_copy(key_iter);
         snprintf(error, 1000, "Second key: expected 'second', got '%s'", result);
         free (result);
-        return error;
+        goto exit;
     }
 
     val_field = qd_parse_sub_value(field, 1);
     if (qd_parse_as_uint(val_field) != 32) {
         snprintf(error, 1000, "Second value: expected 32, got %"PRIu32, qd_parse_as_uint(val_field));
-        return error;
+        goto exit;
     }
 
+    // Validate "third":true
+
     key_field = qd_parse_sub_key(field, 2);
     key_iter  = qd_parse_raw(key_field);
     if (!qd_iterator_equal(key_iter, (unsigned char*) "third")) {
         unsigned char     *result   = qd_iterator_copy(key_iter);
         snprintf(error, 1000, "Third key: expected 'third', got '%s'", result);
         free (result);
-        return error;
+        goto exit;
     }
 
     val_field = qd_parse_sub_value(field, 2);
     if (!qd_parse_as_bool(val_field)) {
         snprintf(error, 1000, "Third value: expected true");
-        return error;
+        goto exit;
     }
 
-    qd_iterator_free(data_iter);
+    // Validate 32768:"!+!+!+!+"
+
+    uint8_t octet;
+    uint8_t ncopy_buf[8];
+
+    key_field = qd_parse_sub_key(field, 3);
+    typed_iter = qd_parse_typed(key_field);
+
+    octet = qd_iterator_octet(typed_iter);
+    if (octet != (uint8_t)0x80) {
+        snprintf(error, sizeof(error), "4th Key not ulong type");
+        goto exit;
+    }
+    if (qd_iterator_ncopy_octets(typed_iter, ncopy_buf, 8) != 8) {
+        snprintf(error, sizeof(error), "4th Key incorrect length");
+        goto exit;
+    }
+    if (memcmp(ncopy_buf, "\x00\x00\x00\x00\x00\x00\x80\x00", 8) != 0) {
+        snprintf(error, sizeof(error), "4th key encoding value incorrect");
+        goto exit;
+    }
+    if (qd_parse_as_ulong(key_field) != 32768) {
+        snprintf(error, sizeof(error), "4th key value not 32768");
+        goto exit;
+    }
+
+    val_field = qd_parse_sub_value(field, 3);
+    val_iter = qd_parse_raw(val_field);
+    typed_iter = qd_parse_typed(val_field);
+
+    octet = qd_iterator_octet(typed_iter);
+    if (octet != (uint8_t)0xb0) {
+        snprintf(error, sizeof(error), "4th Value not vbin32 type: 0x%X", (unsigned int)octet);
+        goto exit;
+    }
+    if (qd_iterator_ncopy_octets(typed_iter, ncopy_buf, 4) != 4) {
+        snprintf(error, sizeof(error), "4th Value incorrect length");
+        goto exit;
+    }
+    if (memcmp(ncopy_buf, "\x00\x00\x00\x08", 4) != 0) {
+        snprintf(error, sizeof(error), "4th Value encoding incorrect");
+        goto exit;
+    }
+
+    if (qd_iterator_octet(val_iter) != '!' || qd_iterator_octet(val_iter) != '+') {
+        snprintf(error, sizeof(error), "4th Value [0-1] incorrect");
+        goto exit;
+    }
+    if (qd_iterator_ncopy_octets(typed_iter, ncopy_buf, 4) != 4) {
+        snprintf(error, sizeof(error), "4th Value sub-copy failed");
+        goto exit;
+    }
+    if (memcmp(ncopy_buf, "!+!+", 4) != 0) {
+        snprintf(error, sizeof(error), "4th Value sub-copy incorrect");
+        goto exit;
+    }
+    if (qd_iterator_octet(val_iter) != '!' || qd_iterator_octet(val_iter) != '+') {
+        snprintf(error, sizeof(error), "4th Value [6-7] incorrect");
+        goto exit;
+    }
+
+
+exit:
     qd_parse_free(field);
-    return 0;
+    qd_buffer_list_free_buffers(&buflist);
+
+    return error[0] ? error : 0;
 }
 
 
@@ -402,27 +482,36 @@ struct err_vector_t {
 static char *test_parser_errors(void *context)
 {
     int idx = 0;
+    qd_buffer_list_t buflist = DEQ_EMPTY;
     static char error[1024];
 
     while (err_vectors[idx].data) {
-        qd_iterator_t *field  = qd_iterator_binary(err_vectors[idx].data,
-                                                   err_vectors[idx].length, ITER_VIEW_ALL);
+        if (err_vectors[idx].length) {
+            qd_buffer_list_append(&buflist, (const uint8_t *)err_vectors[idx].data, err_vectors[idx].length);
+        } else {
+            qd_buffer_t *tmp = qd_buffer();
+            DEQ_INSERT_HEAD(buflist, tmp);
+        }
+        qd_iterator_t *field  = qd_iterator_buffer(DEQ_HEAD(buflist), 0, err_vectors[idx].length, ITER_VIEW_ALL);
         qd_parsed_field_t *parsed = qd_parse(field);
         if (qd_parse_ok(parsed)) {
             qd_parse_free(parsed);
             qd_iterator_free(field);
             sprintf(error, "(%d) Unexpected Parse Success", idx);
+            qd_buffer_list_free_buffers(&buflist);
             return error;
         }
         if (strcmp(qd_parse_error(parsed), err_vectors[idx].expected_error) != 0) {
-            qd_parse_free(parsed);
-            qd_iterator_free(field);
             sprintf(error, "(%d) Error: Expected %s, Got %s", idx,
                     err_vectors[idx].expected_error, qd_parse_error(parsed));
+            qd_parse_free(parsed);
+            qd_iterator_free(field);
+            qd_buffer_list_free_buffers(&buflist);
             return error;
         }
         qd_parse_free(parsed);
         qd_iterator_free(field);
+        qd_buffer_list_free_buffers(&buflist);
         idx++;
     }
 
diff --git a/tests/run_unit_tests_size.c b/tests/run_unit_tests_size.c
index 8d37146..de6f84e 100644
--- a/tests/run_unit_tests_size.c
+++ b/tests/run_unit_tests_size.c
@@ -20,19 +20,129 @@
 #include "qpid/dispatch/alloc.h"
 #include "qpid/dispatch/buffer.h"
 #include "qpid/dispatch/iterator.h"
+#include "qpid/dispatch/router.h"
 
 void qd_log_initialize(void);
 void qd_log_finalize(void);
 void qd_error_initialize();
+void qd_router_id_initialize(const char *, const char *);
+void qd_router_id_finalize(void);
+
 
 int message_tests();
 int field_tests();
 int parse_tests();
 int buffer_tests();
 
+// validate router id constructor/encoder
+//
+static int router_id_tests(void)
+{
+    int result = 0;
+    const char *id;
+    const uint8_t *encoded_id;
+    size_t len = 0;
+    char boundary_id[257];
+
+    qd_router_id_initialize("0", "shortId");
+    id = qd_router_id();
+    if (strcmp(id, "0/shortId") != 0) {
+        fprintf(stderr, "Invalid shortId (%s)\n", id);
+        result = 1;
+        goto exit;
+    }
+
+    encoded_id = qd_router_id_encoded(&len);
+    if (len != strlen(id) + 2) {
+        fprintf(stderr, "shortId encode failed - bad len\n");
+        result = 1;
+        goto exit;
+    }
+    if (encoded_id[0] != QD_AMQP_STR8_UTF8
+        || encoded_id[1] != strlen(id)
+        || memcmp(&encoded_id[2], id, strlen(id)) != 0) {
+
+        fprintf(stderr, "shortId encode failed - bad format\n");
+        result = 1;
+        goto exit;
+    }
+
+    qd_router_id_finalize();
+
+    //
+    // this ID will be exactly 255 chars long (STR8).
+    //
+
+    memset(boundary_id, 'B', 253);
+    boundary_id[253] = 0;
+
+    qd_router_id_initialize("0", boundary_id);
+    id = qd_router_id();
+    assert(strlen(id) == 255);
+    if (strncmp(id, "0/", 2) != 0 || strcmp(&id[2], boundary_id) != 0) {
+        fprintf(stderr, "Invalid boundary 255 id (%s)\n", id);
+        result = 1;
+        goto exit;
+    }
+
+    encoded_id = qd_router_id_encoded(&len);
+    if (len != strlen(id) + 2) {
+        fprintf(stderr, "bounary 255 encode failed - bad len\n");
+        result = 1;
+        goto exit;
+    }
+    if (encoded_id[0] != QD_AMQP_STR8_UTF8
+        || encoded_id[1] != strlen(id)
+        || memcmp(&encoded_id[2], id, strlen(id)) != 0) {
+
+        fprintf(stderr, "boundary encode failed - bad format\n");
+        result = 1;
+        goto exit;
+    }
+
+    qd_router_id_finalize();
+
+    //
+    // this ID will be exactly 256 chars long (STR32).
+    //
+
+    memset(boundary_id, 'B', 254);
+    boundary_id[255] = 0;
+
+    qd_router_id_initialize("0", boundary_id);
+    id = qd_router_id();
+    assert(strlen(id) == 256);
+    if (strncmp(id, "0/", 2) != 0 || strcmp(&id[2], boundary_id) != 0) {
+        fprintf(stderr, "Invalid boundary 256 id (%s)\n", id);
+        result = 1;
+        goto exit;
+    }
+
+    encoded_id = qd_router_id_encoded(&len);
+    if (len != strlen(id) + 5) {
+        fprintf(stderr, "bounary 256 encode failed - bad len\n");
+        result = 1;
+        goto exit;
+    }
+    if (encoded_id[0] != QD_AMQP_STR32_UTF8
+        || qd_parse_uint32_decode(&encoded_id[1]) != strlen(id)
+        || memcmp(&encoded_id[5], id, strlen(id)) != 0) {
+
+        fprintf(stderr, "boundary 256 encode failed - bad format\n");
+        result = 1;
+        goto exit;
+    }
+
+
+exit:
+    qd_router_id_finalize();
+    return result;
+}
+
 int main(int argc, char** argv)
 {
     size_t buffer_size = 512;
+    int result = 0;
 
     if (argc > 1) {
         buffer_size = atoi(argv[1]);
@@ -45,7 +155,9 @@ int main(int argc, char** argv)
     qd_error_initialize();
     qd_buffer_set_size(buffer_size);
 
-    int result = 0;
+    result += router_id_tests();
+    qd_router_id_initialize("0", "UnitTestRouter");
+
     result += message_tests();
     result += field_tests();
     result += parse_tests();
@@ -54,6 +166,8 @@ int main(int argc, char** argv)
     qd_log_finalize();
     qd_alloc_finalize();
     qd_iterator_finalize();
+    qd_router_id_finalize();
+
     return result;
 }
 
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index d6367f2..299e1ed 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -421,7 +421,14 @@ class OneRouterTest(TestCase):
     def test_09_message_annotations(self) :
         addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
         OneRouterTest.closest_count += 1
-        test = MessageAnnotations(addr, n_messages=10)
+        test = MessageAnnotations(addr)
+        test.run()
+        self.assertIsNone(test.error)
+
+    def test_09_1_bad_message_annotations(self) :
+        addr = self.address + '/closest/' + str(OneRouterTest.closest_count)
+        OneRouterTest.closest_count += 1
+        test = BadMessageAnnotations(addr)
         test.run()
         self.assertIsNone(test.error)
 
@@ -2032,15 +2039,10 @@ class ThreeAck (MessagingHandler) :
     # See PROTON-395 .
 
 
-class MessageAnnotations (MessagingHandler) :
-    def __init__(self,
-                 addr,
-                 n_messages
-                 ) :
-        super(MessageAnnotations, self) . __init__(prefetch=n_messages)
-        self.addr        = addr
-        self.n_messages  = n_messages
-
+class MessageAnnotations(MessagingHandler):
+    def __init__(self, addr) :
+        super(MessageAnnotations, self).__init__()
+        self.addr       = addr
         self.test_timer = None
         self.sender     = None
         self.receiver   = None
@@ -2051,9 +2053,11 @@ class MessageAnnotations (MessagingHandler) :
     def run(self) :
         Container(self).run()
 
-    def bail(self, travail) :
+    def bail(self, travail):
         self.bailing = True
         self.error = travail
+        if self.error:
+            print("message annotations failure: %s" % self.error, flush=True)
         self.send_conn.close()
         self.recv_conn.close()
         self.test_timer.cancel()
@@ -2070,75 +2074,172 @@ class MessageAnnotations (MessagingHandler) :
         self.test_timer  = event.reactor.schedule(TIMEOUT, TestTimeout(self))
 
     def on_sendable(self, event) :
-
-        if event.sender.credit < 1 :
+        if self.n_sent == 5:
             return
+
         # No added annotations.
-        msg = Message(body=self.n_sent)
+        msg = Message(body="Message [0]")
         self.n_sent += 1
         self.sender.send(msg)
 
         # Add an annotation.
-        msg = Message(body=self.n_sent)
+        msg = Message(body="Message [1]")
+        self.n_sent += 1
+        msg.annotations = {'x-opt-qd.ingress':
+                           'i_changed_the_annotation'}
+        self.sender.send(msg)
+
+        # Supply a trace list.
+        msg = Message(body="Message [2]")
         self.n_sent += 1
-        msg.annotations = {'x-opt-qd.ingress': 'i_changed_the_annotation'}
+        msg.annotations = {'x-opt-qd.trace': ['0/first-hop']}
         self.sender.send(msg)
 
-        # Try to supply an invalid type for trace.
-        msg = Message(body=self.n_sent)
+        # ensure obsolete MA are not propagated
+        msg = Message(body="Message [3]")
         self.n_sent += 1
-        msg.annotations = {'x-opt-qd.trace' : 45}
+        msg.annotations = {'userA': 'A',
+                           'userB': 'B',
+                           'x-opt-qd.class': 9,
+                           'x-opt-qd.': 'X'}
         self.sender.send(msg)
 
-        # Add a value to the trace list.
-        msg = Message(body=self.n_sent)
+        # supply a phase and stream
+        msg = Message(body="Message [4]")
         self.n_sent += 1
-        msg.annotations = {'x-opt-qd.trace' : ['0/first-hop']}
+        msg.annotations = {'x-opt-qd.phase': 7,
+                           'x-opt-qd.stream': 1}
         self.sender.send(msg)
 
     def on_message(self, event) :
         ingress_router_name = '0/QDR'
         self.n_received += 1
-        if self.n_received >= self.n_messages :
-            self.bail(None)
-            return
 
         annotations = event.message.annotations
+        body = event.message.body
 
-        if self.n_received == 1 :
-            if annotations['x-opt-qd.ingress'] != ingress_router_name :
-                self.bail('Bad ingress router name on msg %d' % self.n_received)
+        if body == "Message[0]":
+            ingress = annotations.get('x-opt-qd.ingress')
+            if ingress != ingress_router_name:
+                self.bail('Msg[0] bad ingress router value: %s' % annotations)
                 return
-            if annotations['x-opt-qd.trace'] != [ingress_router_name] :
-                self.bail('Bad trace on msg %d.' % self.n_received)
+            trace = annotations.get('x-opt-qd.trace')
+            if trace != [ingress_router_name]:
+                self.bail('Msg[0] bad ingress trace value: %s' % annotations)
                 return
 
-        elif self.n_received == 2 :
-            if annotations['x-opt-qd.ingress'] != 'i_changed_the_annotation' :
-                self.bail('Bad ingress router name on msg %d' % self.n_received)
+        elif body == "Message[1]":
+            ingress = annotations.get('x-opt-qd.ingress')
+            if ingress != 'i_changed_the_annotation':
+                self.bail('Msg[1] bad ingress router value: %s' % annotations)
                 return
-            if annotations['x-opt-qd.trace'] != [ingress_router_name] :
-                self.bail('Bad trace on msg %d .' % self.n_received)
+
+        elif body == "Message[2]":
+            trace = annotations.get('x-opt-qd.trace')
+            if trace != ['0/first-hop', ingress_router_name]:
+                self.bail('Msg[2] bad ingress trace value: %s' % annotations)
                 return
 
-        elif self.n_received == 3 :
-            # The invalid type for trace has no effect.
-            if annotations['x-opt-qd.ingress'] != ingress_router_name :
-                self.bail('Bad ingress router name on msg %d ' % self.n_received)
+        elif body == "Message[3]":
+            if 'x-opt-qd.class' in annotations:
+                self.bail('Msg[3] unexpected class value: %s' % annotations)
+                return
+            if 'x-opt-qd.' in annotations:
+                self.bail('Msg[3] unexpected null value: %s' % annotations)
                 return
-            if annotations['x-opt-qd.trace'] != [ingress_router_name] :
-                self.bail('Bad trace on msg %d' % self.n_received)
+            if (annotations.get('userA') != 'A' or annotations.get('userB') != 'B'):
+                self.bail('Msg[3] unexpected user values: %s' % annotations)
                 return
 
-        elif self.n_received == 4 :
-            if annotations['x-opt-qd.ingress'] != ingress_router_name :
-                self.bail('Bad ingress router name on msg %d ' % self.n_received)
+        elif body == "Message[4]":
+            if annotations.get('phase') != 7:
+                self.bail('Msg[4] unexpected phase: %s' % annotations)
                 return
-            # The sender prepended a value to the trace list.
-            if annotations['x-opt-qd.trace'] != ['0/first-hop', ingress_router_name] :
-                self.bail('Bad trace on msg %d' % self.n_received)
+            if annotations.get('stream') != 2:
+                self.bail('Msg[4] unexpected streaming: %s' % annotations)
                 return
-            # success
+
+        if self.n_received == 5:
+            self.bail(None)
+
+
+class BadMessageAnnotations(MessagingHandler):
+    """
+    Ensure the router can handle incorrectly formatted message annotations
+    """
+    def __init__(self, addr) :
+        super(BadMessageAnnotations, self).__init__(auto_accept=False)
+        self.addr       = addr
+        self.test_timer = None
+        self.sender     = None
+        self.receiver   = None
+        self.n_sent     = 0
+        self.n_received = 0
+        self.bailing    = False
+        self.messages = [
+            Message(body="Bad Message 1",
+                    annotations={'x-opt-qd.to': 3.145}),
+            Message(body="Bad Message 2",
+                    annotations={'x-opt-qd.trace': 'stringy'}),
+            Message(body="Bad Message 3",
+                    annotations={'x-opt-qd.trace': ['0/rA', '0/rB', 12, '0/rC']}),
+            Message(body="Bad Message 4",
+                    annotations={'x-opt-qd.ingress': ['0/rA']}),
+            Message(body="Good Message",
+                    annotations={'key': ['value']})]
+
+    def run(self) :
+        Container(self).run()
+
+    def bail(self, travail):
+        self.bailing = True
+        self.error = travail
+        if self.error:
+            print("test failure: %s" % self.error, flush=True)
+        self.send_conn.close()
+        self.recv_conn.close()
+        self.test_timer.cancel()
+
+    def timeout(self):
+        self.bail("Timeout Expired")
+
+    def on_start(self, event):
+        self.send_conn = event.container.connect(self.addr)
+        self.recv_conn = event.container.connect(self.addr)
+
+        self.sender      = event.container.create_sender(self.send_conn, self.addr)
+        self.receiver    = event.container.create_receiver(self.recv_conn, self.addr)
+        self.test_timer  = event.reactor.schedule(TIMEOUT, TestTimeout(self))
+
+    def on_sendable(self, event) :
+        if self.n_sent == len(self.messages):
+            return
+
+        self.sender.send(self.messages[self.n_sent])
+        self.n_sent += 1
+
+    def on_message(self, event) :
+        self.n_received += 1
+        if event.message.body != "Good Message":
+            self.bail("Got a bad message: %s" % event.message)
+            return
+        if event.message.annotations.get('key') != ['value']:
+            self.bail("Got unexpected user annotations: %s" % event.message.annotations)
+            return
+        event.delivery.update(Delivery.ACCEPTED)
+        event.delivery.settle()
+
+        if self.n_received == len(self.messages):
+            self.bail(None)
+
+    def on_rejected(self, event):
+        self.n_received += 1
+        rc = event.delivery.remote.condition
+        if rc.name != "amqp:invalid-field":
+            self.bail("Unexpected rejection: %s:%s" % (rc.name,
+                                                       rc.description))
+            return
+        if self.n_received == len(self.messages):
             self.bail(None)
 
 
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index a48f1b7..a8e027e 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -1154,11 +1154,15 @@ class MessageAnnotationsStripBothAddIngressTrace(MessagingHandler):
         if event.sender == self.sender:
             if self.msg_not_sent:
                 msg = Message(body={'number': 0})
+                ingress_delivery_annotations = {'x-opt-qd.trace': 999,
+                                                'Hello': 'there'}
                 ingress_message_annotations = {'work': 'hard',
                                                'x-opt-qd': 'humble',
                                                'x-opt-qd.ingress': 'ingress-router',
                                                'x-opt-qd.trace': ['0/QDR.A']}
                 msg.annotations = ingress_message_annotations
+                msg.instructions = {'x-opt-qd.trace': 999,
+                                    'Hello': 'there'}
                 event.sender.send(msg)
                 self.msg_not_sent = False
 
@@ -1166,7 +1170,11 @@ class MessageAnnotationsStripBothAddIngressTrace(MessagingHandler):
         if self.receiver == event.receiver:
             if 0 == event.message.body['number']:
                 if event.message.annotations == {'work': 'hard', 'x-opt-qd': 'humble'}:
-                    self.error = None
+                    if event.message.instructions == {'x-opt-qd.trace': 999,
+                                                      'Hello': 'there'}:
+                        self.error = None
+                    else:
+                        self.error = "invalid delivery annos: %s" % event.message.instructions
             self.timer.cancel()
             self.conn1.close()
             self.conn2.close()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org