You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/01/31 20:48:53 UTC
svn commit: r1656243 - /qpid/dispatch/trunk/src/router_node.c
Author: tross
Date: Sat Jan 31 19:48:53 2015
New Revision: 1656243
URL: http://svn.apache.org/r1656243
Log:
DISPATCH-6 - Completed link-attach propagation for receivers.
Modified:
qpid/dispatch/trunk/src/router_node.c
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1656243&r1=1656242&r2=1656243&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Sat Jan 31 19:48:53 2015
@@ -1034,6 +1034,12 @@ typedef struct link_attach_t {
ALLOC_DECLARE(link_attach_t);
ALLOC_DEFINE(link_attach_t);
+typedef enum {
+ LINK_ATTACH_FORWARDED = 1, ///< The attach was forwarded
+ LINK_ATTACH_NO_MATCH = 2, ///< No link-route address was found
+ LINK_ATTACH_NO_PATH = 3 ///< Link-route exists but there's no reachable destination
+} link_attach_result_t;
+
static void qd_router_attach_routed_link(void *context, bool discard)
{
@@ -1057,6 +1063,7 @@ static void qd_router_attach_routed_link
sys_mutex_lock(la->router->lock);
rlink->connected_link = la->peer_link;
la->peer_link->connected_link = rlink;
+ qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(la->router->links, rlink);
sys_mutex_unlock(la->router->lock);
@@ -1064,6 +1071,9 @@ static void qd_router_attach_routed_link
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
pn_link_open(qd_link_pn(link));
+
+ if (la->dir == QD_INCOMING)
+ pn_link_flow(qd_link_pn(link), 100);
}
if (la->link_name)
@@ -1072,58 +1082,23 @@ static void qd_router_attach_routed_link
}
-/**
- * New Incoming Link Handler
- */
-static int router_incoming_link_handler(void* context, qd_link_t *link)
+link_attach_result_t qd_router_link_route(qd_router_t *router,
+ qd_router_link_t *rlink,
+ const char *term_addr,
+ qd_direction_t dir)
{
- qd_router_t *router = (qd_router_t*) context;
- pn_link_t *pn_link = qd_link_pn(link);
- int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
- const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
-
- if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING,
- "Incoming link claims router capability but is not on an inter-router connection");
- pn_link_close(pn_link);
- return 0;
- }
-
- qd_router_link_t *rlink = new_qd_router_link_t();
- DEQ_ITEM_INIT(rlink);
- rlink->link_type = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
- rlink->link_direction = QD_INCOMING;
- rlink->owning_addr = 0;
- rlink->waypoint = 0;
- rlink->link = link;
- rlink->connected_link = 0;
- rlink->ref = 0;
- rlink->target = 0;
- DEQ_INIT(rlink->event_fifo);
- DEQ_INIT(rlink->msg_fifo);
-
- if (!is_router && r_tgt) {
- rlink->target = (char*) malloc(strlen(r_tgt) + 1);
- strcpy(rlink->target, r_tgt);
- }
-
- qd_link_set_context(link, rlink);
-
- sys_mutex_lock(router->lock);
- rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
- qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
- DEQ_INSERT_TAIL(router->links, rlink);
-
//
// Lookup the target address to see if we can link-route this attach.
//
- qd_address_t *addr = router_lookup_terminus_LH(router, r_tgt);
- if (addr && !is_router) {
+ qd_address_t *addr = router_lookup_terminus_LH(router, term_addr);
+ if (addr) {
//
// This is a link-attach routable target. Propagate the attach downrange.
// Check first for a locally connected container.
//
- qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
+ qd_link_t *link = rlink->link;
+ pn_link_t *pn_link = qd_link_pn(link);
+ qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
if (lrpref) {
qd_connection_t *conn = lrpref->lrp->container->conn;
if (conn) {
@@ -1132,7 +1107,7 @@ static int router_incoming_link_handler(
la->peer_link = rlink;
la->peer_qd_link = link;
la->link_name = strdup(pn_link_name(pn_link));
- la->dir = QD_OUTGOING;
+ la->dir = dir;
la->conn = conn;
qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
}
@@ -1155,20 +1130,83 @@ static int router_incoming_link_handler(
la->peer_link = rlink;
la->peer_qd_link = link;
la->link_name = strdup(pn_link_name(pn_link));
- la->dir = QD_OUTGOING;
+ la->dir = dir;
la->conn = out_conn;
qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
}
}
- }
+ } else
+ return LINK_ATTACH_NO_PATH;
+ } else
+ return LINK_ATTACH_NO_MATCH;
+ return LINK_ATTACH_FORWARDED;
+}
+
+
+/**
+ * New Incoming Link Handler
+ */
+static int router_incoming_link_handler(void* context, qd_link_t *link)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ pn_link_t *pn_link = qd_link_pn(link);
+ int is_router = qd_router_terminus_is_router(qd_link_remote_source(link));
+ const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
+
+ if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
+ qd_log(router->log_source, QD_LOG_WARNING,
+ "Incoming link claims router capability but is not on an inter-router connection");
+ pn_link_close(pn_link);
+ return 0;
+ }
+
+ qd_router_link_t *rlink = new_qd_router_link_t();
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_type = is_router ? QD_LINK_ROUTER : QD_LINK_ENDPOINT;
+ rlink->link_direction = QD_INCOMING;
+ rlink->owning_addr = 0;
+ rlink->waypoint = 0;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->ref = 0;
+ rlink->target = 0;
+ DEQ_INIT(rlink->event_fifo);
+ DEQ_INIT(rlink->msg_fifo);
+
+ if (!is_router && r_tgt) {
+ rlink->target = (char*) malloc(strlen(r_tgt) + 1);
+ strcpy(rlink->target, r_tgt);
}
+ qd_link_set_context(link, rlink);
+
+ sys_mutex_lock(router->lock);
+ rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
+ qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
+ DEQ_INSERT_TAIL(router->links, rlink);
+
+ //
+ // Attempt to link-route this attach
+ //
+ link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
+ if (!is_router)
+ la_result = qd_router_link_route(router, rlink, r_tgt, QD_OUTGOING);
sys_mutex_unlock(router->lock);
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
- pn_link_flow(pn_link, 1000);
- pn_link_open(pn_link);
+
+ //
+ // If link-routing was successful or there was no matching link-route
+ // address, open the link. If link-routing was supposed to work but
+ // there was no reachable destination, close the link.
+ //
+ if (la_result == LINK_ATTACH_NO_PATH)
+ pn_link_close(pn_link);
+ else {
+ pn_link_flow(pn_link, 1000);
+ pn_link_open(pn_link);
+ }
return 0;
}
@@ -1189,6 +1227,7 @@ static int router_outgoing_link_handler(
char phase = '0';
qd_address_semantics_t semantics;
qd_address_t *addr = 0;
+ link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
qd_log(router->log_source, QD_LOG_WARNING,
@@ -1275,33 +1314,36 @@ static int router_outgoing_link_handler(
// address, that address needs to be set up in the address list.
//
char temp_addr[1000]; // TODO: Use pn_string or aprintf.
+ la_result = qd_router_link_route(router, rlink, r_src, QD_INCOMING);
- if (is_dynamic) {
- qd_router_generate_temp_addr(router, temp_addr, 1000);
- iter = qd_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
- pn_terminus_set_address(qd_link_source(link), temp_addr);
- qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
- } else
- qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
-
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- if (!addr) {
- addr = qd_address();
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(router->addrs, addr);
- addr->semantics = semantics;
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- }
+ if (la_result == LINK_ATTACH_NO_MATCH) {
+ if (is_dynamic) {
+ qd_router_generate_temp_addr(router, temp_addr, 1000);
+ iter = qd_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
+ pn_terminus_set_address(qd_link_source(link), temp_addr);
+ qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
+ } else
+ qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
+
+ qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = qd_address();
+ qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ DEQ_INSERT_TAIL(router->addrs, addr);
+ addr->semantics = semantics;
+ qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
+ }
- rlink->owning_addr = addr;
- qd_router_add_link_ref_LH(&addr->rlinks, rlink);
+ rlink->owning_addr = addr;
+ qd_router_add_link_ref_LH(&addr->rlinks, rlink);
- //
- // If this is not a dynamic address and it is the first local subscription
- // to the address, supply the address to the router module for propagation
- // to other nodes.
- //
- propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
+ //
+ // If this is not a dynamic address and it is the first local subscription
+ // to the address, supply the address to the router module for propagation
+ // to other nodes.
+ //
+ propagate = (!is_dynamic) && (DEQ_SIZE(addr->rlinks) == 1);
+ }
}
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org