You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2017/07/07 20:39:43 UTC

[06/14] qpid-dispatch git commit: DISPATCH-760: Message handling uses new scheme

DISPATCH-760: Message handling uses new scheme


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/21014c6e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/21014c6e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/21014c6e

Branch: refs/heads/master
Commit: 21014c6e9f8dd2ea05551f341487a9d486c074cd
Parents: b67b201
Author: Chuck Rolke <cr...@redhat.com>
Authored: Sat Jul 1 15:01:56 2017 -0400
Committer: Chuck Rolke <cr...@redhat.com>
Committed: Fri Jul 7 10:38:33 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h |  46 +++++-
 src/message.c                   | 299 +++++++++++++++++++++++++++--------
 src/message_private.h           |  16 +-
 src/router_node.c               |  70 ++++----
 4 files changed, 313 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 523e150..d3f2bb4 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -133,16 +133,14 @@ void qd_message_free(qd_message_t *msg);
 qd_message_t *qd_message_copy(qd_message_t *msg);
 
 /**
- * Retrieve the message annotations from a message.
+ * Retrieve the message annotations from a message and place them in message storage.
  *
  * IMPORTANT: The pointer returned by this function remains owned by the message.
  *            The caller MUST NOT free the parsed field.
  *
  * @param msg Pointer to a received message.
- * @return Pointer to the parsed field for the message annotations.  If the message doesn't
- *         have message annotations, the return value shall be NULL.
  */
-qd_parsed_field_t *qd_message_message_annotations(qd_message_t *msg);
+void qd_message_message_annotations(qd_message_t *msg);
 
 /**
  * Set the value for the QD_MA_TRACE field in the outgoing message annotations
@@ -258,6 +256,46 @@ int qd_message_repr_len();
 
 qd_log_source_t* qd_message_log_source();
 
+/**
+ * Accessor for message field ingress
+ * 
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg);
+
+/**
+ * Accessor for message field phase
+ * 
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_phase      (qd_message_t *msg);
+
+/**
+ * Accessor for message field to_override
+ * 
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg);
+
+/**
+ * Accessor for message field trace
+ * 
+ * @param msg A pointer to the message
+ * @return the parsed field
+ */
+qd_parsed_field_t *qd_message_get_trace      (qd_message_t *msg);
+
+/**
+ * Accessor for the integer value of message field phase
+ * 
+ * @param msg A pointer to the message
+ * @return the phase as an integer
+ */
+int                qd_message_get_phase_val  (qd_message_t *msg);
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 706458d..3e78930 100644
--- a/src/message.c
+++ b/src/message.c
@@ -23,6 +23,7 @@
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/log.h>
+#include <qpid/dispatch/buffer.h>
 #include <proton/object.h>
 #include "message_private.h"
 #include "compose_private.h"
@@ -33,6 +34,7 @@
 #include <limits.h>
 #include <time.h>
 #include <inttypes.h>
+#include <assert.h>
 
 const char *STR_AMQP_NULL = "null";
 const char *STR_AMQP_TRUE = "T";
@@ -849,7 +851,6 @@ qd_message_t *qd_message()
     msg->content->lock = sys_mutex();
     sys_atomic_init(&msg->content->ref_count, 1);
     msg->content->parse_depth = QD_DEPTH_NONE;
-    msg->content->parsed_message_annotations = 0;
 
     return (qd_message_t*) msg;
 }
