You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2015/03/12 20:32:16 UTC

svn commit: r1666279 - in /qpid/dispatch/trunk: include/qpid/dispatch/ src/ tests/

Author: kgiusti
Date: Thu Mar 12 19:32:16 2015
New Revision: 1666279

URL: http://svn.apache.org/r1666279
Log:
DISPATCH-98: Refactor message annotation handling

Allows the various annotations to be set on a per-message basis.

Added:
    qpid/dispatch/trunk/tests/buffer_test.c   (with props)
Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/buffer.h
    qpid/dispatch/trunk/include/qpid/dispatch/compose.h
    qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
    qpid/dispatch/trunk/include/qpid/dispatch/message.h
    qpid/dispatch/trunk/src/buffer.c
    qpid/dispatch/trunk/src/compose.c
    qpid/dispatch/trunk/src/message.c
    qpid/dispatch/trunk/src/message_private.h
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/tests/CMakeLists.txt
    qpid/dispatch/trunk/tests/compose_test.c
    qpid/dispatch/trunk/tests/message_test.c
    qpid/dispatch/trunk/tests/run_unit_tests_size.c

Modified: qpid/dispatch/trunk/include/qpid/dispatch/buffer.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/buffer.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/buffer.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/buffer.h Thu Mar 12 19:32:16 2015
@@ -92,6 +92,32 @@ size_t qd_buffer_size(qd_buffer_t *buf);
  */
 void qd_buffer_insert(qd_buffer_t *buf, size_t len);
 
+/**
+ * Create a new buffer list by cloning an existing one.
+ *
+ * @param dst A pointer to a list to contain the new buffers
+ * @param src A pointer to an existing buffer list
+ * @return the number of bytes of data in the new chain
+ */
+unsigned int qd_buffer_list_clone(qd_buffer_list_t *dst, const qd_buffer_list_t *src);
+
+/**
+ * Free all the buffers contained in a buffer list
+ *
+ * @param list A pointer to a list containing buffers.  On return this list
+ * will be set to an empty list.
+ */
+void qd_buffer_list_free_buffers(qd_buffer_list_t *list);
+
+/**
+ * Return the total number of data bytes in a buffer list
+ *
+ * @param list A pointer to a list containing buffers.
+ * @return total number of bytes of data in the buffer list
+ */
+unsigned int qd_buffer_list_length(const qd_buffer_list_t *list);
+
+
 ///@}
 
 #endif

