You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/28 19:53:35 UTC

qpid-dispatch git commit: DISPATCH-160 - Preserve non-reserved message annotations crossing router. Patch from Ganesh Murthy. This closes #101

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 5bf550fb2 -> 0545502f8


DISPATCH-160 - Preserve non-reserved message annotations crossing router.
Patch from Ganesh Murthy.
This closes #101


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0545502f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0545502f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0545502f

Branch: refs/heads/master
Commit: 0545502f8670a710447c4f2a9e8d5c72539273e3
Parents: 5bf550f
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Sep 28 15:52:21 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Sep 28 15:52:21 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/amqp.h      |   1 +
 include/qpid/dispatch/iterator.h  |   9 ++
 src/amqp.c                        |   1 +
 src/iterator.c                    |   9 ++
 src/message.c                     | 196 ++++++++++++++++++++-------------
 src/parse.c                       |  76 ++++++++-----
 tests/message_test.c              |   2 -
 tests/system_tests_one_router.py  | 101 +++++++++++------
 tests/system_tests_two_routers.py |  88 +++++++++++----
 9 files changed, 319 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 774a431..3f9c778 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -91,6 +91,7 @@ enum {
 
 /** @name Message Annotation Headers */
 /// @{
+extern const char * const QD_MA_PREFIX;
 extern const char * const QD_MA_INGRESS;  ///< Ingress Router
 extern const char * const QD_MA_TRACE;    ///< Trace
 extern const char * const QD_MA_TO;       ///< To-Override

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index 2c490e3..b2669ef 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -181,6 +181,15 @@ void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase);
 void qd_address_iterator_override_prefix(qd_field_iterator_t *iter, char prefix);
 
 /**
+ * Trims octets from the end of the iterator's field by reducing the length of the iterator.
+ *
+ * @param iter - the iterator whose length should be trimmed
+ * @param length - the length of the trimmed field.  If greater than or equal to the current length,
+ *                 then there shall be no effect.
+ */
+void qd_field_iterator_trim(qd_field_iterator_t *iter, int length);
+
+/**
  * Return the current octet in the iterator's view and step to the next.
  */
 unsigned char qd_field_iterator_octet(qd_field_iterator_t *iter);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index 4602fe3..9568800 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -19,6 +19,7 @@
 
 #include <qpid/dispatch/amqp.h>
 
+const char * const QD_MA_PREFIX  = "x-opt-qd.";
 const char * const QD_MA_INGRESS = "x-opt-qd.ingress";
 const char * const QD_MA_TRACE   = "x-opt-qd.trace";
 const char * const QD_MA_TO      = "x-opt-qd.to";

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 75d0196..8ae5abf 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -444,6 +444,15 @@ void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase)
     iter->phase = phase;
 }
 
+void qd_field_iterator_trim(qd_field_iterator_t *iter, int length)
+{
+    if (qd_field_iterator_length(iter) > length) {
+        iter->start_pointer        = iter->pointer;
+        iter->start_pointer.length = length;
+        iter->view_start_pointer   = iter->start_pointer;
+        iter->pointer              = iter->start_pointer;
+    }
+}
 
 void qd_address_iterator_override_prefix(qd_field_iterator_t *iter, char prefix)
 {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index ab16e9d..33f0ea9 100644
--- a/src/message.c
+++ b/src/message.c
@@ -787,44 +787,79 @@ static void send_handler(void *context, const unsigned char *start, int length)
     pn_link_send(pnl, (const char*) start, length);
 }
 
+
 // create a buffer chain holding the outgoing message annotations section
-static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out)
+static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out, bool strip_annotations)
 {
-    if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
-        !DEQ_IS_EMPTY(msg->ma_trace) ||
-        !DEQ_IS_EMPTY(msg->ma_ingress)) {
+    qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
 
-        qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
-        qd_compose_start_map(out_ma);
+    bool map_started = false;
 
-        if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
-            qd_compose_insert_symbol(out_ma, QD_MA_TO);
-            qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
-        }
+    // Copy through any incoming annotations whose keys do not carry the router prefix (QD_MA_PREFIX)
+    qd_parsed_field_t   *in_ma = msg->content->parsed_message_annotations;
 