@@ -870,8 +871,16 @@ void qd_message_free(qd_message_t *in_msg)
     rc = sys_atomic_dec(&content->ref_count) - 1;
 
     if (rc == 0) {
-        if (content->parsed_message_annotations)
-            qd_parse_free(content->parsed_message_annotations);
+        if (content->ma_field_iter_in)
+            qd_iterator_free(content->ma_field_iter_in);
+        if (content->ma_pf_ingress)
+            qd_parse_free(content->ma_pf_ingress);
+        if (content->ma_pf_phase)
+            qd_parse_free(content->ma_pf_phase);
+        if (content->ma_pf_to_override)
+            qd_parse_free(content->ma_pf_to_override);
+        if (content->ma_pf_trace)
+            qd_parse_free(content->ma_pf_trace);
 
         qd_buffer_t *buf = DEQ_HEAD(content->buffers);
         while (buf) {
@@ -902,6 +911,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
     qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
     copy->ma_phase = msg->ma_phase;
+    copy->is_interrouter = msg->is_interrouter;
 
     copy->content = content;
 
@@ -912,30 +922,49 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     return (qd_message_t*) copy;
 }
 
-qd_parsed_field_t *qd_message_message_annotations(qd_message_t *in_msg)
+void qd_message_message_annotations(qd_message_t *in_msg)
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
     qd_message_content_t *content = msg->content;
 
-    if (content->parsed_message_annotations)
-        return content->parsed_message_annotations;
+    if (content->ma_parsed)
+        return ;
+    content->ma_parsed = true;
 
-    qd_iterator_t *ma = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
-    if (ma == 0)
-        return 0;
+    content->ma_field_iter_in = qd_message_field_iterator(in_msg, QD_FIELD_MESSAGE_ANNOTATION);
+    if (content->ma_field_iter_in == 0)
+        return;
 
-    content->parsed_message_annotations = qd_parse(ma);
-    if (content->parsed_message_annotations == 0 ||
-        !qd_parse_ok(content->parsed_message_annotations) ||
-        !qd_parse_is_map(content->parsed_message_annotations)) {
-        qd_iterator_free(ma);
-        qd_parse_free(content->parsed_message_annotations);
-        content->parsed_message_annotations = 0;
-        return 0;
+    qd_parse_annotations(
+        msg->is_interrouter,
+        content->ma_field_iter_in,
+        &content->ma_pf_ingress,
+        &content->ma_pf_phase,
+        &content->ma_pf_to_override,
+        &content->ma_pf_trace,
+        &content->ma_user_annotation_blob,
+        &content->ma_count);
+
+    // Construct pseudo-field location of user annotations blob
+    // This holds all annotations if no router-specific annotations are present
+    if (content->ma_count > 0) {
+        qd_field_location_t   *cf  = &content->field_user_annotations;
+        qd_iterator_pointer_t *uab = &content->ma_user_annotation_blob;
+        cf->buffer = uab->buffer;
+        cf->offset = uab->cursor - qd_buffer_base(uab->buffer);
+        cf->length = uab->remaining;
+        cf->parsed = true;
+        if (content->ma_count > 4) {
+            //fprintf(stdout, "V2_DEV set ma_count to %d, len=%d\n", content->ma_count, (int)cf->length);
+        }
+    }
+
+    // extract phase
+    if (content->ma_pf_phase) {
+        content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase);
     }
 
-    qd_iterator_free(ma);
-    return content->parsed_message_annotations;
+    return;
 }
 
 
@@ -991,6 +1020,9 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     //
     if (!msg) {
         msg = (qd_message_pvt_t*) qd_message();
+        qd_link_t       *qdl = (qd_link_t *)pn_link_get_context(link);
+        qd_connection_t *qdc = qd_link_connection(qdl);
+        msg->is_interrouter = qd_connection_is_interrouter(qdc);
         pn_record_def(record, PN_DELIVERY_CTX, PN_WEAKREF);
         pn_record_set(record, PN_DELIVERY_CTX, (void*) msg);
     }
@@ -1070,40 +1102,78 @@ static void send_handler(void *context, const unsigned char *start, int length)
 }
 
 
