You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/31 19:00:33 UTC
qpid-dispatch git commit: DISPATCH-127 - Carry the ingress address
phase across inter-router links so waypoints can be accessed from remote
routers.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 2a03648c8 -> 9d08d0a67
DISPATCH-127 - Carry the ingress address phase across inter-router links so waypoints can be
accessed from remote routers.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9d08d0a6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9d08d0a6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9d08d0a6
Branch: refs/heads/master
Commit: 9d08d0a678d71c275ab0c05d4f28b1549630e132
Parents: 2a03648
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Mar 31 12:59:14 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Mar 31 12:59:14 2016 -0400
----------------------------------------------------------------------
include/qpid/dispatch/amqp.h | 1 +
include/qpid/dispatch/message.h | 10 ++++++++++
include/qpid/dispatch/router_core.h | 11 +++++++++++
src/amqp.c | 1 +
src/message.c | 19 ++++++++++++++++++
src/message_private.h | 2 +-
src/router_core/connections.c | 6 ++++++
src/router_node.c | 33 +++++++++++++++++++++++++++-----
tests/system_tests_link_routes.py | 5 -----
9 files changed, 77 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index d4bc340..54773b5 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -94,6 +94,7 @@ enum {
extern const char * const QD_MA_INGRESS; ///< Ingress Router
extern const char * const QD_MA_TRACE; ///< Trace
extern const char * const QD_MA_TO; ///< To-Override
+extern const char * const QD_MA_PHASE; ///< Phase for override address
extern const char * const QD_MA_CLASS; ///< Message-Class
/// @}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 7ab7b9c..66601ba 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -176,6 +176,16 @@ void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *tra
void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field);
/**
+ * Set a phase for the phase annotation in the message.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param phase The phase of the address for the outgoing message.
+ *
+ */
+void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
+int qd_message_get_phase_annotation(const qd_message_t *msg);
+
+/**
* Set the value for the QD_MA_INGRESS field in the outgoing message
* annotations for the message.
*
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 6e26b41..3956b5c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -382,6 +382,17 @@ qd_link_type_t qdr_link_type(const qdr_link_t *link);
qd_direction_t qdr_link_direction(const qdr_link_t *link);
/**
+ * qdr_link_phase
+ *
+ * If this link is associated with an auto_link, return the address phase. Otherwise
+ * return zero.
+ *
+ * @param link Link object
+ * @return 0 or the phase of the link's auto_link.
+ */
+int qdr_link_phase(const qdr_link_t *link);
+
+/**
* qdr_link_is_anonymous
*
* Indicate whether the link is anonymous. Note that this is determined inside
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index c02216d..656f7ef 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -22,6 +22,7 @@
const char * const QD_MA_INGRESS = "x-opt-qd.ingress";
const char * const QD_MA_TRACE = "x-opt-qd.trace";
const char * const QD_MA_TO = "x-opt-qd.to";
+const char * const QD_MA_PHASE = "x-opt-qd.phase";
const char * const QD_MA_CLASS = "x-opt-qd.class";
const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router";
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index c8ade93..c5dc028 100644
--- a/src/message.c
+++ b/src/message.c
@@ -547,6 +547,7 @@ qd_message_t *qd_message()
DEQ_INIT(msg->ma_to_override);
DEQ_INIT(msg->ma_trace);
DEQ_INIT(msg->ma_ingress);
+ msg->ma_phase = 0;
msg->content = new_qd_message_content_t();
if (msg->content == 0) {
@@ -612,6 +613,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override);
qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
+ copy->ma_phase = msg->ma_phase;
copy->content = content;
@@ -666,6 +668,18 @@ void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_fie
qd_compose_free(to_field);
}
+void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ msg->ma_phase = phase;
+}
+
+int qd_message_get_phase_annotation(const qd_message_t *in_msg)
+{
+ qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+ return msg->ma_phase;
+}
+
void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
@@ -790,6 +804,11 @@ static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
}
+ if (msg->ma_phase != 0) {
+ qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
+ qd_compose_insert_int(out_ma, msg->ma_phase);
+ }
+
qd_compose_end_map(out_ma);
qd_compose_take_buffers(out_ma, out);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index f09fa25..8ede2c7 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -93,7 +93,7 @@ typedef struct {
qd_buffer_list_t ma_to_override; // to field in outgoing message annotations.
qd_buffer_list_t ma_trace; // trace list in outgoing message annotations
qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations
-
+ int ma_phase; // phase for the override address
} qd_message_pvt_t;
ALLOC_DECLARE(qd_message_t);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4cfc8cd..9b6e63b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -217,6 +217,12 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link)
}
+int qdr_link_phase(const qdr_link_t *link)
+{
+ return link && link->auto_link ? link->auto_link->phase : 0;
+}
+
+
bool qdr_link_is_anonymous(const qdr_link_t *link)
{
return link->owning_addr == 0;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 4bc5ad9..41a8d9a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -94,6 +94,7 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
qd_parsed_field_t *trace = 0;
qd_parsed_field_t *ingress = 0;
qd_parsed_field_t *to = 0;
+ qd_parsed_field_t *phase = 0;
*link_exclusions = 0;
@@ -115,8 +116,10 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
ingress = qd_parse_sub_value(in_ma, idx);
} else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
to = qd_parse_sub_value(in_ma, idx);
+ } else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
+ phase = qd_parse_sub_value(in_ma, idx);
}
- done = trace && ingress && to;
+ done = trace && ingress && to && phase;
}
}
@@ -166,6 +169,15 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
}
//
+ // QD_MA_PHASE:
+ // Preserve the existing value.
+ //
+ if (phase) {
+ int phase_val = qd_parse_as_int(phase);
+ qd_message_set_phase_annotation(msg, phase_val);
+ }
+
+ //
// QD_MA_INGRESS:
// If there is no ingress field, annotate the ingress as
// this router else keep the original field.
@@ -264,14 +276,17 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
if (anonymous_link) {
qd_field_iterator_t *addr_iter = 0;
+ int phase = 0;
//
// If the message has delivery annotations, get the to-override field from the annotations.
//
if (in_ma) {
qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
- if (ma_to)
+ if (ma_to) {
addr_iter = qd_field_iterator_dup(qd_parse_raw(ma_to));
+ phase = qd_message_get_phase_annotation(msg);
+ }
}
//
@@ -282,15 +297,23 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
if (addr_iter) {
qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
+ if (phase > 0)
+ qd_address_iterator_set_phase(addr_iter, '0' + (char) phase);
delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
link_exclusions);
}
} else {
- const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
- if (r_tgt) {
+ const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link));
+ if (!term_addr)
+ term_addr = pn_terminus_get_address(qd_link_source(link));
+
+ if (term_addr) {
qd_composed_field_t *to_override = qd_compose_subfield(0);
- qd_compose_insert_string(to_override, r_tgt);
+ qd_compose_insert_string(to_override, term_addr);
qd_message_set_to_override_annotation(msg, to_override);
+ int phase = qdr_link_phase(rlink);
+ if (phase != 0)
+ qd_message_set_phase_annotation(msg, phase);
}
delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 89a3fa7..6f3ca61 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -126,11 +126,8 @@ class LinkRoutePatternTest(TestCase):
"""
out = self.run_qdstat_linkRoute(self.routers[1].addresses[0])
out_list = out.split()
- self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
- self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
self.assertEqual(out_list.count('in'), 1)
self.assertEqual(out_list.count('out'), 1)
- self.assertEqual(out_list.count('broker'), 2)
def test_ccc_qdstat_link_routes_routerC(self):
"""
@@ -140,8 +137,6 @@ class LinkRoutePatternTest(TestCase):
out = self.run_qdstat_linkRoute(self.routers[2].addresses[1])
out_list = out.split()
- self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
- self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
self.assertEqual(out_list.count('in'), 1)
self.assertEqual(out_list.count('out'), 1)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org