You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/01/31 20:48:53 UTC

svn commit: r1656243 - /qpid/dispatch/trunk/src/router_node.c

Author: tross
Date: Sat Jan 31 19:48:53 2015
New Revision: 1656243

URL: http://svn.apache.org/r1656243
Log:
DISPATCH-6 - Completed link-attach propagation for receivers.

Modified:
    qpid/dispatch/trunk/src/router_node.c

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1656243&r1=1656242&r2=1656243&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Sat Jan 31 19:48:53 2015
@@ -1034,6 +1034,12 @@ typedef struct link_attach_t {
 ALLOC_DECLARE(link_attach_t);
 ALLOC_DEFINE(link_attach_t);
 
+typedef enum {
+    LINK_ATTACH_FORWARDED = 1,  ///< The attach was forwarded
+    LINK_ATTACH_NO_MATCH  = 2,  ///< No link-route address was found
+    LINK_ATTACH_NO_PATH   = 3   ///< Link-route exists but there's no reachable destination
+} link_attach_result_t;
+
 
 static void qd_router_attach_routed_link(void *context, bool discard)
 {
@@ -1057,6 +1063,7 @@ static void qd_router_attach_routed_link
         sys_mutex_lock(la->router->lock);
         rlink->connected_link = la->peer_link;
         la->peer_link->connected_link = rlink;
+        qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
         DEQ_INSERT_TAIL(la->router->links, rlink);
         sys_mutex_unlock(la->router->lock);
 
@@ -1064,6 +1071,9 @@ static void qd_router_attach_routed_link
         pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
 
         pn_link_open(qd_link_pn(link));
+
+        if (la->dir == QD_INCOMING)
+            pn_link_flow(qd_link_pn(link), 100);
     }
 
     if (la->link_name)
@@ -1072,58 +1082,23 @@ static void qd_router_attach_routed_link
 }
 
 
-/**
- * New Incoming Link Handler
- */
-static int router_incoming_link_handler(void* context, qd_link_t *link)
+link_attach_result_t qd_router_link_route(qd_router_t      *router,
+                                          qd_router_link_t *rlink,
+                                          const char       *term_addr,
+                                          qd_direction_t    dir)
 {
-    qd_router_t *router    = (qd_router_t*) context;
-    pn_link_t   *pn_link   = qd_link_pn(link);
-    int          is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
-    const char  *r_tgt     = pn_terminus_get_address(qd_link_remote_target(link));
-
-    if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
-        qd_log(router->log_source, QD_LOG_WARNING,
-               "Incoming link claims router capability but is not on an inter-router connection");
-        pn_link_close(pn_link);
-        return 0;
-    }
-
-    qd_router_link_t *rlink = new_qd_router_link_t();
-    DEQ_ITEM_INIT(rlink);
-    rlink->link_type      = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
-    rlink->link_direction = QD_INCOMING;
-    rlink->owning_addr    = 0;
-    rlink->waypoint       = 0;
-    rlink->link           = link;
-    rlink->connected_link = 0;
-    rlink->ref            = 0;
-    rlink->target         = 0;
-    DEQ_INIT(rlink->event_fifo);
-    DEQ_INIT(rlink->msg_fifo);
-
-    if (!is_router && r_tgt) {
-        rlink->target = (char*) malloc(strlen(r_tgt) + 1);
-        strcpy(rlink->target, r_tgt);
-    }
-
-    qd_link_set_context(link, rlink);
-
-    sys_mutex_lock(router->lock);
-    rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
-    qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
-    DEQ_INSERT_TAIL(router->links, rlink);
-
     //
     // Lookup the target address to see if we can link-route this attach.
     //