Modified: qpid/dispatch/trunk/include/qpid/dispatch/compose.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/compose.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/compose.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/compose.h Thu Mar 12 19:32:16 2015
@@ -209,6 +209,33 @@ void qd_compose_insert_symbol(qd_compose
  */
 void qd_compose_insert_typed_iterator(qd_composed_field_t *field, qd_field_iterator_t *iter);
 
+/**
+ * Begin composing a new sub field that can be appended to a composed field.
+ *
+ * @param extend An existing field onto which to append the new field or NULL to
+ *        create a standalone field.
+ * @return A pointer to the newly created field.
+ */
+qd_composed_field_t *qd_compose_subfield(qd_composed_field_t *extend);
+
+/**
+ * Steal the underlying buffers away from a composed field.
+ *
+ * @param field A composed field whose buffers will be taken
+ * @param list To hold the extracted buffers
+ */
+void qd_compose_take_buffers(qd_composed_field_t *field,
+                             qd_buffer_list_t *list);
+/**
+ * Append a buffer list into a composed field.  If field is a container type
+ * (list, map), the container's count will be incremented by one.
+ *
+ * @param field A field created by ::qd_compose().
+ * @param list A list of buffers containing a single completely encoded data
+ * object.  Ownership of these buffers is given to field.
+ */
+void qd_compose_insert_buffers(qd_composed_field_t *field, qd_buffer_list_t *list);
+
 ///@}
 
 #endif

Modified: qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/ctools.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/ctools.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/ctools.h Thu Mar 12 19:32:16 2015
@@ -45,6 +45,7 @@
 #define DEQ_EMPTY {0,0,0,0}
 
 #define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0)
+#define DEQ_IS_EMPTY(d) ((d).head == 0)
 #define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0)
 #define DEQ_HEAD(d) ((d).head)
 #define DEQ_TAIL(d) ((d).tail)
@@ -156,4 +157,17 @@ do {
     CT_ASSERT((d).size || (!(d).head && !(d).tail)); \
 } while (0)
 
+#define DEQ_APPEND(d1,d2)               \
+do {                                    \
+    if (!(d1).head)                     \
+        (d1) = (d2);                    \
+    else if ((d2).head) {               \
+        (d1).tail->next = (d2).head;    \
+        (d2).head->prev = (d1).tail;    \
+        (d1).tail = (d2).tail;          \
+        (d1).size += (d2).size;         \
+    }                                   \
+    DEQ_INIT(d2);                       \
+} while (0)
+
 #endif

Modified: qpid/dispatch/trunk/include/qpid/dispatch/message.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/message.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/message.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/message.h Thu Mar 12 19:32:16 2015
@@ -144,18 +144,52 @@ qd_message_t *qd_message_copy(qd_message
 qd_parsed_field_t *qd_message_message_annotations(qd_message_t *msg);
 
 /**
- * Set the message annotations for the message.  If the message already has message annotations,
- * they will be overwritten/replaced by the new field.
+ * Set the value for the QD_MA_TRACE field in the outgoing message annotations
+ * for the message.
+ *
+ * IMPORTANT: This method takes ownership of the trace_field - the calling
+ * method must not reference it after this call.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param trace_field Pointer to a composed field representing the list that
+ * will be used as the value for the QD_MA_TRACE map entry.  If null, the
+ * message will not have a QD_MA_TRACE message annotation field.  Ownership of
+ * this field is transferred to the message.
+ *
+ */
+void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *trace_field);
+
+/**
+ * Set the value for the QD_MA_TO field in the outgoing message annotations for
+ * the message.
+ *
+ * IMPORTANT: This method takes ownership of the to_field - the calling
+ * method must not reference it after this call.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param to_field Pointer to a composed field representing the to override
+ * address that will be used as the value for the QD_MA_TO map entry.  If null,
+ * the message will not have a QD_MA_TO message annotation field.  Ownership of
+ * this field is transferred to the message.
+ *
+ */
+void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field);
+
+/**
+ * Set the value for the QD_MA_INGRESS field in the outgoing message
+ * annotations for the message.
+ *
+ * IMPORTANT: This method takes ownership of the ingress_field - the calling
+ * method must not reference it after this call.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param ingress_field Pointer to a composed field representing the ingress router
+ * that will be used as the value for the QD_MA_INGRESS map entry.  If null,
+ * the message will not have a QD_MA_INGRESS message annotation field.
+ * Ownership of this field is transferred to the message.
  *
- * @param msg Pointer to a receiver message.
- * @param da Pointer to a composed field representing the new message annotations of the message.
- *           If null, the message will not have a message annotations field.
- *           IMPORTANT: The message will not take ownership of the composed field.  The
- *                      caller is responsible for freeing it after this call.  Since the contents
- *                      are copied into the message, it is safe to free the composed field
- *                      any time after the call to this function.
  */