-// create a buffer chain holding the outgoing message annotations section
-static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out, bool strip_annotations)
+// compose_message_annotations
+//
+// Create a buffer chain holding the outgoing message annotations section.
+// Three types of outbound link are available:
+//  V1 - 0.8.x routers that implement the original annotation scheme
+//  V2 - 1.0.x routers that implement the new annotation scheme
+//  V0 - End system clients that do not get any router annotations
+//
+// These routines do not strictly create the buffer chain in a ready-to-go form.
+// Rather hints are left to transmit the annotations in three sections:
+//   1. The optional map header returned in 'out'. This may hold:
+//     a. empty if no user annotations and no router annotations
+//     b. A map header if just user annotations
+//     In case b the map header relies on the transmit routine to
+//     send the opaque blob of user annotations.
+//   2. The optional blob of user annotations. The buffers for this section
+//      are not copied into any buffer chain. Instead they are consumed
+//      directly from the incoming message buffer chain.
+//      These buffers are discovered in qd_message_message_annotations
+//      and located using msg.content->ma_user_annotation_blob.
+//   3. The optional V1 annotation key-value pairs.
+//      These are identified by the out_trailer buffer chain.
+//
+// Function qd_message_send must synthesize the annotations by sending
+// buffer chain 'out', the user's blob, and buffer chain 'out_trailer' as required.
+// The map in chain 'out' has been created with a gimmick that adjusts the size
+// of the map in the outbound message even though the data is not present in
+// buffer chain 'out'.
+static void compose_message_annotations_v0(qd_message_pvt_t *msg, qd_buffer_list_t *out)
 {
     qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
 
     bool map_started = false;
 
-    //We will have to add the custom annotations
-    qd_parsed_field_t *in_ma = qd_parse_dup(msg->content->parsed_message_annotations);
-    if (in_ma) {
-        uint32_t count = qd_parse_sub_count(in_ma);
+    if (msg->content->ma_count > 0) {
+        // insert the incoming message remaining elements
+        if (!map_started) {
+            qd_compose_start_map(out_ma);
+            map_started = true;
+        }
 
-        for (uint32_t idx = 0; idx < count; idx++) {
-            qd_parsed_field_t *sub_key  = qd_parse_sub_key(in_ma, idx);
-            if (!sub_key)
-                continue;
+        // Bump the map size and count to reflect user's blob.
+        // Note that the blob is not inserted here. This code adjusts the
+        // size/count of the map that is under construction and the content
+        // is sent separately by qd_message_send
+        qd_compose_insert_opaque_elements(out_ma, msg->content->ma_count,
+                                          msg->content->field_user_annotations.length);
+    }
 
-            qd_iterator_t *iter = qd_parse_raw(sub_key);
+    if (map_started) {
+        qd_compose_end_map(out_ma);
+        qd_compose_take_buffers(out_ma, out);
+    }
 
-            if (!qd_iterator_prefix(iter, QD_MA_PREFIX)) {
-                if (!map_started) {
-                    qd_compose_start_map(out_ma);
-                    map_started = true;
-                }
-                qd_parsed_field_t *sub_value = qd_parse_sub_value(in_ma, idx);
-                qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_key));
-                qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_value));
-            }
-        }
+    qd_compose_free(out_ma);
+}
 
-        qd_parse_free(in_ma);
-    }
 
-    //Add the dispatch router specific annotations only if strip_annotations is false.
+static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list_t *out,
+                                           qd_buffer_list_t *out_trailer, bool strip_annotations)
+{
+    qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+
+    bool map_started = false;
+
+    // v1 annotations go into the out_trailer
+    int field_count = 0;
+    qd_composed_field_t *field = qd_compose_subfield(0);
+    if (!field)
+        return;
+
+    // If not stripping then add dispatch router specific annotations
     if (!strip_annotations) {
         if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
             !DEQ_IS_EMPTY(msg->ma_trace) ||
@@ -1116,35 +1186,80 @@ static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
             }
 
             if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
-                qd_compose_insert_symbol(out_ma, QD_MA_TO);
-                qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
+                qd_compose_insert_symbol(field, QD_MA_TO);
+                qd_compose_insert_buffers(field, &msg->ma_to_override);
+                field_count++;
             }
 
             if (!DEQ_IS_EMPTY(msg->ma_trace)) {
-                qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
-                qd_compose_insert_buffers(out_ma, &msg->ma_trace);
+                qd_compose_insert_symbol(field, QD_MA_TRACE);
+                qd_compose_insert_buffers(field, &msg->ma_trace);
+                field_count++;
             }
 
             if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
-                qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
-                qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+                qd_compose_insert_symbol(field, QD_MA_INGRESS);
+                qd_compose_insert_buffers(field, &msg->ma_ingress);
+                field_count++;
             }
 
             if (msg->ma_phase != 0) {
-                qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
-                qd_compose_insert_int(out_ma, msg->ma_phase);
+                qd_compose_insert_symbol(field, QD_MA_PHASE);
+                qd_compose_insert_int(field, msg->ma_phase);
+                field_count++;
             }
         }
     }
 
