You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/01/31 17:48:56 UTC

svn commit: r1563170 - in /qpid/dispatch/trunk/src: router_node.c router_private.h

Author: tross
Date: Fri Jan 31 16:48:56 2014
New Revision: 1563170

URL: http://svn.apache.org/r1563170
Log:
DISPATCH-1 - The router will now honor the target of an incoming link if there is no
             "to" field in the message properties.

Modified:
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1563170&r1=1563169&r2=1563170&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Jan 31 16:48:56 2014
@@ -385,9 +385,12 @@ static int router_writable_link_handler(
 }
 
 
-static qd_field_iterator_t *router_annotate_message(qd_router_t *router, qd_message_t *msg, int *drop)
+static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
+                                                    qd_parsed_field_t *in_da,
+                                                    qd_message_t      *msg,
+                                                    int               *drop,
+                                                    const char        *to_override)
 {
-    qd_parsed_field_t   *in_da        = qd_message_delivery_annotations(msg);
     qd_composed_field_t *out_da       = qd_compose(QD_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
     qd_field_iterator_t *ingress_iter = 0;
 
@@ -395,13 +398,21 @@ static qd_field_iterator_t *router_annot
     qd_parsed_field_t *ingress = 0;
 
     if (in_da) {
-        trace   = qd_parse_value_by_key(in_da, QD_DA_TRACE);
+        trace = qd_parse_value_by_key(in_da, QD_DA_TRACE);
         ingress = qd_parse_value_by_key(in_da, QD_DA_INGRESS);
     }
 
     qd_compose_start_map(out_da);
 
     //
+    // If there is a to_override provided, insert a TO field.
+    //
+    if (to_override) {
+        qd_compose_insert_string(out_da, QD_DA_TO);
+        qd_compose_insert_string(out_da, to_override);
+    }
+
+    //
     // If there is a trace field, append this router's ID to the trace.
     //
     qd_compose_insert_string(out_da, QD_DA_TRACE);
@@ -661,9 +672,43 @@ static void router_rx_handler(void* cont
     valid_message = qd_message_check(msg, QD_DEPTH_PROPERTIES);
 
     if (valid_message) {
-        qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_TO);
+        qd_parsed_field_t   *in_da     = qd_message_delivery_annotations(msg);
+        qd_field_iterator_t *iter      = 0;
+        bool                 free_iter = true;
         qd_address_t        *addr;
-        int                  fanout = 0;
+        int                  fanout       = 0;
+        char                *to_override  = 0;
+
+        //
+        // If the message has delivery annotations, get the to-override field from the annotations.
+        //
+        if (in_da) {
+            qd_parsed_field_t *da_to = qd_parse_value_by_key(in_da, QD_DA_TO);
+            if (da_to) {
+                iter      = qd_parse_raw(da_to);
+                free_iter = false;
+            }
+        }
+
+        //
+        // If there was no to-override field, use the TO field from the
+        // message properties.
+        //
+        if (!iter)
+            iter = qd_message_field_iterator(msg, QD_FIELD_TO);
+
+        //
+        // Handle the case where the TO field is absent and the incoming link has a target
+        // address.  Use the target address in the lookup in lieu of a TO address.
+        // Note also that the message must then be annotated with a TO-OVERRIDE field in
+        // the delivery annotations.
+        //
+        // ref: https://issues.apache.org/jira/browse/DISPATCH-1
+        //
+        if (!iter && rlink->target) {
+            iter = qd_field_iterator_string(rlink->target, ITER_VIEW_ALL);
+            to_override = rlink->target;
+        }
 
         if (iter) {
             qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
@@ -681,7 +726,8 @@ static void router_rx_handler(void* cont
             qd_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
             int is_local  = qd_field_iterator_prefix(iter, local_prefix);
             int is_direct = qd_field_iterator_prefix(iter, direct_prefix);
-            qd_field_iterator_free(iter);
+            if (free_iter)
+                qd_field_iterator_free(iter);
 
             if (addr) {
                 //
@@ -699,7 +745,7 @@ static void router_rx_handler(void* cont
                 // returns a 'drop' indication if it detects that the message will loop.
                 //
                 int drop = 0;
-                qd_field_iterator_t *ingress_iter = router_annotate_message(router, msg, &drop);
+                qd_field_iterator_t *ingress_iter = router_annotate_message(router, in_da, msg, &drop, to_override);
 
                 //
                 // Forward to the in-process handler for this address if there is one.  The
@@ -847,6 +893,7 @@ static int router_incoming_link_handler(
     qd_router_t *router    = (qd_router_t*) context;
     pn_link_t   *pn_link   = qd_link_pn(link);
     int          is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
+    const char  *r_tgt     = pn_terminus_get_address(qd_link_remote_target(link));
 
     if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
         qd_log(router->log_source, QD_LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection");
@@ -863,9 +910,15 @@ static int router_incoming_link_handler(
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
     rlink->ref            = 0;
+    rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 
+    if (!is_router && r_tgt) {
+        rlink->target = (char*) malloc(strlen(r_tgt) + 1);
+        strcpy(rlink->target, r_tgt);
+    }
+
     qd_link_set_context(link, rlink);
 
     sys_mutex_lock(router->lock);
@@ -961,6 +1014,7 @@ static int router_outgoing_link_handler(
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
     rlink->ref            = 0;
+    rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 
@@ -1099,6 +1153,8 @@ static int router_link_detach_handler(vo
     qd_router_check_addr(router, oaddr, 1);
 
     // TODO - wrap the free to handle the recursive items
+    if (rlink->target)
+        free(rlink->target);
     free_qd_router_link_t(rlink);
 
     return 0;
@@ -1158,6 +1214,8 @@ static void router_outbound_open_handler
     rlink->link           = receiver;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
+    rlink->ref            = 0;
+    rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 
@@ -1181,6 +1239,8 @@ static void router_outbound_open_handler
     rlink->link           = sender;
     rlink->connected_link = 0;
     rlink->peer_link      = 0;
+    rlink->ref            = 0;
+    rlink->target         = 0;
     DEQ_INIT(rlink->event_fifo);
     DEQ_INIT(rlink->msg_fifo);
 

Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1563170&r1=1563169&r2=1563170&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Fri Jan 31 16:48:56 2014
@@ -73,6 +73,7 @@ struct qd_router_link_t {
     qd_router_link_t       *connected_link;  // [ref] If this is a link-route, reference the connected link
     qd_router_link_t       *peer_link;       // [ref] If this is a bidirectional link-route, reference the peer link
     qd_router_link_ref_t   *ref;             // Pointer to a containing reference object
+    char                   *target;          // Target address for incoming links
     qd_routed_event_list_t  event_fifo;      // FIFO of outgoing delivery/link events (no messages)
     qd_routed_event_list_t  msg_fifo;        // FIFO of outgoing message deliveries
 };



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org