-        if (!DEQ_IS_EMPTY(msg->ma_trace)) {
-            qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
-            qd_compose_insert_buffers(out_ma, &msg->ma_trace);
-        }
+    if (in_ma) {
+        uint32_t count = qd_parse_sub_count(in_ma);
+
+        for (uint32_t idx = 0; idx < count; idx++) {
+            qd_parsed_field_t *sub_key  = qd_parse_sub_key(in_ma, idx);
+            if (!sub_key)
+                continue;
 
-        if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
-            qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
-            qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+            qd_field_iterator_t *iter = qd_parse_raw(sub_key);
+
+            if (!qd_field_iterator_prefix(iter, QD_MA_PREFIX)) {
+                if (!map_started) {
+                    qd_compose_start_map(out_ma);
+                    map_started = true;
+                }
+                qd_parsed_field_t *sub_value = qd_parse_sub_value(in_ma, idx);
+                qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_key));
+                qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_value));
+            }
         }
+    }
+
+    //Add the dispatch router specific annotations only if strip_annotations is false.
+    if (!strip_annotations) {
+        if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
+            !DEQ_IS_EMPTY(msg->ma_trace) ||
+            !DEQ_IS_EMPTY(msg->ma_ingress) ||
+            msg->ma_phase != 0) {
+
+            if (!map_started) {
+                qd_compose_start_map(out_ma);
+                map_started = true;
+            }
+
+            if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
+                qd_compose_insert_symbol(out_ma, QD_MA_TO);
+                qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
+            }
+
+            if (!DEQ_IS_EMPTY(msg->ma_trace)) {
+                qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
+                qd_compose_insert_buffers(out_ma, &msg->ma_trace);
+            }
+
+            if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
+                qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
+                qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+            }
 
-        if (msg->ma_phase != 0) {
-            qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
-            qd_compose_insert_int(out_ma, msg->ma_phase);
+            if (msg->ma_phase != 0) {
+                qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
+                qd_compose_insert_int(out_ma, msg->ma_phase);
+            }
         }
+    }
 
+    if (map_started) {
         qd_compose_end_map(out_ma);
-
         qd_compose_take_buffers(out_ma, out);
-        qd_compose_free(out_ma);
-        return true;
     }
 