+    if (msg->content->ma_count > 0) {
+        // insert the incoming message user blob
+        if (!map_started) {
+            qd_compose_start_map(out_ma);
+            map_started = true;
+        }
+
+        // Bump the map size and count to reflect user's blob.
+        // Note that the blob is not inserted here. This code adjusts the
+        // size/count of the map that is under construction and the content
+        // is sent separately by qd_message_send
+        qd_compose_insert_opaque_elements(out_ma, msg->content->ma_count,
+                                          msg->content->field_user_annotations.length);
+    }
+
+    if (field_count > 0) {
+        if (!map_started) {
+            qd_compose_start_map(out_ma);
+            map_started = true;
+        }
+        qd_compose_insert_opaque_elements(out_ma, field_count * 2,
+                                          qd_buffer_list_length(&field->buffers));
+
+    }
+
     if (map_started) {
         qd_compose_end_map(out_ma);
         qd_compose_take_buffers(out_ma, out);
+        qd_compose_take_buffers(field, out_trailer);
     }
 
     qd_compose_free(out_ma);
+    qd_compose_free(field);
+}
+
+
+// create a buffer chain holding the outgoing message annotations section
+static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out,
+                                        qd_buffer_list_t *out_trailer,
+                                        bool strip_annotations, bool is_interrouter)
+{
+    if (is_interrouter) {
+        compose_message_annotations_v1(msg, out, out_trailer, strip_annotations);
+    } else {
+        compose_message_annotations_v0(msg, out);
+    }
 }
 
+
 void qd_message_send(qd_message_t *in_msg,
                      qd_link_t    *link,
                      bool          strip_annotations)
@@ -1154,28 +1269,16 @@ void qd_message_send(qd_message_t *in_msg,
     qd_buffer_t          *buf     = DEQ_HEAD(content->buffers);
     unsigned char        *cursor;
     pn_link_t            *pnl     = qd_link_pn(link);
+    qd_connection_t      *qdc     = qd_link_connection(link);
+    bool                  is_rtr  = qd_connection_is_interrouter(qdc);
 
     qd_buffer_list_t new_ma;
+    qd_buffer_list_t new_ma_trailer;
     DEQ_INIT(new_ma);
+    DEQ_INIT(new_ma_trailer);
 
     // Process  the message annotations if any
-    compose_message_annotations(msg, &new_ma, strip_annotations);
-
-    //
-    // This is the case where the message annotations have been modified.
-    // The message send must be divided into sections:  The existing header;
-    // the new message annotations; the rest of the existing message.
-    // Note that the original message annotations that are still in the
-    // buffer chain must not be sent.
-    //
-    // Start by making sure that we've parsed the message sections through
-    // the message annotations
-    //
-    // ??? NO LONGER NECESSARY???
-    if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
-        qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
-        return;
-    }
+    compose_message_annotations(msg, &new_ma, &new_ma_trailer, strip_annotations, is_rtr);
 
     //
     // Send header if present
@@ -1201,7 +1304,7 @@ void qd_message_send(qd_message_t *in_msg,
     }
 
     //
-    // Send new message annotations
+    // Send new message annotations map start if any
     //
     qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
     while (da_buf) {
@@ -1212,6 +1315,29 @@ void qd_message_send(qd_message_t *in_msg,
     qd_buffer_list_free_buffers(&new_ma);
 
     //
+    // Annotations possibly include an opaque blob of user annotations
+    //
+    if (content->field_user_annotations.length > 0) {
+        qd_buffer_t *buf2      = content->field_user_annotations.buffer;
+        unsigned char *cursor2 = content->field_user_annotations.offset + qd_buffer_base(buf);
+        advance(&cursor2, &buf2,
+                content->field_user_annotations.length,
+                send_handler, (void*) pnl);
+    }
+
+    //
+    // Annotations may include the v1 new_ma_trailer
+    //
+    qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer);
+    while (ta_buf) {
+        char *to_send = (char*) qd_buffer_base(ta_buf);
+        pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
+        ta_buf = DEQ_NEXT(ta_buf);
+    }
+    qd_buffer_list_free_buffers(&new_ma_trailer);
+
+
+    //
     // Skip over replaced message annotations
     //
     if (content->section_message_annotation.length > 0)
@@ -1472,9 +1598,13 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
     qd_compose_end_list(field);
 
     qd_buffer_list_t out_ma;
+    qd_buffer_list_t out_ma_trailer;
     DEQ_INIT(out_ma);
-    compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, false);
+    DEQ_INIT(out_ma_trailer);
+    compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, &out_ma_trailer, false, true);
     qd_compose_insert_buffers(field, &out_ma);
