You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/09/28 19:53:35 UTC
qpid-dispatch git commit: DISPATCH-160 - Preserve non-reserved
message annotations crossing router. Patch from Ganesh Murthy. This closes
#101
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 5bf550fb2 -> 0545502f8
DISPATCH-160 - Preserve non-reserved message annotations crossing router.
Patch from Ganesh Murthy.
This closes #101
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0545502f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0545502f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0545502f
Branch: refs/heads/master
Commit: 0545502f8670a710447c4f2a9e8d5c72539273e3
Parents: 5bf550f
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Sep 28 15:52:21 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Sep 28 15:52:21 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/amqp.h | 1 +
include/qpid/dispatch/iterator.h | 9 ++
src/amqp.c | 1 +
src/iterator.c | 9 ++
src/message.c | 196 ++++++++++++++++++++-------------
src/parse.c | 76 ++++++++-----
tests/message_test.c | 2 -
tests/system_tests_one_router.py | 101 +++++++++++------
tests/system_tests_two_routers.py | 88 +++++++++++----
9 files changed, 319 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 774a431..3f9c778 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -91,6 +91,7 @@ enum {
/** @name Message Annotation Headers */
/// @{
+extern const char * const QD_MA_PREFIX;
extern const char * const QD_MA_INGRESS; ///< Ingress Router
extern const char * const QD_MA_TRACE; ///< Trace
extern const char * const QD_MA_TO; ///< To-Override
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/include/qpid/dispatch/iterator.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h
index 2c490e3..b2669ef 100644
--- a/include/qpid/dispatch/iterator.h
+++ b/include/qpid/dispatch/iterator.h
@@ -181,6 +181,15 @@ void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase);
void qd_address_iterator_override_prefix(qd_field_iterator_t *iter, char prefix);
/**
+ * Trims octets from the end of the iterator's field by reducing the length of the iterator.
+ *
+ * @param iter - the iterator whose length should be trimmed
+ * @param length - the length of the trimmed field. If greater than or equal to the current length,
+ * then there shall be no effect.
+ */
+void qd_field_iterator_trim(qd_field_iterator_t *iter, int length);
+
+/**
* Return the current octet in the iterator's view and step to the next.
*/
unsigned char qd_field_iterator_octet(qd_field_iterator_t *iter);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index 4602fe3..9568800 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -19,6 +19,7 @@
#include <qpid/dispatch/amqp.h>
+const char * const QD_MA_PREFIX = "x-opt-qd.";
const char * const QD_MA_INGRESS = "x-opt-qd.ingress";
const char * const QD_MA_TRACE = "x-opt-qd.trace";
const char * const QD_MA_TO = "x-opt-qd.to";
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 75d0196..8ae5abf 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -444,6 +444,15 @@ void qd_address_iterator_set_phase(qd_field_iterator_t *iter, char phase)
iter->phase = phase;
}
+void qd_field_iterator_trim(qd_field_iterator_t *iter, int length)
+{
+ if (qd_field_iterator_length(iter) > length) {
+ iter->start_pointer = iter->pointer;
+ iter->start_pointer.length = length;
+ iter->view_start_pointer = iter->start_pointer;
+ iter->pointer = iter->start_pointer;
+ }
+}
void qd_address_iterator_override_prefix(qd_field_iterator_t *iter, char prefix)
{
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index ab16e9d..33f0ea9 100644
--- a/src/message.c
+++ b/src/message.c
@@ -787,44 +787,79 @@ static void send_handler(void *context, const unsigned char *start, int length)
pn_link_send(pnl, (const char*) start, length);
}
+
// create a buffer chain holding the outgoing message annotations section
-static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out)
+static void compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t *out, bool strip_annotations)
{
- if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
- !DEQ_IS_EMPTY(msg->ma_trace) ||
- !DEQ_IS_EMPTY(msg->ma_ingress)) {
+ qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
- qd_composed_field_t *out_ma = qd_compose(QD_PERFORMATIVE_MESSAGE_ANNOTATIONS, 0);
- qd_compose_start_map(out_ma);
+ bool map_started = false;
- if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
- qd_compose_insert_symbol(out_ma, QD_MA_TO);
- qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
- }
+ // Copy through any custom annotations (keys without the reserved QD_MA_PREFIX) from the incoming message
+ qd_parsed_field_t *in_ma = msg->content->parsed_message_annotations;
- if (!DEQ_IS_EMPTY(msg->ma_trace)) {
- qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
- qd_compose_insert_buffers(out_ma, &msg->ma_trace);
- }
+ if (in_ma) {
+ uint32_t count = qd_parse_sub_count(in_ma);
+
+ for (uint32_t idx = 0; idx < count; idx++) {
+ qd_parsed_field_t *sub_key = qd_parse_sub_key(in_ma, idx);
+ if (!sub_key)
+ continue;
- if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
- qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
- qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+ qd_field_iterator_t *iter = qd_parse_raw(sub_key);
+
+ if (!qd_field_iterator_prefix(iter, QD_MA_PREFIX)) {
+ if (!map_started) {
+ qd_compose_start_map(out_ma);
+ map_started = true;
+ }
+ qd_parsed_field_t *sub_value = qd_parse_sub_value(in_ma, idx);
+ qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_key));
+ qd_compose_insert_typed_iterator(out_ma, qd_parse_typed(sub_value));
+ }
}
+ }
+
+ //Add the dispatch router specific annotations only if strip_annotations is false.
+ if (!strip_annotations) {
+ if (!DEQ_IS_EMPTY(msg->ma_to_override) ||
+ !DEQ_IS_EMPTY(msg->ma_trace) ||
+ !DEQ_IS_EMPTY(msg->ma_ingress) ||
+ msg->ma_phase != 0) {
+
+ if (!map_started) {
+ qd_compose_start_map(out_ma);
+ map_started = true;
+ }
+
+ if (!DEQ_IS_EMPTY(msg->ma_to_override)) {
+ qd_compose_insert_symbol(out_ma, QD_MA_TO);
+ qd_compose_insert_buffers(out_ma, &msg->ma_to_override);
+ }
+
+ if (!DEQ_IS_EMPTY(msg->ma_trace)) {
+ qd_compose_insert_symbol(out_ma, QD_MA_TRACE);
+ qd_compose_insert_buffers(out_ma, &msg->ma_trace);
+ }
+
+ if (!DEQ_IS_EMPTY(msg->ma_ingress)) {
+ qd_compose_insert_symbol(out_ma, QD_MA_INGRESS);
+ qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
+ }
- if (msg->ma_phase != 0) {
- qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
- qd_compose_insert_int(out_ma, msg->ma_phase);
+ if (msg->ma_phase != 0) {
+ qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
+ qd_compose_insert_int(out_ma, msg->ma_phase);
+ }
}
+ }
+ if (map_started) {
qd_compose_end_map(out_ma);
-
qd_compose_take_buffers(out_ma, out);
- qd_compose_free(out_ma);
- return true;
}
- return false;
+ qd_compose_free(out_ma);
}
void qd_message_send(qd_message_t *in_msg,
@@ -845,65 +880,68 @@ void qd_message_send(qd_message_t *in_msg,
qd_buffer_list_t new_ma;
DEQ_INIT(new_ma);
- if (strip_annotations || compose_message_annotations(msg, &new_ma)) {
- //
- // This is the case where the message annotations have been modified.
- // The message send must be divided into sections: The existing header;
- // the new message annotations; the rest of the existing message.
- // Note that the original message annotations that are still in the
- // buffer chain must not be sent.
- //
- // Start by making sure that we've parsed the message sections through
- // the message annotations
- //
- // ??? NO LONGER NECESSARY???
- if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
- qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
- return;
- }
+ // Process the message annotations if any
+ compose_message_annotations(msg, &new_ma, strip_annotations);
- //
- // Send header if present
- //
- cursor = qd_buffer_base(buf);
- if (content->section_message_header.length > 0) {
- buf = content->section_message_header.buffer;
- cursor = content->section_message_header.offset + qd_buffer_base(buf);
- advance(&cursor, &buf,
- content->section_message_header.length + content->section_message_header.hdr_length,
- send_handler, (void*) pnl);
- }
+ //
+ // The message annotations are always recomposed above, so the original section is replaced.
+ // The message send must be divided into sections: The existing header;
+ // the new message annotations; the rest of the existing message.
+ // Note that the original message annotations that are still in the
+ // buffer chain must not be sent.
+ //
+ // Start by making sure that we've parsed the message sections through
+ // the message annotations
+ //
+ // ??? NO LONGER NECESSARY???
+ if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
+ qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message);
+ return;
+ }
- //
- // Send new message annotations
- //
- qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
- while (da_buf) {
- pn_link_send(pnl, (char*) qd_buffer_base(da_buf), qd_buffer_size(da_buf));
- da_buf = DEQ_NEXT(da_buf);
- }
- qd_buffer_list_free_buffers(&new_ma);
+ //
+ // Send header if present
+ //
+ cursor = qd_buffer_base(buf);
+ if (content->section_message_header.length > 0) {
+ buf = content->section_message_header.buffer;
+ cursor = content->section_message_header.offset + qd_buffer_base(buf);
+ advance(&cursor, &buf,
+ content->section_message_header.length + content->section_message_header.hdr_length,
+ send_handler, (void*) pnl);
+ }
- //
- // Skip over replaced message annotations
- //
- if (content->section_message_annotation.length > 0)
- advance(&cursor, &buf,
- content->section_message_annotation.hdr_length + content->section_message_annotation.length,
- 0, 0);
+ //
+ // Send new message annotations
+ //
+ qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
+ while (da_buf) {
+ char *to_send = (char*) qd_buffer_base(da_buf);
+ pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
+ da_buf = DEQ_NEXT(da_buf);
+ }
+ qd_buffer_list_free_buffers(&new_ma);
- //
- // Send remaining partial buffer
- //
- if (buf) {
- size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
- advance(&cursor, &buf, len, send_handler, (void*) pnl);
- }
+ //
+ // Skip over replaced message annotations
+ //
+ if (content->section_message_annotation.length > 0)
+ advance(&cursor, &buf,
+ content->section_message_annotation.hdr_length + content->section_message_annotation.length,
+ 0, 0);
- // Fall through to process the remaining buffers normally
- // Note that 'advance' will have moved us to the next buffer in the chain.
+ //
+ // Send remaining partial buffer
+ //
+ if (buf) {
+ size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
+ advance(&cursor, &buf, len, send_handler, (void*) pnl);
}
+ // Fall through to process the remaining buffers normally
+ // Note that 'advance' will have moved us to the next buffer in the chain.
+
+
while (buf) {
pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
buf = DEQ_NEXT(buf);
@@ -1140,9 +1178,9 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b
qd_compose_end_list(field);
qd_buffer_list_t out_ma;
- if (compose_message_annotations((qd_message_pvt_t*)msg, &out_ma)) {
- qd_compose_insert_buffers(field, &out_ma);
- }
+ DEQ_INIT(out_ma);
+ compose_message_annotations((qd_message_pvt_t*)msg, &out_ma, false);
+ qd_compose_insert_buffers(field, &out_ma);
field = qd_compose(QD_PERFORMATIVE_PROPERTIES, field);
qd_compose_start_list(field);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/src/parse.c
----------------------------------------------------------------------
diff --git a/src/parse.c b/src/parse.c
index 7a633f1..091c32e 100644
--- a/src/parse.c
+++ b/src/parse.c
@@ -37,29 +37,48 @@ struct qd_parsed_field_t {
ALLOC_DECLARE(qd_parsed_field_t);
ALLOC_DEFINE(qd_parsed_field_t);
-
-static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
+/**
+ * size = the number of bytes following the tag and any size field (the size field itself is not counted)
+ * count = the number of elements. Applies only to compound structures
+ */
+static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *size, uint32_t *count, uint32_t *length_of_size, uint32_t *length_of_count)
{
if (qd_field_iterator_end(iter))
return "Insufficient Data to Determine Tag";
- *tag = qd_field_iterator_octet(iter);
- *count = 0;
- *length = 0;
- *clen = 0;
+
+ *tag = qd_field_iterator_octet(iter);
+ *count = 0;
+ *size = 0;
+ *length_of_count = 0;
+ *length_of_size = 0;
+
switch (*tag & 0xF0) {
- case 0x40: *length = 0; break;
- case 0x50: *length = 1; break;
- case 0x60: *length = 2; break;
- case 0x70: *length = 4; break;
- case 0x80: *length = 8; break;
- case 0x90: *length = 16; break;
+ case 0x40:
+ *size = 0;
+ break;
+ case 0x50:
+ *size = 1;
+ break;
+ case 0x60:
+ *size = 2;
+ break;
+ case 0x70:
+ *size = 4;
+ break;
+ case 0x80:
+ *size = 8;
+ break;
+ case 0x90:
+ *size = 16;
+ break;
case 0xB0:
case 0xD0:
case 0xF0:
- *length += ((unsigned int) qd_field_iterator_octet(iter)) << 24;
- *length += ((unsigned int) qd_field_iterator_octet(iter)) << 16;
- *length += ((unsigned int) qd_field_iterator_octet(iter)) << 8;
+ *size += ((unsigned int) qd_field_iterator_octet(iter)) << 24;
+ *size += ((unsigned int) qd_field_iterator_octet(iter)) << 16;
+ *size += ((unsigned int) qd_field_iterator_octet(iter)) << 8;
+ *length_of_size = 3;
// fall through to the next case
case 0xA0:
@@ -67,7 +86,8 @@ static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *le
case 0xE0:
if (qd_field_iterator_end(iter))
return "Insufficient Data to Determine Length";
- *length += (unsigned int) qd_field_iterator_octet(iter);
+ *size += (unsigned int) qd_field_iterator_octet(iter);
+ *length_of_size += 1;
break;
default:
@@ -80,7 +100,7 @@ static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *le
*count += ((unsigned int) qd_field_iterator_octet(iter)) << 24;
*count += ((unsigned int) qd_field_iterator_octet(iter)) << 16;
*count += ((unsigned int) qd_field_iterator_octet(iter)) << 8;
- *clen = 3;
+ *length_of_count = 3;
// fall through to the next case
case 0xC0:
@@ -88,20 +108,19 @@ static char *get_type_info(qd_field_iterator_t *iter, uint8_t *tag, uint32_t *le
if (qd_field_iterator_end(iter))
return "Insufficient Data to Determine Count";
*count += (unsigned int) qd_field_iterator_octet(iter);
- *clen += 1;
+ *length_of_count += 1;
break;
}
if ((*tag == QD_AMQP_MAP8 || *tag == QD_AMQP_MAP32) && (*count & 1))
return "Odd Number of Elements in a Map";
- if (*clen > *length)
+ if (*length_of_count > *size)
return "Insufficient Length to Determine Count";
return 0;
}
-
static qd_parsed_field_t *qd_parse_internal(qd_field_iterator_t *iter, qd_parsed_field_t *p)
{
qd_parsed_field_t *field = new_qd_parsed_field_t();
@@ -114,15 +133,20 @@ static qd_parsed_field_t *qd_parse_internal(qd_field_iterator_t *iter, qd_parsed
field->raw_iter = 0;
field->typed_iter = qd_field_iterator_dup(iter);
- uint32_t length;
- uint32_t count;
- uint32_t length_of_count;
+ uint32_t size = 0;
+ uint32_t count = 0;
+ uint32_t length_of_count = 0;
+ uint32_t length_of_size = 0;
- field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
+ field->parse_error = get_type_info(iter, &field->tag, &size, &count, &length_of_size, &length_of_count);
if (!field->parse_error) {
- field->raw_iter = qd_field_iterator_sub(iter, length);
- qd_field_iterator_advance(iter, length - length_of_count);
+ qd_field_iterator_trim(field->typed_iter, size + length_of_size + 1); // + 1 accounts for the tag length
+
+ field->raw_iter = qd_field_iterator_sub(iter, size - length_of_count);
+
+ qd_field_iterator_advance(iter, size - length_of_count);
+
for (uint32_t idx = 0; idx < count; idx++) {
qd_parsed_field_t *child = qd_parse_internal(field->raw_iter, field);
DEQ_INSERT_TAIL(field->children, child);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/tests/message_test.c
----------------------------------------------------------------------
diff --git a/tests/message_test.c b/tests/message_test.c
index 7fd4cd0..7984d87 100644
--- a/tests/message_test.c
+++ b/tests/message_test.c
@@ -65,7 +65,6 @@ static char* test_send_to_messenger(void *context)
{
qd_message_t *msg = qd_message();
qd_message_content_t *content = MSG_CONTENT(msg);
-
qd_message_compose_1(msg, "test_addr_0", 0);
qd_buffer_t *buf = DEQ_HEAD(content->buffers);
if (buf == 0) return "Expected a buffer in the test message";
@@ -289,7 +288,6 @@ static char* test_send_message_annotations(void *context)
int message_tests(void)
{
int result = 0;
-
TEST_CASE(test_send_to_messenger, 0);
TEST_CASE(test_receive_from_messenger, 0);
TEST_CASE(test_message_properties, 0);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index e3dd981..d993549 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -356,7 +356,6 @@ class RouterTest(TestCase):
M1 = self.messenger()
M2 = self.messenger()
-
M1.start()
M2.start()
M2.subscribe(addr)
@@ -366,10 +365,9 @@ class RouterTest(TestCase):
tm.address = addr
-
- ##
- ## No inbound delivery annotations
- ##
+ #
+ # No inbound delivery annotations
+ #
for i in range(10):
tm.body = {'number': i}
M1.put(tm)
@@ -384,9 +382,9 @@ class RouterTest(TestCase):
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
- ##
- ## Pre-existing ingress
- ##
+ #
+ # Pre-existing ingress
+ #
tm.annotations = {'x-opt-qd.ingress': 'ingress-router'}
for i in range(10):
tm.body = {'number': i}
@@ -402,9 +400,9 @@ class RouterTest(TestCase):
self.assertEqual(ma['x-opt-qd.ingress'], 'ingress-router')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
- ##
- ## Invalid trace type
- ##
+ #
+ # Invalid trace type
+ #
tm.annotations = {'x-opt-qd.trace' : 45}
for i in range(10):
tm.body = {'number': i}
@@ -420,9 +418,9 @@ class RouterTest(TestCase):
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
- ##
- ## Empty trace
- ##
+ #
+ # Empty trace
+ #
tm.annotations = {'x-opt-qd.trace' : []}
for i in range(10):
tm.body = {'number': i}
@@ -438,9 +436,9 @@ class RouterTest(TestCase):
self.assertEqual(ma['x-opt-qd.ingress'], '0/QDR')
self.assertEqual(ma['x-opt-qd.trace'], ['0/QDR'])
- ##
- ## Non-empty trace
- ##
+ #
+ # Non-empty trace
+ #
tm.annotations = {'x-opt-qd.trace' : ['0/first.hop']}
for i in range(10):
tm.body = {'number': i}
@@ -464,8 +462,7 @@ class RouterTest(TestCase):
# The default for stripAnnotations is "both" (which means strip annotations on both ingress and egress)
# This test will test the stripAnnotations = no option - meaning no annotations must be stripped.
# We will send in a custom annotation and make that we get back 3 annotations on the received message
- # Skipping this test temporarily
- def notest_08a_test_strip_message_annotations_no_custom_not_implemented(self):
+ def test_08a_strip_message_annotations_custom(self):
addr = self.router.addresses[1]+"/strip_message_annotations_no_custom/1"
M1 = self.messenger()
@@ -481,7 +478,6 @@ class RouterTest(TestCase):
ingress_message_annotations = {}
ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation'
-
ingress_message.annotations = ingress_message_annotations
M1.put(ingress_message)
@@ -492,10 +488,9 @@ class RouterTest(TestCase):
egress_message = Message()
M2.get(egress_message)
- #Make sure 'Hello World!' is in the message body dict
+ # Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
-
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
@@ -506,7 +501,7 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
- #stripAnnotations property is set to "no"
+ # stripAnnotations property is set to "no"
def test_08a_test_strip_message_annotations_no(self):
addr = self.router.addresses[1]+"/strip_message_annotations_no/1"
@@ -535,7 +530,6 @@ class RouterTest(TestCase):
#Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
-
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
@@ -545,7 +539,7 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
- #stripAnnotations property is set to "no"
+ # stripAnnotations property is set to "no"
def test_08a_test_strip_message_annotations_no_add_trace(self):
addr = self.router.addresses[1]+"/strip_message_annotations_no_add_trace/1"
@@ -560,12 +554,12 @@ class RouterTest(TestCase):
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
- ##
- ## Pre-existing ingress and trace
- ##
- ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.1']}
- ingress_message.annotations = ingress_message_annotations
-
+ #
+ # Pre-existing ingress and trace
+ #
+ ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router',
+ 'x-opt-qd.trace': ['0/QDR.1'],
+ 'work': 'hard'}
ingress_message.annotations = ingress_message_annotations
M1.put(ingress_message)
@@ -576,22 +570,22 @@ class RouterTest(TestCase):
egress_message = Message()
M2.get(egress_message)
- #Make sure 'Hello World!' is in the message body dict
+ # Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
-
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], 'ingress-router')
+ # Make sure the user defined annotation also makes it out.
+ self.assertEqual(egress_message_annotations['work'], 'hard')
self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.1', '0/QDR'])
M1.stop()
M2.stop()
-
- #Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
- #stripAnnotations property is set to "both"
+ # Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
+ # stripAnnotations property is set to "both"
def test_08a_test_strip_message_annotations_both(self):
addr = self.router.addresses[2]+"/strip_message_annotations_both/1"
@@ -620,6 +614,41 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
+ # Don't send any pre-existing ingress or trace annotations. Send in a custom annotation.
+ # Make sure that the custom annotation comes out and nothing else.
+ # stripAnnotations property is set to "both"
+ def test_08a_test_strip_message_annotations_both_custom(self):
+ addr = self.router.addresses[2]+"/strip_message_annotations_both/1"
+
+ M1 = self.messenger()
+ M2 = self.messenger()
+
+ M1.start()
+ M2.start()
+ M2.subscribe(addr)
+
+ ingress_message = Message()
+ ingress_message.address = addr
+ ingress_message.body = {'message': 'Hello World!'}
+
+ # Only annotations with prefix "x-opt-qd." will be stripped
+ ingress_message_annotations = {'stay': 'humble', 'x-opt-qd': 'work'}
+ ingress_message.annotations = ingress_message_annotations
+
+ #Put and send the message
+ M1.put(ingress_message)
+ M1.send()
+
+ # Receive the message
+ M2.recv(1)
+ egress_message = Message()
+ M2.get(egress_message)
+
+ self.assertEqual(egress_message.annotations, ingress_message_annotations)
+
+ M1.stop()
+ M2.stop()
+
#Dont send any pre-existing ingress or trace annotations. Make sure that there are no outgoing message annotations
#stripAnnotations property is set to "out"
def test_08a_test_strip_message_annotations_out(self):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0545502f/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index fdc5534..db82838 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -416,7 +416,7 @@ class RouterTest(TestCase):
ingress_message = Message()
ingress_message.address = addr
ingress_message.body = {'message': 'Hello World!'}
- ingress_message_annotations = {}
+ ingress_message_annotations = {'work': 'hard', 'stay': 'humble'}
ingress_message.annotations = ingress_message_annotations
@@ -436,14 +436,17 @@ class RouterTest(TestCase):
self.assertEqual(egress_message_annotations.__class__, dict)
self.assertEqual(egress_message_annotations['x-opt-qd.ingress'], '0/QDR.A')
+ self.assertEqual(egress_message_annotations['work'], 'hard')
+ self.assertEqual(egress_message_annotations['stay'], 'humble')
self.assertEqual(egress_message_annotations['x-opt-qd.trace'], ['0/QDR.A', '0/QDR.B'])
M1.stop()
M2.stop()
- #This unit test is currently skipped because dispatch router do not pass thru custom message annotations. Once the feature is added the @unittest.skip decorator can be removed.
- #The stripAnnotations property is set to 'no'
- def notest_08a_test_strip_message_annotations_no_custom_not_implemented(self):
+ # This test was previously skipped because the dispatch router did not pass through custom message annotations.
+ # Now that the router preserves them, the test is enabled.
+ # The stripAnnotations property is set to 'no'
+ def test_08a_strip_message_annotations_custom(self):
addr = "amqp:/message_annotations_strip_no_custom/1"
M1 = self.messenger()
@@ -462,8 +465,7 @@ class RouterTest(TestCase):
ingress_message.body = {'message': 'Hello World!'}
ingress_message_annotations = {}
ingress_message_annotations['custom-annotation'] = '1/Custom_Annotation'
-
-
+
ingress_message.annotations = ingress_message_annotations
M1.put(ingress_message)
@@ -474,10 +476,9 @@ class RouterTest(TestCase):
egress_message = Message()
M2.get(egress_message)
- #Make sure 'Hello World!' is in the message body dict
+ # Make sure 'Hello World!' is in the message body dict
self.assertEqual('Hello World!', egress_message.body['message'])
-
-
+
egress_message_annotations = egress_message.annotations
self.assertEqual(egress_message_annotations.__class__, dict)
@@ -537,9 +538,9 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
- #Test to see if the dispatch router specific annotations were stripped.
- #The stripAnnotations property is set to 'both'
- #Send a message to the router with pre-existing ingress and trace annotations and make sure that nothing comes out.
+ # Test to see if the dispatch router specific annotations were stripped.
+ # The stripAnnotations property is set to 'both'
+ # Send a message to the router with pre-existing ingress and trace annotations and make sure that nothing comes out.
def test_08a_test_strip_message_annotations_both_add_ingress_trace(self):
addr = "amqp:/strip_message_annotations_both_add_ingress_trace/1"
@@ -559,10 +560,14 @@ class RouterTest(TestCase):
ingress_message.body = {'message': 'Hello World!'}
##
- ## Pre-existing ingress and trace. Intentionally populate the trace with the 0/QDR.A which is the trace of the first router.
- ## If the inbound annotations were not stripped, the router would drop this message since it would consider this message as being looped.
- ##
- ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router', 'x-opt-qd.trace': ['0/QDR.A']}
+ # Pre-existing ingress and trace. Intentionally populate the trace with the 0/QDR.A which is the trace
+ # of the first router. If the inbound annotations were not stripped, the router would drop this message
+ # since it would consider this message as being looped.
+ #
+ ingress_message_annotations = {'x-opt-qd.ingress': 'ingress-router',
+ 'x-opt-qd.trace': ['0/QDR.A'],
+ 'work': 'hard',
+ 'x-opt-qd': 'humble'}
ingress_message.annotations = ingress_message_annotations
#Put and send the message
@@ -573,15 +578,15 @@ class RouterTest(TestCase):
M2.recv(1)
egress_message = Message()
M2.get(egress_message)
-
- self.assertEqual(egress_message.annotations, None)
+
+ # Router specific annotations (annotations with prefix "x-opt-qd.") will be stripped. User defined annotations will not be stripped.
+ self.assertEqual(egress_message.annotations, {'work': 'hard', 'x-opt-qd': 'humble'})
M1.stop()
M2.stop()
-
- #Send in pre-existing trace and ingress and annotations and make sure that there are no outgoing annotations.
- #stripAnnotations property is set to "in"
+ # Send in pre-existing trace and ingress and annotations and make sure that there are no outgoing annotations.
+ # stripAnnotations property is set to "in"
def test_08a_test_strip_message_annotations_out(self):
addr = "amqp:/strip_message_annotations_out/1"
@@ -618,6 +623,47 @@ class RouterTest(TestCase):
M1.stop()
M2.stop()
+
+ # Send in custom annotations and make sure that only those without the reserved "x-opt-qd." prefix come out.
+ # stripAnnotations property is set to "in"
+ def test_08a_test_strip_message_annotations_out_custom(self):
+ addr = "amqp:/strip_message_annotations_out/1"
+
+ M1 = self.messenger()
+ M2 = self.messenger()
+
+ M1.route("amqp:/*", self.routers[0].addresses[3]+"/$1")
+ M2.route("amqp:/*", self.routers[1].addresses[3]+"/$1")
+
+ M1.start()
+ M2.start()
+ M2.subscribe(addr)
+ self.routers[0].wait_address("strip_message_annotations_out/1", 0, 1)
+
+ ingress_message = Message()
+ ingress_message.address = addr
+ ingress_message.body = {'message': 'Hello World!'}
+
+ # Annotations with prefix "x-opt-qd." will be skipped
+ ingress_message_annotations = {'work': 'hard', "x-opt-qd": "custom", "x-opt-qd.": "custom"}
+ ingress_message.annotations = ingress_message_annotations
+
+ # Put and send the message
+ M1.put(ingress_message)
+ M1.send()
+
+ # Receive the message
+ egress_message = Message()
+ M2.recv(1)
+ M2.get(egress_message)
+
+ # Make sure 'Hello World!' is in the message body dict
+ self.assertEqual('Hello World!', egress_message.body['message'])
+
+ self.assertEqual(egress_message.annotations, {'work': 'hard', "x-opt-qd": "custom"})
+
+ M1.stop()
+ M2.stop()
#Send in pre-existing trace and ingress and annotations and make sure that they are not in the outgoing annotations.
#stripAnnotations property is set to "in"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org