You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/22 23:55:27 UTC
[6/7] qpid-dispatch git commit: DISPATCH-208 - Updated "closest"
implementation to use the node-cost data.
DISPATCH-208 - Updated "closest" implementation to use the node-cost data.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9b056fca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9b056fca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9b056fca
Branch: refs/heads/master
Commit: 9b056fcad8fb7a5b23afa27301f55c6c509ffe4d
Parents: ff83fc2
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Apr 22 17:42:59 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Apr 22 17:51:38 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 1 +
src/router_core/forwarder.c | 46 ++++++++++++++++++++---
src/router_core/route_tables.c | 59 +++++++++++++++++++++++++-----
src/router_core/router_core_private.h | 9 ++++-
4 files changed, 98 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0a4f1ed..b53d157 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -694,6 +694,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
DEQ_REMOVE(core->addrs, addr);
qd_hash_handle_free(addr->hash_handle);
qd_bitmask_free(addr->rnodes);
+ qd_bitmask_free(addr->closest_remotes);
free_qdr_address_t(addr);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 274beb0..b82a6e5 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -71,6 +71,33 @@ static bool qdr_forward_attach_null_CT(qdr_core_t *core,
}
+static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t *addr)
+{
+ qdr_node_t *rnode = DEQ_HEAD(core->routers);
+ int lowest_cost = 0;
+
+ if (!addr->closest_remotes)
+ addr->closest_remotes = qd_bitmask(0);
+ addr->cost_epoch = core->cost_epoch;
+ addr->next_remote = -1;
+
+ qd_bitmask_clear_all(addr->closest_remotes);
+ while (rnode) {
+ if (qd_bitmask_value(addr->rnodes, rnode->mask_bit)) {
+ if (lowest_cost == 0) {
+ lowest_cost = rnode->cost;
+ addr->next_remote = rnode->mask_bit;
+ }
+ if (lowest_cost == rnode->cost)
+ qd_bitmask_set_bit(addr->closest_remotes, rnode->mask_bit);
+ else
+ break;
+ }
+ rnode = DEQ_NEXT(rnode);
+ }
+}
+
+
qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
{
qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -347,15 +374,22 @@ int qdr_forward_closest_CT(qdr_core_t *core,
// Forward to remote routers with subscribers using the appropriate
// link for the traffic class: control or data
//
- // TODO - presently, this picks one remote link to send to. This needs
- // to be enhanced so it properly chooses the route to the closest destination.
- //
- int router_bit;
qdr_node_t *next_node;
- if (qd_bitmask_first_set(addr->rnodes, &router_bit)) {
- qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
+ //
+ // If the cached list of closest remotes is stale (i.e. cost data has changed),
+ // recompute the closest remote routers.
+ //
+ if (addr->cost_epoch != core->cost_epoch)
+ qdr_forward_find_closest_remotes_CT(core, addr);
+
+ if (addr->next_remote >= 0) {
+ qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote];
if (rnode) {
+ _qdbm_next(addr->closest_remotes, &addr->next_remote);
+ if (addr->next_remote == -1)
+ qd_bitmask_first_set(addr->closest_remotes, &addr->next_remote);
+
if (rnode->next_hop)
next_node = rnode->next_hop;
else
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 02c2c65..8f59d39 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -177,12 +177,49 @@ void qdr_core_unsubscribe(qdr_subscription_t *sub)
// In-Thread Functions
//==================================================================================
+//
+// React to the updated cost of a router node. The core->routers list is to be kept
+// sorted by cost, from least to most.
+//
+void qdr_route_table_update_cost_CT(qdr_core_t *core, qdr_node_t *rnode)
+{
+ qdr_node_t *ptr;
+ bool needs_reinsertion = false;
+
+ ptr = DEQ_PREV(rnode);
+ if (ptr && ptr->cost > rnode->cost)
+ needs_reinsertion = true;
+ else {
+ ptr = DEQ_NEXT(rnode);
+ if (ptr && ptr->cost < rnode->cost)
+ needs_reinsertion = true;
+ }
+
+ if (needs_reinsertion) {
+ core->cost_epoch++;
+ DEQ_REMOVE(core->routers, rnode);
+ ptr = DEQ_TAIL(core->routers);
+ while (ptr) {
+ if (rnode->cost >= ptr->cost) {
+ DEQ_INSERT_AFTER(core->routers, rnode, ptr);
+ break;
+ }
+ ptr = DEQ_PREV(ptr);
+ }
+
+ if (!ptr)
+ DEQ_INSERT_HEAD(core->routers, rnode);
+ }
+}
+
+
void qdr_route_table_setup_CT(qdr_core_t *core)
{
DEQ_INIT(core->addrs);
DEQ_INIT(core->routers);
core->addr_hash = qd_hash(12, 32, 0);
core->conn_id_hash = qd_hash(6, 4, 0);
+ core->cost_epoch = 1;
if (core->router_mode == QD_ROUTER_MODE_INTERIOR) {
core->hello_addr = qdr_add_local_address_CT(core, 'L', "qdhello", QD_TREATMENT_MULTICAST_FLOOD);
@@ -262,8 +299,15 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
rnode->peer_data_link = 0;
rnode->ref_count = 0;
rnode->valid_origins = qd_bitmask(0);
+ rnode->cost = 0;
- DEQ_INSERT_TAIL(core->routers, rnode);
+ //
+ // Insert at the head of the list because we don't yet know the cost to this
+ // router node and we've set the cost to zero. This puts it in a properly-sorted
+ // position. Also, don't bump the cost_epoch here because this new router won't be
+ // used until it is assigned a cost.
+ //
+ DEQ_INSERT_HEAD(core->routers, rnode);
//
// Link the router record to the address record.
@@ -338,12 +382,15 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
//
qd_bitmask_free(rnode->valid_origins);
DEQ_REMOVE(core->routers, rnode);
+ core->cost_epoch++;
free_qdr_node_t(rnode);
qd_hash_remove_by_handle(core->addr_hash, oaddr->hash_handle);
DEQ_REMOVE(core->addrs, oaddr);
qd_hash_handle_free(oaddr->hash_handle);
core->routers_by_mask_bit[router_maskbit] = 0;
+ qd_bitmask_free(oaddr->closest_remotes);
+ qd_bitmask_free(oaddr->rnodes);
free_qdr_address_t(oaddr);
}
@@ -465,6 +512,7 @@ static void qdr_set_cost_CT(qdr_core_t *core, qdr_action_t *action, bool discard
qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
rnode->cost = cost;
+ qdr_route_table_update_cost_CT(core, rnode);
}
@@ -503,10 +551,6 @@ static void qdr_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, boo
static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- //
- // TODO - handle the class-prefix and phase explicitly
- //
-
int router_maskbit = action->args.route_table.router_maskbit;
qdr_field_t *address = action->args.route_table.address;
@@ -540,11 +584,8 @@ static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
qd_bitmask_set_bit(addr->rnodes, router_maskbit);
rnode->ref_count++;
+ addr->cost_epoch--;
qdr_addr_start_inlinks_CT(core, addr);
-
- //
- // TODO - If this affects a waypoint, create the proper side effects
- //
} while (false);
qdr_field_free(address);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 49f7257..061e346 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -321,6 +321,10 @@ struct qdr_address_t {
bool block_deletion;
bool local;
+ uint64_t cost_epoch;
+ qd_bitmask_t *closest_remotes;
+ int next_remote;
+
/**@name Statistics */
///@{
uint64_t deliveries_ingress;
@@ -549,11 +553,12 @@ struct qdr_core_t {
qdr_address_t *router_addr_T;
qdr_address_t *routerma_addr_T;
- qdr_node_list_t routers;
+ qdr_node_list_t routers; ///< List of routers, in order of cost, from lowest to highest
qd_bitmask_t *neighbor_free_mask;
qdr_node_t **routers_by_mask_bit;
qdr_link_t **control_links_by_mask_bit;
qdr_link_t **data_links_by_mask_bit;
+ uint64_t cost_epoch;
uint64_t next_tag;
@@ -566,7 +571,7 @@ struct qdr_core_t {
void *router_core_thread(void *arg);
uint64_t qdr_identifier(qdr_core_t* core);
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost);
+void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost);
void qdr_route_table_setup_CT(qdr_core_t *core);
void qdr_agent_setup_CT(qdr_core_t *core);
void qdr_forwarder_setup_CT(qdr_core_t *core);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org