-    return false;
+    qd_compose_free(out_ma);
 }
 
 void qd_message_send(qd_message_t *in_msg,
@@ -845,65 +880,68 @@ void qd_message_send(qd_message_t *in_msg,
     qd_buffer_list_t new_ma;
     DEQ_INIT(new_ma);
 
-    if (strip_annotations || compose_message_annotations(msg, &new_ma)) {
-        //
-        // This is the case where the message annotations have been modified.
-        // The message send must be divided into sections:  The existing header;
-        // the new message annotations; the rest of the existing message.
-        // Note that the original message annotations that are still in the
-        // buffer chain must not be sent.
-        //
-        // Start by making sure that we've parsed the message sections through
-        // the message annotations
-        //
-        // ??? NO LONGER NECESSARY???
-        if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
-            qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
-            return;
-        }
+    // Process the message annotations, if any
+    compose_message_annotations(msg, &new_ma, strip_annotations);
 
-        //
-        // Send header if present
-        //
-        cursor = qd_buffer_base(buf);
-        if (content->section_message_header.length > 0) {
-            buf    = content->section_message_header.buffer;
-            cursor = content->section_message_header.offset + qd_buffer_base(buf);
-            advance(&cursor, &buf,
-                    content->section_message_header.length + content->section_message_header.hdr_length,
-                    send_handler, (void*) pnl);
-        }
+    //
+    // The message annotations are now always re-composed before sending.
+    // The message send must be divided into sections:  The existing header;
+    // the new message annotations; the rest of the existing message.
+    // Note that the original message annotations that are still in the
+    // buffer chain must not be sent.
+    //
+    // Start by making sure that we've parsed the message sections through
+    // the message annotations
+    //
+    // ??? NO LONGER NECESSARY???
+    if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
+        qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
+        return;
+    }
 
-        //
-        // Send new message annotations
-        //
-        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
-        while (da_buf) {
-            pn_link_send(pnl, (char*) qd_buffer_base(da_buf), qd_buffer_size(da_buf));
-            da_buf = DEQ_NEXT(da_buf);
-        }
-        qd_buffer_list_free_buffers(&new_ma);
+    //
+    // Send header if present
+    //
+    cursor = qd_buffer_base(buf);
+    if (content->section_message_header.length > 0) {
+        buf    = content->section_message_header.buffer;
+        cursor = content->section_message_header.offset + qd_buffer_base(buf);
+        advance(&cursor, &buf,
+                content->section_message_header.length + content->section_message_header.hdr_length,
+                send_handler, (void*) pnl);
+    }
 
-        //
-        // Skip over replaced message annotations
-        //
-        if (content->section_message_annotation.length > 0)
-            advance(&cursor, &buf,
-                    content->section_message_annotation.hdr_length + content->section_message_annotation.length,
-                    0, 0);
+    //
+    // Send new message annotations
+    //
+    qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
+    while (da_buf) {
+        char *to_send = (char*) qd_buffer_base(da_buf);
+        pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
+        da_buf = DEQ_NEXT(da_buf);
+    }
+    qd_buffer_list_free_buffers(&new_ma);
 
-        //
-        // Send remaining partial buffer
-        //
-        if (buf) {
-            size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
-            advance(&cursor, &buf, len, send_handler, (void*) pnl);
-        }
+    //
+    // Skip over replaced message annotations
+    //
+    if (content->section_message_annotation.length > 0)
+        advance(&cursor, &buf,
+                content->section_message_annotation.hdr_length + content->section_message_annotation.length,
+                0, 0);
 
-        // Fall through to process the remaining buffers normally
-        // Note that 'advance' will have moved us to the next buffer in the chain.
+    //
+    // Send remaining partial buffer
+    //
+    if (buf) {
+        size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
+        advance(&cursor, &buf, len, send_handler, (void*) pnl);
     }
 
+    // Fall through to process the remaining buffers normally
+    // Note that 'advance' will have moved us to the next buffer in the chain.
+
+
     while (buf) {
         pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
         buf = DEQ_NEXT(buf);
@@ -1140,9 +1178,9 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
     qd_compose_end_list(field);
 
     qd_buffer_list_t out_ma;
-    if (compose_message_annotations((qd_message_pvt_t*)msg, &out_ma)) {
-        qd_compose_insert_buffers(field, &out_ma);
-    }
+    DEQ_INIT(out_ma);
+    compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, false);
+    qd_compose_insert_buffers(field, &out_ma);
 
     field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
     qd_compose_start_list(field);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/parse.c
----------------------------------------------------------------------
diff --git a/src/parse.c b/src/parse.c
index 7a633f1..091c32e 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -37,29 +37,48 @@ struct qd_parsed_field_t {
 ALLOC_DECLARE(qd_parsed_field_t);
 ALLOC_DEFINE(qd_parsed_field_t);
 
-
-static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
+/**
+ * size = the number of bytes following the tag and, for variable-width types, the size field (includes any count field)
+ * count = the number of elements. Applies only to compound structures
+ */
+static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *size, uint32_t *count, uint32_t *length_of_size, uint32_t *length_of_count)
 {
     if (qd_field_iterator_end(iter))
         return "Insufficient Data to Determine Tag";
-    *tag      = qd_field_iterator_octet(iter);
-    *count    = 0;
-    *length   = 0;
-    *clen     = 0;
+
+    *tag             = qd_field_iterator_octet(iter);
+    *count           = 0;
+    *size            = 0;
+    *length_of_count = 0;
+    *length_of_size  = 0;
+
 
     switch (*tag & 0xF0) {
-    case 0x40: *length = 0;  break;
-    case 0x50: *length = 1;  break;
-    case 0x60: *length = 2;  break;
-    case 0x70: *length = 4;  break;
-    case 0x80: *length = 8;  break;
-    case 0x90: *length = 16; break;
+    case 0x40:
+        *size = 0;
+        break;
+    case 0x50:
+        *size = 1;
+        break;
+    case 0x60:
+        *size = 2;
+        break;
+    case 0x70:
+        *size = 4;
+        break;
+    case 0x80:
+        *size = 8;
+        break;
+    case 0x90:
+        *size = 16;
+        break;
     case 0xB0:
     case 0xD0:
     case 0xF0:
-        *length += ((unsigned int) qd_field_iterator_octet(iter)) << 24;
-        *length += ((unsigned int) qd_field_iterator_octet(iter)) << 16;
-        *length += ((unsigned int) qd_field_iterator_octet(iter)) << 8;
+        *size += ((unsigned int) qd_field_iterator_octet(iter)) << 24;
+        *size += ((unsigned int) qd_field_iterator_octet(iter)) << 16;
+        *size += ((unsigned int) qd_field_iterator_octet(iter)) << 8;
+        *length_of_size = 3;
         // fall through to the next case
 
     case 0xA0:
@@ -67,7 +86,8 @@ static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *le
     case 0xE0:
         if (qd_field_iterator_end(iter))
             return "Insufficient Data to Determine Length";
-        *length += (unsigned int) qd_field_iterator_octet(iter);
+        *size += (unsigned int) qd_field_iterator_octet(iter);
+        *length_of_size += 1;
         break;
 
     default:
@@ -80,7 +100,7 @@ static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *le
         *count += ((unsigned int) qd_field_iterator_octet(iter)) << 24;
         *count += ((unsigned int) qd_field_iterator_octet(iter)) << 16;
         *count += ((unsigned int) qd_field_iterator_octet(iter)) << 8;
-        *clen = 3;
+        *length_of_count = 3;
         // fall through to the next case
 
     case 0xC0:
@@ -88,20 +108,19 @@ static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *le
         if (qd_field_iterator_end(iter))
             return "Insufficient Data to Determine Count";
         *count += (unsigned int) qd_field_iterator_octet(iter);
-        *clen += 1;
+        *length_of_count += 1;
         break;
     }
 
     if ((*tag == QD_AMQP_MAP8 || *tag == QD_AMQP_MAP32) && (*count & 1))
         return "Odd Number of Elements in a Map";
 
-    if (*clen > *length)
+    if (*length_of_count > *size)
         return "Insufficient Length to Determine Count";
 
     return 0;
 }
 
-
 static qd_parsed_field_t *qd_parse_internal(qd_field_iterator_t *iter, qd_parsed_field_t *p)
 {
     qd_parsed_field_t *field = new_qd_parsed_field_t();
@@ -114,15 +133,20 @@ static qd_parsed_field_t *qd_parse_internal(qd_field_iterator_t *iter, qd_parsed
     field->raw_iter = 0;
     field->typed_iter = qd_field_iterator_dup(iter);
 
-    uint32_t length;
-    uint32_t count;
-    uint32_t length_of_count;
+    uint32_t size            = 0;
+    uint32_t count           = 0;
+    uint32_t length_of_count = 0;
+    uint32_t length_of_size  = 0;
 
-    field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
+    field->parse_error = get_type_info(iter, &field->tag, &size, &count, &length_of_size, &length_of_count);
 
     if (!field->parse_error) {
-        field->raw_iter = qd_field_iterator_sub(iter, length);
-        qd_field_iterator_advance(iter, length - length_of_count);
+        qd_field_iterator_trim(field->typed_iter, size + length_of_size + 1); // + 1 accounts for the tag length
+
+        field->raw_iter = qd_field_iterator_sub(iter, size - length_of_count);
+
+        qd_field_iterator_advance(iter, size - length_of_count);
+
         for (uint32_t idx = 0; idx < count; idx++) {
             qd_parsed_field_t *child = qd_parse_internal(field->raw_iter, field);
             DEQ_INSERT_TAIL(field->children, child);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/tests/message_test.c
----------------------------------------------------------------------
diff --git a/tests/message_test.c b/tests/message_test.c
index 7fd4cd0..7984d87 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -65,7 +65,6 @@ static char* test_send_to_messenger(void *context)
 {
     qd_message_t         *msg     = qd_message();
     qd_message_content_t *content = MSG_CONTENT(msg);
-
     qd_message_compose_1(msg, "test_addr_0", 0);
     qd_buffer_t *buf = DEQ_HEAD(content->buffers);
     if (buf == 0) return "Expected a buffer in the test message";
@@ -289,7 +288,6 @@ static char* test_send_message_annotations(void *context)
 int message_tests(void)
 {
     int result = 0;
-
     TEST_CASE(test_send_to_messenger, 0);
     TEST_CASE(test_receive_from_messenger, 0);
     TEST_CASE(test_message_properties, 0);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index e3dd981..d993549 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -356,7 +356,6 @@ class RouterTest(TestCase):
         M1 = self.messenger()
         M2 = self.messenger()
 
-
         M1.start()
         M2.start()
         M2.subscribe(addr)
@@ -366,10 +365,9 @@ class RouterTest(TestCase):
 
         tm.address = addr
 
-
-        ##
-        ## No inbound delivery annotations
-        ##
+        #
+        # No inbound delivery annotations
+        #
         for i in range(10):
             tm.body = {'number': i}
             M1.put(tm)
@@ -384,9 +382,9 @@ class RouterTest(TestCase):
             self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
             self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
 
-        ##
-        ## Pre-existing ingress
-        ##
+        #
+        # Pre-existing ingress
+        #
         tm.annotations = {'x-opt-qd.ingress': 'ingress-router'}
         for i in range(10):
             tm.body = {'number': i}
@@ -402,9 +400,9 @@ class RouterTest(TestCase):
             self.assertEqual(ma['x-opt-qd.ingress'], 'ingress-router')
             self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
 
-        ##
-        ## Invalid trace type
-        ##
+        #
+        # Invalid trace type
+        #
         tm.annotations = {'x-opt-qd.trace' : 45}
         for i in range(10):
             tm.body = {'number': i}
@@ -420,9 +418,9 @@ class RouterTest(TestCase):
             self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
             self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
 
-        ##
-        ## Empty trace
-        ##
+        #
+        # Empty trace
+        #
         tm.annotations = {'x-opt-qd.trace' : []}
         for i in range(10):
             tm.body = {'number': i}
@@ -438,9 +436,9 @@ class RouterTest(TestCase):
             self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
             self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
 
-        ##
-        ## Non-empty trace
-        ##
+        #
+        # Non-empty trace
+        #
         tm.annotations = {'x-opt-qd.trace' : ['0/first.hop']}
         for i in range(10):
             tm.body = {'number': i}
@@ -464,8 +462,7 @@ class RouterTest(TestCase):
     # The default for stripAnnotations is "both" (which means strip annotations on both ingress and egress)
     # This test will test the stripAnnotations = no option - meaning no annotations must be stripped.
     # We will send in a custom annotation and make that we get back 3 annotations on the received message
-    # Skipping this test temporarily
-    def notest_08a_test_strip_message_annotations_no_custom_not_implemented(self):
+    def test_08a_strip_message_annotations_custom(self):
         addr = self.router.addresses[1]+"/strip_message_annotations_no_custom/1"
 
         M1 = self.messenger()
@@ -481,7 +478,6 @@ class RouterTest(TestCase):
         ingress_message_annotations = {}
         ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation'
 
-
         ingress_message.annotations = ingress_message_annotations
 
         M1.put(ingress_message)
@@ -492,10 +488,9 @@ class RouterTest(TestCase):
         egress_message = Message()
         M2.get(egress_message)
 
-        #Make sure 'Hello World!' is in the message body dict
+        # Make sure 'Hello World!' is in the message body dict
         self.assertEqual('Hello World!', egress_message.body['message'])
 
-
         egress_message_annotations = egress_message.annotations
 
         self.assertEqual(egress_message_annotations.__class__, dict)
@@ -506,7 +501,7 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
 
-    #stripAnnotations property is set to "no"
+    # stripAnnotations property is set to "no"
     def test_08a_test_strip_message_annotations_no(self):
         addr = self.router.addresses[1]+"/strip_message_annotations_no/1"
 
@@ -535,7 +530,6 @@ class RouterTest(TestCase):
         #Make sure 'Hello World!' is in the message body dict
         self.assertEqual('Hello World!', egress_message.body['message'])
 
-
         egress_message_annotations = egress_message.annotations
 
         self.assertEqual(egress_message_annotations.__class__, dict)
@@ -545,7 +539,7 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
 
-    #stripAnnotations property is set to "no"
+    # stripAnnotations property is set to "no"
     def test_08a_test_strip_message_annotations_no_add_trace(self):
         addr = self.router.addresses[1]+"/strip_message_annotations_no_add_trace/1"
 
@@ -560,12 +554,12 @@ class RouterTest(TestCase):
         ingress_message.address = addr
         ingress_message.body = {'message': 'Hello World!'}
 
-        ##
-        ## Pre-existing ingress and trace
-        ##
-        ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']}
-        ingress_message.annotations = ingress_message_annotations
-
+        #
+        # Pre-existing ingress and trace
+        #
+        ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router',
+                                       'x-opt-qd.trace': ['0/QDR.1'],
+                                       'work': 'hard'}
         ingress_message.annotations = ingress_message_annotations
 
         M1.put(ingress_message)
@@ -576,22 +570,22 @@ class RouterTest(TestCase):
         egress_message = Message()
         M2.get(egress_message)
 
-        #Make sure 'Hello World!' is in the message body dict
+        # Make sure 'Hello World!' is in the message body dict
         self.assertEqual('Hello World!', egress_message.body['message'])
 
-
         egress_message_annotations = egress_message.annotations
 
         self.assertEqual(egress_message_annotations.__class__, dict)
         self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], 'ingress-router')
+        # Make sure the user defined annotation also makes it out.
+        self.assertEqual(egress_message_annotations['work'], 'hard')
         self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.1', '0/QDR'])
 
         M1.stop()
         M2.stop()
 
-
-    #Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
-    #stripAnnotations property is set to "both"
+    # Don't send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
+    # stripAnnotations property is set to "both"
     def test_08a_test_strip_message_annotations_both(self):
         addr = self.router.addresses[2]+"/strip_message_annotations_both/1"
 
@@ -620,6 +614,41 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
 
+    # Don't send any pre-existing ingress or trace annotations. Send in a custom annotation.
+    # Make sure that the custom annotation comes out and nothing else.
+    # stripAnnotations property is set to "both"
+    def test_08a_test_strip_message_annotations_both_custom(self):
+        addr = self.router.addresses[2]+"/strip_message_annotations_both/1"
+
+        M1 = self.messenger()
+        M2 = self.messenger()
+
+        M1.start()
+        M2.start()
+        M2.subscribe(addr)
+
+        ingress_message = Message()
+        ingress_message.address = addr
+        ingress_message.body = {'message': 'Hello World!'}
+
+        # Only annotations with prefix "x-opt-qd." will be stripped
+        ingress_message_annotations = {'stay': 'humble', 'x-opt-qd': 'work'}
+        ingress_message.annotations = ingress_message_annotations
+
+        # Put and send the message
+        M1.put(ingress_message)
+        M1.send()
+
+        # Receive the message
+        M2.recv(1)
+        egress_message = Message()
+        M2.get(egress_message)
+
+        self.assertEqual(egress_message.annotations, ingress_message_annotations)
+
+        M1.stop()
+        M2.stop()
+
     #Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
     #stripAnnotations property is set to "out"
     def test_08a_test_strip_message_annotations_out(self):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index fdc5534..db82838 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -416,7 +416,7 @@ class RouterTest(TestCase):
         ingress_message = Message()
         ingress_message.address = addr
         ingress_message.body = {'message': 'Hello World!'}
-        ingress_message_annotations = {}        
+        ingress_message_annotations = {'work': 'hard', 'stay': 'humble'}
         
         ingress_message.annotations = ingress_message_annotations
         
@@ -436,14 +436,17 @@ class RouterTest(TestCase):
         
         self.assertEqual(egress_message_annotations.__class__, dict)
         self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR.A')
+        self.assertEqual(egress_message_annotations['work'], 'hard')
+        self.assertEqual(egress_message_annotations['stay'], 'humble')
         self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.A', '0/QDR.B'])
         
         M1.stop()
         M2.stop()
         
-    #This unit test is currently skipped because dispatch router do not pass thru custom message annotations. Once the feature is added the @unittest.skip decorator can be removed.
-    #The stripAnnotations property is set to 'no'
-    def notest_08a_test_strip_message_annotations_no_custom_not_implemented(self):
+    # Tests that the dispatch router passes custom (user-defined) message annotations through.
+    # This test was previously disabled because that feature had not yet been implemented.
+    # The stripAnnotations property is set to 'no'
+    def test_08a_strip_message_annotations_custom(self):
         addr = "amqp:/message_annotations_strip_no_custom/1"
         
         M1 = self.messenger()
@@ -462,8 +465,7 @@ class RouterTest(TestCase):
         ingress_message.body = {'message': 'Hello World!'}
         ingress_message_annotations = {}
         ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation'
-        
-        
+
         ingress_message.annotations = ingress_message_annotations
         
         M1.put(ingress_message)
@@ -474,10 +476,9 @@ class RouterTest(TestCase):
         egress_message = Message()
         M2.get(egress_message)
         
-        #Make sure 'Hello World!' is in the message body dict
+        # Make sure 'Hello World!' is in the message body dict
         self.assertEqual('Hello World!', egress_message.body['message'])
-        
-        
+
         egress_message_annotations = egress_message.annotations
         
         self.assertEqual(egress_message_annotations.__class__, dict)
@@ -537,9 +538,9 @@ class RouterTest(TestCase):
         M1.stop()
         M2.stop()
         
-    #Test to see if the dispatch router specific annotations were stripped.
-    #The stripAnnotations property is set to 'both'
-    #Send a message to the router with pre-existing ingress and trace annotations and make sure that nothing comes out.
+    # Test to see if the dispatch router specific annotations were stripped.
+    # The stripAnnotations property is set to 'both'
+    # Send a message to the router with pre-existing ingress and trace annotations and make sure that nothing comes out.
     def test_08a_test_strip_message_annotations_both_add_ingress_trace(self):
         addr = "amqp:/strip_message_annotations_both_add_ingress_trace/1"
         
@@ -559,10 +560,14 @@ class RouterTest(TestCase):
         ingress_message.body = {'message': 'Hello World!'}
         
         ##
-        ## Pre-existing ingress and trace. Intentionally populate the trace with the 0/QDR.A which is the trace of the first router.
-        ## If the inbound annotations were not stripped, the router would drop this message since it would consider this message as being looped.
-        ##
-        ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.A']}
+        # Pre-existing ingress and trace. Intentionally populate the trace with the 0/QDR.A which is the trace
+        # of the first router. If the inbound annotations were not stripped, the router would drop this message
+        # since it would consider this message as being looped.
+        #
+        ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router',
+                                       'x-opt-qd.trace': ['0/QDR.A'],
+                                       'work': 'hard',
+                                       'x-opt-qd': 'humble'}
         ingress_message.annotations = ingress_message_annotations
         
         #Put and send the message
@@ -573,15 +578,15 @@ class RouterTest(TestCase):
         M2.recv(1)
         egress_message = Message()
         M2.get(egress_message)
-        
-        self.assertEqual(egress_message.annotations, None)
+
+        # Router specific annotations (annotations with prefix "x-opt-qd.") will be stripped. User defined annotations will not be stripped.
+        self.assertEqual(egress_message.annotations, {'work': 'hard', 'x-opt-qd': 'humble'})
         
         M1.stop()
         M2.stop()
 
-
-    #Send in pre-existing trace and ingress and annotations and make sure that there are no outgoing annotations.
-    #stripAnnotations property is set to "in"
+    # Send in pre-existing trace and ingress and annotations and make sure that there are no outgoing annotations.
+    # stripAnnotations property is set to "in"
     def test_08a_test_strip_message_annotations_out(self):
         addr = "amqp:/strip_message_annotations_out/1"
         
@@ -618,6 +623,47 @@ class RouterTest(TestCase):
         
         M1.stop()
         M2.stop()
+
+    # Send in custom annotations and make sure that only those with the reserved "x-opt-qd." prefix are removed; all other annotations must come out unchanged.
+    # stripAnnotations property is set to "in"
+    def test_08a_test_strip_message_annotations_out_custom(self):
+        addr = "amqp:/strip_message_annotations_out/1"
+
+        M1 = self.messenger()
+        M2 = self.messenger()
+
+        M1.route("amqp:/*", self.routers[0].addresses[3]+"/$1")
+        M2.route("amqp:/*", self.routers[1].addresses[3]+"/$1")
+
+        M1.start()
+        M2.start()
+        M2.subscribe(addr)
+        self.routers[0].wait_address("strip_message_annotations_out/1", 0, 1)
+
+        ingress_message = Message()
+        ingress_message.address = addr
+        ingress_message.body = {'message': 'Hello World!'}
+
+        # Annotations with prefix "x-opt-qd." will be skipped
+        ingress_message_annotations = {'work': 'hard', "x-opt-qd": "custom", "x-opt-qd.": "custom"}
+        ingress_message.annotations = ingress_message_annotations
+
+        # Put and send the message
+        M1.put(ingress_message)
+        M1.send()
+
+        # Receive the message
+        egress_message = Message()
+        M2.recv(1)
+        M2.get(egress_message)
+
+        # Make sure 'Hello World!' is in the message body dict
+        self.assertEqual('Hello World!', egress_message.body['message'])
+
+        self.assertEqual(egress_message.annotations, {'work': 'hard', "x-opt-qd": "custom"})
+
+        M1.stop()
+        M2.stop()
     
     #Send in pre-existing trace and ingress and annotations and make sure that they are not in the outgoing annotations.
     #stripAnnotations property is set to "in"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org