-    qd_address_t *addr = router_lookup_terminus_LH(router, r_tgt);
-    if (addr && !is_router) {
+    qd_address_t *addr = router_lookup_terminus_LH(router, term_addr);
+    if (addr) {
         //
         // This is a link-attach routable target.  Propagate the attach downrange.
         // Check first for a locally connected container.
         //
-        qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
+        qd_link_t           *link    = rlink->link;
+        pn_link_t           *pn_link = qd_link_pn(link);
+        qd_router_lrp_ref_t *lrpref  = DEQ_HEAD(addr->lrps);
         if (lrpref) {
             qd_connection_t *conn = lrpref->lrp->container->conn;
             if (conn) {
@@ -1132,7 +1107,7 @@ static int router_incoming_link_handler(
                 la->peer_link    = rlink;
                 la->peer_qd_link = link;
                 la->link_name    = strdup(pn_link_name(pn_link));
-                la->dir          = QD_OUTGOING;
+                la->dir          = dir;
                 la->conn         = conn;
                 qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
             }
@@ -1155,20 +1130,83 @@ static int router_incoming_link_handler(
                     la->peer_link    = rlink;
                     la->peer_qd_link = link;
                     la->link_name    = strdup(pn_link_name(pn_link));
-                    la->dir          = QD_OUTGOING;
+                    la->dir          = dir;
                     la->conn         = out_conn;
                     qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
                 }
             }
-        }
+        } else
+            return LINK_ATTACH_NO_PATH;
+    } else
+        return LINK_ATTACH_NO_MATCH;
+    return LINK_ATTACH_FORWARDED;
+}
+
+
+/**
+ * New Incoming Link Handler
+ */
+static int router_incoming_link_handler(void* context, qd_link_t *link)
+{
+    qd_router_t *router    = (qd_router_t*) context;
+    pn_link_t   *pn_link   = qd_link_pn(link);
+    int          is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
+    const char  *r_tgt     = pn_terminus_get_address(qd_link_remote_target(link));
+
+    if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
+        qd_log(router->log_source, QD_LOG_WARNING,
+               "Incoming link claims router capability but is not on an inter-router connection");
+        pn_link_close(pn_link);
+        return 0;
+    }
+
+    qd_router_link_t *rlink = new_qd_router_link_t();
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_type      = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
+    rlink->link_direction = QD_INCOMING;
+    rlink->owning_addr    = 0;
+    rlink->waypoint       = 0;
+    rlink->link           = link;
+    rlink->connected_link = 0;
+    rlink->ref            = 0;
+    rlink->target         = 0;
+    DEQ_INIT(rlink->event_fifo);
+    DEQ_INIT(rlink->msg_fifo);
+
+    if (!is_router && r_tgt) {
+        rlink->target = (char*) malloc(strlen(r_tgt) + 1);
+        strcpy(rlink->target, r_tgt);
     }
 
+    qd_link_set_context(link, rlink);
+
+    sys_mutex_lock(router->lock);
+    rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
+    qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
+    DEQ_INSERT_TAIL(router->links, rlink);
+
+    //
+    // Attempt to link-route this attach
+    //
+    link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
+    if (!is_router)
+        la_result = qd_router_link_route(router, rlink, r_tgt, QD_OUTGOING);
     sys_mutex_unlock(router->lock);
 
     pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
     pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
-    pn_link_flow(pn_link, 1000);
-    pn_link_open(pn_link);
+
+    //
+    // If link-routing was successful or there was no matching link-route
+    // address, open the link.  If link-routing was supposed to work but
+    // there was no reachable destination, close the link.
+    //
+    if (la_result == LINK_ATTACH_NO_PATH)
+        pn_link_close(pn_link);
+    else {
+        pn_link_flow(pn_link, 1000);
+        pn_link_open(pn_link);
+    }
 
     return 0;
 }
@@ -1189,6 +1227,7 @@ static int router_outgoing_link_handler(
     char                    phase = '0';
     qd_address_semantics_t  semantics;
     qd_address_t           *addr = 0;
+    link_attach_result_t    la_result = LINK_ATTACH_NO_MATCH;
 
     if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
         qd_log(router->log_source, QD_LOG_WARNING,
@@ -1275,33 +1314,36 @@ static int router_outgoing_link_handler(
         // address, that address needs to be set up in the address list.
         //
         char temp_addr[1000]; // TODO: Use pn_string or aprintf.
+        la_result = qd_router_link_route(router, rlink, r_src, QD_INCOMING);
 
-        if (is_dynamic) {
-            qd_router_generate_temp_addr(router, temp_addr, 1000);
-            iter = qd_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
-            pn_terminus_set_address(qd_link_source(link), temp_addr);
-            qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
-        } else
-            qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
-
-        qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
-        if (!addr) {
-            addr = qd_address();
-            qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
-            DEQ_INSERT_TAIL(router->addrs, addr);
-            addr->semantics = semantics;
-            qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
-        }
+        if (la_result == LINK_ATTACH_NO_MATCH) {
+            if (is_dynamic) {
+                qd_router_generate_temp_addr(router, temp_addr, 1000);
+                iter = qd_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+                pn_terminus_set_address(qd_link_source(link), temp_addr);
+                qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
+            } else
+                qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
+
+            qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+            if (!addr) {
+                addr = qd_address();
+                qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+                DEQ_INSERT_TAIL(router->addrs, addr);
+                addr->semantics = semantics;
+                qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
+            }
 
-        rlink->owning_addr = addr;
-        qd_router_add_link_ref_LH(&addr->rlinks, rlink);
+            rlink->owning_addr = addr;
+            qd_router_add_link_ref_LH(&addr->rlinks, rlink);
 
-        //
-        // If this is not a dynamic address and it is the first local subscription
-        // to the address, supply the address to the router module for propagation
-        // to other nodes.
-        //
-        propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
+            //
+            // If this is not a dynamic address and it is the first local subscription
+            // to the address, supply the address to the router module for propagation
+            // to other nodes.
+            //
+            propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
+        }
     }
     qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
     DEQ_INSERT_TAIL(router->links, rlink);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org