You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/01/31 17:48:56 UTC
svn commit: r1563170 - in /qpid/dispatch/trunk/src: router_node.c router_private.h
Author: tross
Date: Fri Jan 31 16:48:56 2014
New Revision: 1563170
URL: http://svn.apache.org/r1563170
Log:
DISPATCH-1 - The router will now honor the target of an incoming link if there is no
"to" field in the message properties.
Modified:
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1563170&r1=1563169&r2=1563170&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Jan 31 16:48:56 2014
@@ -385,9 +385,12 @@ static int router_writable_link_handler(
}
-static qd_field_iterator_t *router_annotate_message(qd_router_t *router, qd_message_t *msg, int *drop)
+static qd_field_iterator_t *router_annotate_message(qd_router_t *router,
+ qd_parsed_field_t *in_da,
+ qd_message_t *msg,
+ int *drop,
+ const char *to_override)
{
- qd_parsed_field_t *in_da = qd_message_delivery_annotations(msg);
qd_composed_field_t *out_da = qd_compose(QD_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
qd_field_iterator_t *ingress_iter = 0;
@@ -395,13 +398,21 @@ static qd_field_iterator_t *router_annot
qd_parsed_field_t *ingress = 0;
if (in_da) {
- trace = qd_parse_value_by_key(in_da, QD_DA_TRACE);
+ trace = qd_parse_value_by_key(in_da, QD_DA_TRACE);
ingress = qd_parse_value_by_key(in_da, QD_DA_INGRESS);
}
qd_compose_start_map(out_da);
//
+ // If there is a to_override provided, insert a TO field.
+ //
+ if (to_override) {
+ qd_compose_insert_string(out_da, QD_DA_TO);
+ qd_compose_insert_string(out_da, to_override);
+ }
+
+ //
// If there is a trace field, append this router's ID to the trace.
//
qd_compose_insert_string(out_da, QD_DA_TRACE);
@@ -661,9 +672,43 @@ static void router_rx_handler(void* cont
valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
if (valid_message) {
- qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_TO);
+ qd_parsed_field_t *in_da = qd_message_delivery_annotations(msg);
+ qd_field_iterator_t *iter = 0;
+ bool free_iter = true;
qd_address_t *addr;
- int fanout = 0;
+ int fanout = 0;
+ char *to_override = 0;
+
+ //
+ // If the message has delivery annotations, get the to-override field from the annotations.
+ //
+ if (in_da) {
+ qd_parsed_field_t *da_to = qd_parse_value_by_key(in_da, QD_DA_TO);
+ if (da_to) {
+ iter = qd_parse_raw(da_to);
+ free_iter = false;
+ }
+ }
+
+ //
+ // If there was no to-override field, use the TO field from the
+ // message properties.
+ //
+ if (!iter)
+ iter = qd_message_field_iterator(msg, QD_FIELD_TO);
+
+ //
+ // Handle the case where the TO field is absent and the incoming link has a target
+ // address. Use the target address in the lookup in lieu of a TO address.
+ // Note also that the message must then be annotated with a TO-OVERRIDE field in
+ // the delivery annotations.
+ //
+ // ref: https://issues.apache.org/jira/browse/DISPATCH-1
+ //
+ if (!iter && rlink->target) {
+ iter = qd_field_iterator_string(rlink->target, ITER_VIEW_ALL);
+ to_override = rlink->target;
+ }
if (iter) {
qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
@@ -681,7 +726,8 @@ static void router_rx_handler(void* cont
qd_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
int is_local = qd_field_iterator_prefix(iter, local_prefix);
int is_direct = qd_field_iterator_prefix(iter, direct_prefix);
- qd_field_iterator_free(iter);
+ if (free_iter)
+ qd_field_iterator_free(iter);
if (addr) {
//
@@ -699,7 +745,7 @@ static void router_rx_handler(void* cont
// returns a 'drop' indication if it detects that the message will loop.
//
int drop = 0;
- qd_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
+ qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_da, msg, &drop, to_override);
//
// Forward to the in-process handler for this address if there is one. The
@@ -847,6 +893,7 @@ static int router_incoming_link_handler(
qd_router_t *router = (qd_router_t*) context;
pn_link_t *pn_link = qd_link_pn(link);
int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
+ const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection");
@@ -863,9 +910,15 @@ static int router_incoming_link_handler(
rlink->connected_link = 0;
rlink->peer_link = 0;
rlink->ref = 0;
+ rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
+ if (!is_router && r_tgt) {
+ rlink->target = (char*) malloc(strlen(r_tgt) + 1);
+ strcpy(rlink->target, r_tgt);
+ }
+
qd_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
@@ -961,6 +1014,7 @@ static int router_outgoing_link_handler(
rlink->connected_link = 0;
rlink->peer_link = 0;
rlink->ref = 0;
+ rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
@@ -1099,6 +1153,8 @@ static int router_link_detach_handler(vo
qd_router_check_addr(router, oaddr, 1);
// TODO - wrap the free to handle the recursive items
+ if (rlink->target)
+ free(rlink->target);
free_qd_router_link_t(rlink);
return 0;
@@ -1158,6 +1214,8 @@ static void router_outbound_open_handler
rlink->link = receiver;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
+ rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
@@ -1181,6 +1239,8 @@ static void router_outbound_open_handler
rlink->link = sender;
rlink->connected_link = 0;
rlink->peer_link = 0;
+ rlink->ref = 0;
+ rlink->target = 0;
DEQ_INIT(rlink->event_fifo);
DEQ_INIT(rlink->msg_fifo);
Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1563170&r1=1563169&r2=1563170&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Fri Jan 31 16:48:56 2014
@@ -73,6 +73,7 @@ struct qd_router_link_t {
qd_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
qd_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
qd_router_link_ref_t *ref; // Pointer to a containing reference object
+ char *target; // Target address for incoming links
qd_routed_event_list_t event_fifo; // FIFO of outgoing delivery/link events (no messages)
qd_routed_event_list_t msg_fifo; // FIFO of outgoing message deliveries
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org