+    // TODO: user annotation blob goes here
+    qd_compose_insert_buffers(field, &out_ma_trailer);
 
     field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
     qd_compose_start_list(field);
@@ -1530,3 +1660,32 @@ void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_com
     }
 }
 
+
+qd_parsed_field_t *qd_message_get_ingress    (qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress;
+}
+
+
+qd_parsed_field_t *qd_message_get_phase      (qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_pf_phase;
+}
+
+
+qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_pf_to_override;
+}
+
+
+qd_parsed_field_t *qd_message_get_trace      (qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_pf_trace;
+}
+
+
+int qd_message_get_phase_val(qd_message_t *msg)
+{
+    return ((qd_message_pvt_t*)msg)->content->ma_int_phase;
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index d59f7a9..c2dc7e2 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -75,6 +75,7 @@ typedef struct {
     qd_field_location_t  section_application_properties;  // The application properties list
     qd_field_location_t  section_body;                    // The message body: Data
     qd_field_location_t  section_footer;                  // The footer
+    qd_field_location_t  field_user_annotations;          // Opaque user message annotations, not a real field.
     qd_field_location_t  field_message_id;                // The string value of the message-id
     qd_field_location_t  field_user_id;                   // The string value of the user-id
     qd_field_location_t  field_to;                        // The string value of the to field
@@ -93,7 +94,19 @@ typedef struct {
     qd_buffer_t         *parse_buffer;
     unsigned char       *parse_cursor;
     qd_message_depth_t   parse_depth;
-    qd_parsed_field_t   *parsed_message_annotations;
+
+    bool                 ma_parsed;                       // have parsed annotations in incoming message
+    qd_iterator_t       *ma_field_iter_in;                // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION
+
+    qd_iterator_pointer_t ma_user_annotation_blob;        // Original user annotations
+                                                          // with router annotations stripped
+    uint32_t             ma_count;                        // Number of map elements in blob
+                                                          // after the router fields stripped
+    qd_parsed_field_t   *ma_pf_ingress;
+    qd_parsed_field_t   *ma_pf_phase;
+    qd_parsed_field_t   *ma_pf_to_override;
+    qd_parsed_field_t   *ma_pf_trace;
+    int                  ma_int_phase;
 } qd_message_content_t;
 
 typedef struct {
@@ -103,6 +116,7 @@ typedef struct {
     qd_buffer_list_t      ma_trace;        // trace list in outgoing message annotations
     qd_buffer_list_t      ma_ingress;      // ingress field in outgoing message annotations
     int                   ma_phase;        // phase for the override address
+    bool                  is_interrouter;  // true if peer is another router
 } qd_message_pvt_t;
 
 ALLOC_DECLARE(qd_message_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/21014c6e/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index ae30831..a4146bd 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -93,49 +93,26 @@ static int AMQP_writable_conn_handler(void *type_context, qd_connection_t *conn,
 
 
 static qd_iterator_t *router_annotate_message(qd_router_t       *router,
-                                              qd_parsed_field_t *in_ma,
                                               qd_message_t      *msg,
                                               qd_bitmask_t     **link_exclusions,
                                               bool               strip_inbound_annotations)
 {
     qd_iterator_t *ingress_iter = 0;
 
-    qd_parsed_field_t *trace   = 0;
-    qd_parsed_field_t *ingress = 0;
-    qd_parsed_field_t *to      = 0;
-    qd_parsed_field_t *phase   = 0;
+    bool s = strip_inbound_annotations;
 
-    *link_exclusions = 0;
+    qd_parsed_field_t *trace   = s ? 0 : qd_message_get_trace(msg);
+    qd_parsed_field_t *ingress = s ? 0 : qd_message_get_ingress(msg);
+    qd_parsed_field_t *to      = s ? 0 : qd_message_get_to_override(msg);
+    qd_parsed_field_t *phase   = s ? 0 : qd_message_get_phase(msg);
 
-    if (in_ma && !strip_inbound_annotations) {
-        uint32_t count = qd_parse_sub_count(in_ma);
-        bool done = false;
-
-        for (uint32_t idx = 0; idx < count && !done; idx++) {
-            qd_parsed_field_t *sub  = qd_parse_sub_key(in_ma, idx);
-            if (!sub)
-                continue;
-            qd_iterator_t *iter = qd_parse_raw(sub);
-            if (!iter)
-                continue;
-
-            if        (qd_iterator_equal(iter, (unsigned char*) QD_MA_TRACE)) {
-                trace = qd_parse_sub_value(in_ma, idx);
-            } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_INGRESS)) {
-                ingress = qd_parse_sub_value(in_ma, idx);
-            } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
-                to = qd_parse_sub_value(in_ma, idx);
-            } else if (qd_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
-                phase = qd_parse_sub_value(in_ma, idx);
-            }
-            done = trace && ingress && to && phase;
-        }
-    }
+    *link_exclusions = 0;
 
     //
     // QD_MA_TRACE:
     // If there is a trace field, append this router's ID to the trace.
     // If the router ID is already in the trace the msg has looped.
+    // This code does not check for the loop condition.
     //
     qd_composed_field_t *trace_field = qd_compose_subfield(0);
     qd_compose_start_list(trace_field);
@@ -182,8 +159,7 @@ static qd_iterator_t *router_annotate_message(qd_router_t       *router,
     // Preserve the existing value.
     //
     if (phase) {
-        int phase_val = qd_parse_as_int(phase);
-        qd_message_set_phase_annotation(msg, phase_val);
+        qd_message_set_phase_annotation(msg, qd_message_get_phase_val(msg));
     }
 
     //
@@ -265,8 +241,10 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
     //
     if (qdr_link_is_routed(rlink)) {
         pn_delivery_tag_t dtag = pn_delivery_tag(pnd);
-        delivery = qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd), (uint8_t*) dtag.start, dtag.size,
-                                                   pn_disposition_type(pn_delivery_remote(pnd)), pn_disposition_data(pn_delivery_remote(pnd)));
+        delivery = qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd),
+                                                   (uint8_t*) dtag.start, dtag.size,
+                                                   pn_disposition_type(pn_delivery_remote(pnd)),
+                                                   pn_disposition_data(pn_delivery_remote(pnd)));
 
         if (delivery) {
             if (pn_delivery_settled(pnd))
@@ -308,6 +286,13 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
     // 'to' field.  If the link is not anonymous, we don't need the 'to' field as we will be
     // using the address from the link target.
     //
+    // Validate the content of the delivery as an AMQP message.  This is done partially, only
+    // to validate that we can find the fields we need to route the message.
+    //
+    // If the link is anonymous, we must validate through the message properties to find the
+    // 'to' field.  If the link is not anonymous, we don't need the 'to' field as we will be
+    // using the address from the link target.
+    //
     qd_message_depth_t  validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
     bool                valid_message    = qd_message_check(msg, validation_depth);
 
@@ -334,10 +319,11 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
             }
         }
 
-        qd_parsed_field_t   *in_ma        = qd_message_message_annotations(msg);
+        qd_message_message_annotations(msg);
         qd_bitmask_t        *link_exclusions;
         bool                 strip        = qdr_link_strip_annotations_in(rlink);
-        qd_iterator_t *ingress_iter = router_annotate_message(router, in_ma, msg, &link_exclusions, strip);
+
+        qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, strip);
 
         if (anonymous_link) {
             qd_iterator_t *addr_iter = 0;
@@ -346,12 +332,10 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
             //
             // If the message has delivery annotations, get the to-override field from the annotations.
             //
-            if (in_ma) {
-                qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
-                if (ma_to) {
-                    addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
-                    phase = qd_message_get_phase_annotation(msg);
-                }
+            qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
+            if (ma_to) {
+                addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
+                phase = qd_message_get_phase_val(msg);
             }
 
             //
@@ -1047,7 +1031,7 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d
     // handle any delivery-state on the transfer e.g. transactional-state
     qdr_delivery_write_extension_state(dlv, pdlv, true);
     //
-    // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver.
+    // If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver.
     //
     bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org