-void qd_message_set_message_annotations(qd_message_t *msg, qd_composed_field_t *da);
+void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *ingress_field);
 
 /**
  * Receive message data via a delivery.  This function may be called more than once on the same

Modified: qpid/dispatch/trunk/src/buffer.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/buffer.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/buffer.c (original)
+++ qpid/dispatch/trunk/src/buffer.c Thu Mar 12 19:32:16 2015
@@ -20,6 +20,8 @@
 #include <qpid/dispatch/buffer.h>
 #include "alloc.h"
 
+#include <string.h>
+
 static size_t buffer_size = 512;
 static int    size_locked = 0;
 
@@ -81,3 +83,52 @@ void qd_buffer_insert(qd_buffer_t *buf,
     buf->size += len;
     assert(buf->size <= buffer_size);
 }
+
+unsigned int qd_buffer_list_clone(qd_buffer_list_t *dst, const qd_buffer_list_t *src)
+{
+    uint32_t len = 0;
+    DEQ_INIT(*dst);
+    qd_buffer_t *buf = DEQ_HEAD(*src);
+    while (buf) {
+        size_t to_copy = qd_buffer_size(buf);
+        unsigned char *src = qd_buffer_base(buf);
+        len += to_copy;
+        while (to_copy) {
+            qd_buffer_t *newbuf = qd_buffer();
+            size_t count = qd_buffer_capacity(newbuf);
+            // default buffer capacity may have changed,
+            // so don't assume it will fit:
+            if (count > to_copy) count = to_copy;
+            memcpy(qd_buffer_cursor(newbuf), src, count);
+            qd_buffer_insert(newbuf, count);
+            DEQ_INSERT_TAIL(*dst, newbuf);
+            src += count;
+            to_copy -= count;
+        }
+        buf = DEQ_NEXT(buf);
+    }
+    return len;
+}
+
+
+void qd_buffer_list_free_buffers(qd_buffer_list_t *list)
+{
+    qd_buffer_t *buf = DEQ_HEAD(*list);
+    while (buf) {
+        DEQ_REMOVE_HEAD(*list);
+        qd_buffer_free(buf);
+        buf = DEQ_HEAD(*list);
+    }
+}
+
+
+unsigned int qd_buffer_list_length(const qd_buffer_list_t *list)
+{
+    unsigned int len = 0;
+    qd_buffer_t *buf = DEQ_HEAD(*list);
+    while (buf) {
+        len += qd_buffer_size(buf);
+        buf = DEQ_NEXT(buf);
+    }
+    return len;
+}

Modified: qpid/dispatch/trunk/src/compose.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/compose.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/compose.c (original)
+++ qpid/dispatch/trunk/src/compose.c Thu Mar 12 19:32:16 2015
@@ -35,6 +35,14 @@ static void bump_count(qd_composed_field
         comp->count++;
 }
 
+static void bump_length(qd_composed_field_t *field,
+                        uint32_t length)
+{
+    qd_composite_t *comp = DEQ_HEAD(field->fieldStack);
+    if (comp)
+        comp->length += length;
+}
+
 
 static void qd_insert(qd_composed_field_t *field, const uint8_t *seq, size_t len)
 {
@@ -185,7 +193,7 @@ static void qd_compose_end_composite(qd_
 }
 
 
-qd_composed_field_t *qd_compose(uint64_t performative, qd_composed_field_t *extend)
+qd_composed_field_t *qd_compose_subfield(qd_composed_field_t *extend)
 {
     qd_composed_field_t *field = extend;
 
@@ -200,8 +208,18 @@ qd_composed_field_t *qd_compose(uint64_t
         DEQ_INIT(field->fieldStack);
     }
 
-    qd_insert_8(field, 0x00);
-    qd_compose_insert_ulong(field, performative);
+    return field;
+}
+
+
+qd_composed_field_t *qd_compose(uint64_t performative, qd_composed_field_t *extend)
+{
+    qd_composed_field_t *field = qd_compose_subfield(extend);
+
+    if (field) {
+        qd_insert_8(field, 0x00);
+        qd_compose_insert_ulong(field, performative);
+    }
 
     return field;
 }
@@ -392,6 +410,8 @@ void qd_compose_insert_binary_buffers(qd
         DEQ_INSERT_TAIL(field->buffers, buf);
         buf = DEQ_HEAD(*buffers);
     }
+    bump_length(field, len);
+    bump_count(field);
 }
 
 
@@ -470,3 +490,24 @@ qd_buffer_list_t *qd_compose_buffers(qd_
 {
     return &field->buffers;
 }
+
+void qd_compose_take_buffers(qd_composed_field_t *field,
+                             qd_buffer_list_t *list)
+{
+    // assumption: extracting partially built containers is wrong:
+    assert(DEQ_SIZE(field->fieldStack) == 0);
+    *list = *qd_compose_buffers(field);
+    DEQ_INIT(field->buffers); // Zero out the linkage to the now moved buffers.
+}
+
+void qd_compose_insert_buffers(qd_composed_field_t *field,
+                               qd_buffer_list_t *list)
+{
+    uint32_t len = qd_buffer_list_length(list);
+    if (len) {
+        DEQ_APPEND(field->buffers, *list);
+        bump_length(field, len);
+        bump_count(field);
+    }
+}
+

Modified: qpid/dispatch/trunk/src/message.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Thu Mar 12 19:32:16 2015
@@ -536,6 +536,9 @@ qd_message_t *qd_message()
         return 0;
 
     DEQ_ITEM_INIT(msg);
+    DEQ_INIT(msg->ma_to_override);
+    DEQ_INIT(msg->ma_trace);
+    DEQ_INIT(msg->ma_ingress);
     msg->content = new_qd_message_content_t();
 
     if (msg->content == 0) {
@@ -558,6 +561,11 @@ void qd_message_free(qd_message_t *in_ms
     if (!in_msg) return;
     uint32_t rc;
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+
+    qd_buffer_list_free_buffers(&msg->ma_to_override);
+    qd_buffer_list_free_buffers(&msg->ma_trace);
+    qd_buffer_list_free_buffers(&msg->ma_ingress);
+
     qd_message_content_t *content = msg->content;
 
     sys_mutex_lock(content->lock);
@@ -575,13 +583,6 @@ void qd_message_free(qd_message_t *in_ms
             buf = DEQ_HEAD(content->buffers);
         }
 
-        buf = DEQ_HEAD(content->new_message_annotations);
-        while (buf) {
-            DEQ_REMOVE_HEAD(content->new_message_annotations);
-            qd_buffer_free(buf);
-            buf = DEQ_HEAD(content->new_message_annotations);
-        }
-
         sys_mutex_free(content->lock);
         free_qd_message_content_t(content);
     }
@@ -600,6 +601,10 @@ qd_message_t *qd_message_copy(qd_message
         return 0;
 
     DEQ_ITEM_INIT(copy);
+    qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override);
+    qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
+    qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
+
     copy->content = content;
 
     sys_mutex_lock(content->lock);
@@ -637,16 +642,29 @@ qd_parsed_field_t *qd_message_message_an
 }
 
 
-void qd_message_set_message_annotations(qd_message_t *msg, qd_composed_field_t *da)
+void qd_message_set_trace_annotation(qd_message_t *in_msg, qd_composed_field_t *trace_field)
 {
-    qd_message_content_t *content       = MSG_CONTENT(msg);
-    qd_buffer_list_t     *field_buffers = qd_compose_buffers(da);
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    qd_buffer_list_free_buffers(&msg->ma_trace);
+    qd_compose_take_buffers(trace_field, &msg->ma_trace);
+    qd_compose_free(trace_field);
+}
 
-    assert(DEQ_SIZE(content->new_message_annotations) == 0);
-    content->new_message_annotations = *field_buffers;
-    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
+void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_field_t *to_field)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    qd_buffer_list_free_buffers(&msg->ma_to_override);
+    qd_compose_take_buffers(to_field, &msg->ma_to_override);
+    qd_compose_free(to_field);
 }
 
+void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    qd_buffer_list_free_buffers(&msg->ma_ingress);
+    qd_compose_take_buffers(ingress_field, &msg->ma_ingress);
+    qd_compose_free(ingress_field);
+}
 
 qd_message_t *qd_message_receive(qd_delivery_t *delivery)
 {
@@ -740,6 +758,41 @@ static void send_handler(void *context,
     pn_link_send(pnl, (const char*) start, length);
 }
 
+// create a buffer chain holding the outgoing message annotations section
+static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out)
+{
+    DEQ_INIT(*out);
+    if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
+        !DEQ_IS_EMPTY(msg->ma_trace) ||
+        !DEQ_IS_EMPTY(msg->ma_ingress)) {
+
+        qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+        qd_compose_start_map(out_ma);
+
+        if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
+            qd_compose_insert_symbol(out_ma, QD_MA_TO);
+            qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
+        }
+
+        if (!DEQ_IS_EMPTY(msg->ma_trace)) {
+            qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
+            qd_compose_insert_buffers(out_ma, &msg->ma_trace);
+        }
+
+        if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
+            qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
+            qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+        }
+
+        qd_compose_end_map(out_ma);
+
+        qd_compose_take_buffers(out_ma, out);
+        qd_compose_free(out_ma);
+        return true;
+    }
+
+    return false;
+}
 
 void qd_message_send(qd_message_t *in_msg, qd_link_t *link)
 {
@@ -754,7 +807,8 @@ void qd_message_send(qd_message_t *in_ms
            qd_message_repr(in_msg, repr, sizeof(repr)),
            pn_link_name(pnl));
 
-    if (DEQ_SIZE(content->new_message_annotations) > 0) {
+    qd_buffer_list_t new_ma;
+    if (compose_message_annotations(msg, &new_ma)) {
         //
         // This is the case where the message annotations have been modified.
         // The message send must be divided into sections:  The existing header;
@@ -765,6 +819,7 @@ void qd_message_send(qd_message_t *in_ms
         // Start by making sure that we've parsed the message sections through
         // the message annotations
         //
+        // ??? NO LONGER NECESSARY???
         if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
             qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
             return;
@@ -785,11 +840,12 @@ void qd_message_send(qd_message_t *in_ms
         //
         // Send new message annotations
         //
-        qd_buffer_t *da_buf = DEQ_HEAD(content->new_message_annotations);
+        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
         while (da_buf) {
             pn_link_send(pnl, (char*) qd_buffer_base(da_buf), qd_buffer_size(da_buf));
             da_buf = DEQ_NEXT(da_buf);
         }
+        qd_buffer_list_free_buffers(&new_ma);
 
         //
         // Skip over replaced message annotations
@@ -1046,6 +1102,11 @@ void qd_message_compose_1(qd_message_t *
     //qd_compose_insert_uint(field, 0);     // delivery-count
     qd_compose_end_list(field);
 
+    qd_buffer_list_t out_ma;
+    if (compose_message_annotations((qd_message_pvt_t*)msg, &out_ma)) {
+        qd_compose_insert_buffers(field, &out_ma);
+    }
+
     field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
     qd_compose_start_list(field);
     qd_compose_insert_null(field);          // message-id
@@ -1068,10 +1129,7 @@ void qd_message_compose_1(qd_message_t *
         qd_compose_insert_binary_buffers(field, buffers);
     }
 
-    qd_buffer_list_t *field_buffers = qd_compose_buffers(field);
-    content->buffers = *field_buffers;
-    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
-
+    qd_compose_take_buffers(field, &content->buffers);
     qd_compose_free(field);
 }
 

Modified: qpid/dispatch/trunk/src/message_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message_private.h?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message_private.h (original)
+++ qpid/dispatch/trunk/src/message_private.h Thu Mar 12 19:32:16 2015
@@ -59,11 +59,6 @@ typedef struct {
 
 
 // TODO - consider using pointers to qd_field_location_t below to save memory
-// TODO - we need a second buffer list for modified annotations and header
-//        There are three message scenarios:
-//            1) Received message is held and forwarded unmodified - single buffer list
-//            2) Received message is held and modified before forwarding - two buffer lists
-//            3) Message is composed internally - single buffer list
 // TODO - provide a way to allocate a message without a lock for the link-routing case.
 //        It's likely that link-routing will cause no contention for the message content.
 //
@@ -72,7 +67,6 @@ typedef struct {
     sys_mutex_t         *lock;
     uint32_t             ref_count;                       // The number of messages referencing this
     qd_buffer_list_t     buffers;                         // The buffer chain containing the message
-    qd_buffer_list_t     new_message_annotations;         // The buffer chain containing the new message annotations (MOVE TO MSG_PVT)
     qd_field_location_t  section_message_header;          // The message header list
     qd_field_location_t  section_delivery_annotation;     // The delivery annotation map
     qd_field_location_t  section_message_annotation;      // The message annotation map
@@ -96,6 +90,10 @@ typedef struct {
 typedef struct {
     DEQ_LINKS(qd_message_t);   // Deque linkage that overlays the qd_message_t
     qd_message_content_t *content;
+    qd_buffer_list_t      ma_to_override;  // to field in outgoing message annotations.
+    qd_buffer_list_t      ma_trace;        // trace list in outgoing message annotations
+    qd_buffer_list_t      ma_ingress;      // ingress field in outgoing message annotations
+
 } qd_message_pvt_t;
 
 ALLOC_DECLARE(qd_message_t);

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Thu Mar 12 19:32:16 2015
@@ -546,7 +546,6 @@ static qd_field_iterator_t *router_annot
                                                     int               *drop,
                                                     const char        *to_override)
 {
-    qd_composed_field_t *out_ma       = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
     qd_field_iterator_t *ingress_iter = 0;
 
     qd_parsed_field_t *trace   = 0;
@@ -554,62 +553,81 @@ static qd_field_iterator_t *router_annot
     qd_parsed_field_t *to      = 0;
 
     if (in_ma) {
-        trace   = qd_parse_value_by_key(in_ma, QD_MA_TRACE);
-        ingress = qd_parse_value_by_key(in_ma, QD_MA_INGRESS);
-        to      = qd_parse_value_by_key(in_ma, QD_MA_TO);
-    }
-
-    qd_compose_start_map(out_ma);
+        uint32_t count = qd_parse_sub_count(in_ma);
+        bool done = false;
 
-    //
-    // If there is a to_override provided, insert a TO field.
-    //
-    if (to_override) {
-        qd_compose_insert_symbol(out_ma, QD_MA_TO);
-        qd_compose_insert_string(out_ma, to_override);
-    } else if (to) {
-        qd_compose_insert_symbol(out_ma, QD_MA_TO);
-        qd_compose_insert_string_iterator(out_ma, qd_parse_raw(to));
+        for (uint32_t idx = 0; idx < count && !done; idx++) {
+            qd_parsed_field_t *sub  = qd_parse_sub_key(in_ma, idx);
+            if (!sub) continue;
+            qd_field_iterator_t *iter = qd_parse_raw(sub);
+            if (!iter) continue;
+
+            if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TRACE)) {
+                trace = qd_parse_sub_value(in_ma, idx);
+            } else if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_INGRESS)) {
+                ingress = qd_parse_sub_value(in_ma, idx);
+            } else if (qd_field_iterator_equal(iter, (unsigned char *)QD_MA_TO)) {
+                to = qd_parse_sub_value(in_ma, idx);
+            }
+            done = trace && ingress && to;
+        }
     }
 
     //
+    // QD_MA_TRACE:
     // If there is a trace field, append this router's ID to the trace.
+    // If the router ID is already in the trace the msg has looped.
     //
-    qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
-    qd_compose_start_list(out_ma);
+    qd_composed_field_t *trace_field = qd_compose_subfield(0);
+    qd_compose_start_list(trace_field);
     if (trace) {
         if (qd_parse_is_list(trace)) {
             uint32_t idx = 0;
             qd_parsed_field_t *trace_item = qd_parse_sub_value(trace, idx);
             while (trace_item) {
                 qd_field_iterator_t *iter = qd_parse_raw(trace_item);
-                if (qd_field_iterator_equal(iter, (unsigned char*) node_id))
+                if (qd_field_iterator_equal(iter, (unsigned char*) node_id)) {
                     *drop = 1;
-                qd_compose_insert_string_iterator(out_ma, iter);
+                    return 0;  // no further processing necessary
+                }
+                qd_field_iterator_reset(iter);
+                qd_compose_insert_string_iterator(trace_field, iter);
                 idx++;
                 trace_item = qd_parse_sub_value(trace, idx);
             }
         }
     }
+    qd_compose_insert_string(trace_field, node_id);
+    qd_compose_end_list(trace_field);
+    qd_message_set_trace_annotation(msg, trace_field);
 
-    qd_compose_insert_string(out_ma, node_id);
-    qd_compose_end_list(out_ma);
+    //
+    // QD_MA_TO:
+    // The supplied to override takes precedence over any existing
+    // value.
+    //
+    if (to_override) {  // takes precedence over existing value
+        qd_composed_field_t *to_field = qd_compose_subfield(0);
+        qd_compose_insert_string(to_field, to_override);
+        qd_message_set_to_override_annotation(msg, to_field);
+    } else if (to) {
+        qd_composed_field_t *to_field = qd_compose_subfield(0);
+        qd_compose_insert_string_iterator(to_field, qd_parse_raw(to));
+        qd_message_set_to_override_annotation(msg, to_field);
+    }
 
     //
-    // If there is no ingress field, annotate the ingress as this router else
-    // keep the original field.
+    // QD_MA_INGRESS:
+    // If there is no ingress field, annotate the ingress as
+    // this router else keep the original field.
     //
-    qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
+    qd_composed_field_t *ingress_field = qd_compose_subfield(0);
     if (ingress && qd_parse_is_scalar(ingress)) {
         ingress_iter = qd_parse_raw(ingress);
-        qd_compose_insert_string_iterator(out_ma, ingress_iter);
+        qd_compose_insert_string_iterator(ingress_field, ingress_iter);
     } else
-        qd_compose_insert_string(out_ma, node_id);
-
-    qd_compose_end_map(out_ma);
-
-    qd_message_set_message_annotations(msg, out_ma);
-    qd_compose_free(out_ma);
+        qd_compose_insert_string(ingress_field, node_id);
+    qd_message_set_ingress_annotation(msg, ingress_field);
 
     //
     // Return the iterator to the ingress field _if_ it was present.

Modified: qpid/dispatch/trunk/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/CMakeLists.txt?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/tests/CMakeLists.txt Thu Mar 12 19:32:16 2015
@@ -49,6 +49,7 @@ target_link_libraries(unit_tests qpid-di
 set(unit_test_size_SOURCES
     field_test.c
     message_test.c
+    buffer_test.c
     run_unit_tests_size.c
     )
 

Added: qpid/dispatch/trunk/tests/buffer_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/buffer_test.c?rev=1666279&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/buffer_test.c (added)
+++ qpid/dispatch/trunk/tests/buffer_test.c Thu Mar 12 19:32:16 2015
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <string.h>
+#include "test_case.h"
+#include <qpid/dispatch/buffer.h>
+
+
+static void fill_buffer(qd_buffer_list_t *list,
+                        const unsigned char *data,
+                        int length)
+{
+    DEQ_INIT(*list);
+    while (length > 0) {
+        qd_buffer_t *buf = qd_buffer();
+        size_t count = qd_buffer_capacity(buf);
+        if (length < count) count = length;
+        memcpy(qd_buffer_cursor(buf),
+               data, count);
+        qd_buffer_insert(buf, count);
+        DEQ_INSERT_TAIL(*list, buf);
+        data += count;
+        length -= count;
+    }
+}
+
+static int compare_buffer(const qd_buffer_list_t *list,
+                          const unsigned char *data,
+                          int length)
+{
+    qd_buffer_t *buf = DEQ_HEAD(*list);
+    while (buf && length > 0) {
+        size_t count = qd_buffer_size(buf);
+        if (length < count) count = length;
+        if (memcmp(qd_buffer_base(buf), data, count))
+            return 0;
+        length -= count;
+        data += count;
+        buf = DEQ_NEXT(buf);
+    }
+    return !buf && length == 0;
+}
+
+
+static const char pattern[] = "This piggy went 'wee wee wee' all the way home!";
+static const int pattern_len = sizeof(pattern);
+
+static char *test_buffer_list_clone(void *context)
+{
+    qd_buffer_list_t list;
+    fill_buffer(&list, (unsigned char *)pattern, pattern_len);
+    if (qd_buffer_list_length(&list) != pattern_len) return "Invalid fill?";
+
+    qd_buffer_list_t copy;
+    unsigned int len = qd_buffer_list_clone(&copy, &list);
+    if (len != pattern_len) return "Copy failed";
+
+    // 'corrupt' source buffer list:
+    *qd_buffer_base(DEQ_HEAD(list)) = (unsigned char)'X';
+    qd_buffer_list_free_buffers(&list);
+    if (!DEQ_IS_EMPTY(list)) return "List should be empty!";
+
+    // ensure copy is un-molested:
+    if (!compare_buffer(&copy, (unsigned char *)pattern, pattern_len)) return "Buffer list corrupted";
+
+    return 0;
+}
+
+
+int buffer_tests()
+{
+    int result = 0;
+
+    TEST_CASE(test_buffer_list_clone, 0);
+
+    return result;
+}
+

Propchange: qpid/dispatch/trunk/tests/buffer_test.c
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/dispatch/trunk/tests/compose_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/compose_test.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/compose_test.c (original)
+++ qpid/dispatch/trunk/tests/compose_test.c Thu Mar 12 19:32:16 2015
@@ -278,6 +278,67 @@ static char *test_compose_scalars(void *
 }
 
 
+// Expected encoding for test_compose_subfields: a message-annotations section holding a two-entry map
+static char *vector3 =
+    "\x00\x53\x72"                              // described type, descriptor 0x72 (message annotations)
+    "\xD1\x00\x00\x00\x1A\x00\x00\x00\x04"      // map32, 26 bytes, 4 elements (2 key/value pairs)
+    "\xA1\x04Key1"                              // str8, 4 characters: "Key1"
+    "\x70\x00\x00\x03\xE7"                      // uint 999
+    "\xA1\x04Key2"                              // str8, 4 characters: "Key2"
+    "\x70\x00\x00\x03\x78";                     // uint 888
+static int vector3_length = 34;                 // 3 + 9 + 6 + 5 + 6 + 5 bytes; the literal's implicit NUL is not counted
+
+// Build a message-annotations map whose keys and values are composed separately
+// as sub-fields, then check that the result encodes byte-for-byte as vector3.
+static char *test_compose_subfields(void *context)
+{
+    qd_composed_field_t *key1 = qd_compose_subfield(0);
+    qd_compose_insert_string(key1, "Key1");
+    qd_composed_field_t *value1 = qd_compose_subfield(0);
+    qd_compose_insert_uint(value1, 999);
+
+    qd_composed_field_t *key2 = qd_compose_subfield(0);
+    qd_compose_insert_string(key2, "Key2");
+
+    // splice the pre-built sub-fields into the map; the last value is inserted directly
+    qd_composed_field_t *annotations = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
+    qd_compose_start_map(annotations);
+    qd_compose_insert_buffers(annotations, &key1->buffers);
+    if (!DEQ_IS_EMPTY(key1->buffers)) return "Buffer chain ownership not transferred!";
+    qd_compose_free(key1);
+    qd_compose_insert_buffers(annotations, &value1->buffers);
+    qd_compose_free(value1);
+
+    qd_compose_insert_buffers(annotations, &key2->buffers);
+    qd_compose_free(key2);
+    qd_compose_insert_uint(annotations, 888);
+    qd_compose_end_map(annotations);
+
+    qd_buffer_list_t encoded;
+    qd_compose_take_buffers(annotations, &encoded);
+    if (!DEQ_IS_EMPTY(annotations->buffers)) return "Buffer list not removed!";
+
+    qd_compose_free(annotations);
+
+    if (qd_buffer_list_length(&encoded) != vector3_length)
+        return "Improper encoded length";
+
+    // walk the buffer chain, comparing each byte against the expected encoding
+    const unsigned char *expected = (const unsigned char *)vector3;
+    for (qd_buffer_t *buf = DEQ_HEAD(encoded); buf; buf = DEQ_NEXT(buf)) {
+        const unsigned char *bytes = qd_buffer_base(buf);
+        size_t               size  = qd_buffer_size(buf);
+        for (size_t i = 0; i < size; i++) {
+            if (bytes[i] != *expected++) return "Pattern Mismatch";
+        }
+    }
+
+    qd_buffer_list_free_buffers(&encoded);
+
+    return 0;
+}
+
+
 int compose_tests()
 {
     int result = 0;
@@ -285,6 +346,7 @@ int compose_tests()
     TEST_CASE(test_compose_list_of_maps, 0);
     TEST_CASE(test_compose_nested_composites, 0);
     TEST_CASE(test_compose_scalars, 0);
+    TEST_CASE(test_compose_subfields, 0);
 
     return result;
 }

Modified: qpid/dispatch/trunk/tests/message_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/message_test.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/message_test.c (original)
+++ qpid/dispatch/trunk/tests/message_test.c Thu Mar 12 19:32:16 2015
@@ -22,6 +22,7 @@
 #include <string.h>
 #include "message_private.h"
 #include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/amqp.h>
 #include <proton/message.h>
 
 static char buffer[10000];
@@ -212,6 +213,82 @@ static char* test_check_multiple(void *c
 }
 
 
+// Exact match: strncmp() bounded by bytes.size alone would also accept any prefix of 'str', even an empty one.
+static int bytes_equal(pn_bytes_t bytes, const char *str)
+{
+    return bytes.size == strlen(str) && strncmp(str, bytes.start, bytes.size) == 0;
+}
+
+// Set the trace, to-override and ingress annotations on a message, compose it,
+// then decode it with proton and verify every annotation key and value.
+static char* test_send_message_annotations(void *context)
+{
+    qd_message_t         *msg     = qd_message();
+    qd_message_content_t *content = MSG_CONTENT(msg);
+
+    qd_composed_field_t *trace = qd_compose_subfield(0);
+    qd_compose_start_list(trace);
+    qd_compose_insert_string(trace, "Node1");
+    qd_compose_insert_string(trace, "Node2");
+    qd_compose_end_list(trace);
+    qd_message_set_trace_annotation(msg, trace);
+    qd_compose_free(trace);
+
+    qd_composed_field_t *to_override = qd_compose_subfield(0);
+    qd_compose_insert_string(to_override, "to/address");
+    qd_message_set_to_override_annotation(msg, to_override);
+    qd_compose_free(to_override);
+
+    qd_composed_field_t *ingress = qd_compose_subfield(0);
+    qd_compose_insert_string(ingress, "distress");
+    qd_message_set_ingress_annotation(msg, ingress);
+    qd_compose_free(ingress);
+
+    qd_message_compose_1(msg, "test_addr_0", 0);
+    qd_buffer_t *buf = DEQ_HEAD(content->buffers);
+    if (buf == 0) return "Expected a buffer in the test message";
+
+    pn_message_t *pn_msg = pn_message();
+    size_t len = flatten_bufs(content);
+    int result = pn_message_decode(pn_msg, buffer, len);
+    if (result != 0) return "Error in pn_message_decode";
+
+    pn_data_t *ma = pn_message_annotations(pn_msg);
+    if (!ma) return "Missing message annotations";
+    pn_data_rewind(ma);
+    pn_data_next(ma);
+    if (pn_data_type(ma) != PN_MAP) return "Invalid message annotation type";
+    if (pn_data_get_map(ma) != 6) return "Invalid map length";
+    pn_data_enter(ma);
+    for (int i = 0; i < 6; i+=2) {  // three key/value pairs, accepted in any order
+        pn_data_next(ma);
+        if (pn_data_type(ma) != PN_SYMBOL) return "Bad map index";
+        pn_bytes_t sym = pn_data_get_symbol(ma);
+        if (bytes_equal(sym, QD_MA_INGRESS)) {
+            pn_data_next(ma);
+            if (!bytes_equal(pn_data_get_string(ma), "distress")) return "Bad ingress";
+        } else if (bytes_equal(sym, QD_MA_TO)) {
+            pn_data_next(ma);
+            if (!bytes_equal(pn_data_get_string(ma), "to/address")) return "Bad to override";
+        } else if (bytes_equal(sym, QD_MA_TRACE)) {
+            pn_data_next(ma);
+            if (pn_data_type(ma) != PN_LIST) return "List not found";
+            pn_data_enter(ma);
+            pn_data_next(ma);
+            if (!bytes_equal(pn_data_get_string(ma), "Node1")) return "Bad trace entry";
+            pn_data_next(ma);
+            if (!bytes_equal(pn_data_get_string(ma), "Node2")) return "Bad trace entry";
+            pn_data_exit(ma);
+        } else return "Unexpected map key";
+    }
+
+    pn_message_free(pn_msg);
+    qd_message_free(msg);
+
+    return 0;
+}
+
+
 int message_tests(void)
 {
     int result = 0;
@@ -220,6 +297,7 @@ int message_tests(void)
     TEST_CASE(test_receive_from_messenger, 0);
     TEST_CASE(test_message_properties, 0);
     TEST_CASE(test_check_multiple, 0);
+    TEST_CASE(test_send_message_annotations, 0);
 
     return result;
 }

Modified: qpid/dispatch/trunk/tests/run_unit_tests_size.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/run_unit_tests_size.c?rev=1666279&r1=1666278&r2=1666279&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/run_unit_tests_size.c (original)
+++ qpid/dispatch/trunk/tests/run_unit_tests_size.c Thu Mar 12 19:32:16 2015
@@ -22,6 +22,7 @@
 
 int message_tests();
 int field_tests();
+int buffer_tests();
 
 int main(int argc, char** argv)
 {
@@ -39,8 +40,9 @@ int main(int argc, char** argv)
     int result = 0;
     result += message_tests();
     result += field_tests();
-    qd_alloc_finalize();
+    result += buffer_tests();
 
+    qd_alloc_finalize();
     return result;
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org