You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/04/22 23:55:27 UTC

[6/7] qpid-dispatch git commit: DISPATCH-208 - Updated "closest" implementation to use the node-cost data.

DISPATCH-208 - Updated "closest" implementation to use the node-cost data.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9b056fca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9b056fca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9b056fca

Branch: refs/heads/master
Commit: 9b056fcad8fb7a5b23afa27301f55c6c509ffe4d
Parents: ff83fc2
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Apr 22 17:42:59 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Apr 22 17:51:38 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |  1 +
 src/router_core/forwarder.c           | 46 ++++++++++++++++++++---
 src/router_core/route_tables.c        | 59 +++++++++++++++++++++++++-----
 src/router_core/router_core_private.h |  9 ++++-
 4 files changed, 98 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0a4f1ed..b53d157 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -694,6 +694,7 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
         DEQ_REMOVE(core->addrs, addr);
         qd_hash_handle_free(addr->hash_handle);
         qd_bitmask_free(addr->rnodes);
+        qd_bitmask_free(addr->closest_remotes);
         free_qdr_address_t(addr);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 274beb0..b82a6e5 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -71,6 +71,33 @@ static bool qdr_forward_attach_null_CT(qdr_core_t     *core,
 }
 
 
+// Rebuild addr->closest_remotes by walking core->routers from the head and collecting
+// the routers mapped in addr->rnodes until one's cost differs from the running minimum.
+static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t *addr)
+{
+    int min_cost = 0;   // zero doubles as "no minimum recorded yet"
+
+    if (addr->closest_remotes == 0)
+        addr->closest_remotes = qd_bitmask(0);
+    qd_bitmask_clear_all(addr->closest_remotes);
+
+    addr->next_remote = -1;
+    addr->cost_epoch  = core->cost_epoch;   // mark the cached set as current
+
+    for (qdr_node_t *node = DEQ_HEAD(core->routers); node; node = DEQ_NEXT(node)) {
+        if (!qd_bitmask_value(addr->rnodes, node->mask_bit))
+            continue;
+        if (min_cost == 0) {
+            min_cost          = node->cost;
+            addr->next_remote = node->mask_bit;
+        }
+        if (node->cost != min_cost)
+            break;
+        qd_bitmask_set_bit(addr->closest_remotes, node->mask_bit);
+    }
+}
+
+
 qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg)
 {
     qdr_delivery_t *dlv = new_qdr_delivery_t();
@@ -347,15 +374,22 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     // Forward to remote routers with subscribers using the appropriate
     // link for the traffic class: control or data
     //
-    // TODO - presently, this picks one remote link to send to.  This needs
-    //        to be enhanced so it properly chooses the route to the closest destination.
-    //
-    int         router_bit;
     qdr_node_t *next_node;
 
-    if (qd_bitmask_first_set(addr->rnodes, &router_bit)) {
-        qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
+    //
+    // If the cached list of closest remotes is stale (i.e. cost data has changed),
+    // recompute the closest remote routers.
+    //
+    if (addr->cost_epoch != core->cost_epoch)
+        qdr_forward_find_closest_remotes_CT(core, addr);
+
+    if (addr->next_remote >= 0) {
+        qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote];
         if (rnode) {
+            _qdbm_next(addr->closest_remotes, &addr->next_remote);
+            if (addr->next_remote == -1)
+                qd_bitmask_first_set(addr->closest_remotes, &addr->next_remote);
+
             if (rnode->next_hop)
                 next_node = rnode->next_hop;
             else

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 02c2c65..8f59d39 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -177,12 +177,49 @@ void qdr_core_unsubscribe(qdr_subscription_t *sub)
 // In-Thread Functions
 //==================================================================================
 
+//
+// React to the updated cost of a router node (rnode->cost already holds the new value).
+// Keeps core->routers sorted by cost, least to most, and always bumps core->cost_epoch:
+// a changed cost can alter an address's closest-remote set even when rnode keeps its place.
+//
+void qdr_route_table_update_cost_CT(qdr_core_t *core, qdr_node_t *rnode)
+{
+    qdr_node_t *ptr  = DEQ_PREV(rnode);
+    qdr_node_t *next = DEQ_NEXT(rnode);
+
+    // Invalidate every address's cached closest_remotes, whether or not rnode moves.
+    core->cost_epoch++;
+
+    // Still in order relative to both neighbors: nothing to re-sort.
+    if (!(ptr && ptr->cost > rnode->cost) && !(next && next->cost < rnode->cost))
+        return;
+
+    //
+    // Re-insert: scan from the tail for the last node whose cost does not exceed
+    // rnode's and place rnode after it; if there is none, rnode becomes the head.
+    //
+    DEQ_REMOVE(core->routers, rnode);
+    ptr = DEQ_TAIL(core->routers);
+    while (ptr) {
+        if (rnode->cost >= ptr->cost) {
+            DEQ_INSERT_AFTER(core->routers, rnode, ptr);
+            break;
+        }
+        ptr = DEQ_PREV(ptr);
+    }
+
+    if (!ptr)
+        DEQ_INSERT_HEAD(core->routers, rnode);
+}
+
+
 void qdr_route_table_setup_CT(qdr_core_t *core)
 {
     DEQ_INIT(core->addrs);
     DEQ_INIT(core->routers);
     core->addr_hash    = qd_hash(12, 32, 0);
     core->conn_id_hash = qd_hash(6, 4, 0);
+    core->cost_epoch   = 1;
 
     if (core->router_mode == QD_ROUTER_MODE_INTERIOR) {
         core->hello_addr      = qdr_add_local_address_CT(core, 'L', "qdhello",     QD_TREATMENT_MULTICAST_FLOOD);
@@ -262,8 +299,15 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
         rnode->peer_data_link    = 0;
         rnode->ref_count         = 0;
         rnode->valid_origins     = qd_bitmask(0);
+        rnode->cost              = 0;
 
-        DEQ_INSERT_TAIL(core->routers, rnode);
+        //
+        // Insert at the head of the list because we don't yet know the cost to this
+        // router node and we've set the cost to zero.  This puts it in a properly-sorted
+        // position.  Also, don't bump the cost_epoch here because this new router won't be
+        // used until it is assigned a cost.
+        //
+        DEQ_INSERT_HEAD(core->routers, rnode);
 
         //
         // Link the router record to the address record.
@@ -338,12 +382,15 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
     //
     qd_bitmask_free(rnode->valid_origins);
     DEQ_REMOVE(core->routers, rnode);
+    core->cost_epoch++;
     free_qdr_node_t(rnode);
 
     qd_hash_remove_by_handle(core->addr_hash, oaddr->hash_handle);
     DEQ_REMOVE(core->addrs, oaddr);
     qd_hash_handle_free(oaddr->hash_handle);
     core->routers_by_mask_bit[router_maskbit] = 0;
+    qd_bitmask_free(oaddr->closest_remotes);
+    qd_bitmask_free(oaddr->rnodes);
     free_qdr_address_t(oaddr);
 }
 
@@ -465,6 +512,7 @@ static void qdr_set_cost_CT(qdr_core_t *core, qdr_action_t *action, bool discard
 
     qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
     rnode->cost = cost;
+    qdr_route_table_update_cost_CT(core, rnode);
 }
 
 
@@ -503,10 +551,6 @@ static void qdr_set_valid_origins_CT(qdr_core_t *core, qdr_action_t *action, boo
 
 static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
-    //
-    // TODO - handle the class-prefix and phase explicitly
-    //
-
     int          router_maskbit = action->args.route_table.router_maskbit;
     qdr_field_t *address        = action->args.route_table.address;
 
@@ -540,11 +584,8 @@ static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool
         qdr_node_t *rnode = core->routers_by_mask_bit[router_maskbit];
         qd_bitmask_set_bit(addr->rnodes, router_maskbit);
         rnode->ref_count++;
+        addr->cost_epoch--;
         qdr_addr_start_inlinks_CT(core, addr);
-
-        //
-        // TODO - If this affects a waypoint, create the proper side effects
-        //
     } while (false);
 
     qdr_field_free(address);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9b056fca/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 49f7257..061e346 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -321,6 +321,10 @@ struct qdr_address_t {
     bool                       block_deletion;
     bool                       local;
 
+    uint64_t      cost_epoch;
+    qd_bitmask_t *closest_remotes;
+    int           next_remote;
+
     /**@name Statistics */
     ///@{
     uint64_t deliveries_ingress;
@@ -549,11 +553,12 @@ struct qdr_core_t {
     qdr_address_t             *router_addr_T;
     qdr_address_t             *routerma_addr_T;
 
-    qdr_node_list_t       routers;
+    qdr_node_list_t       routers;            ///< List of routers, in order of cost, from lowest to highest
     qd_bitmask_t         *neighbor_free_mask;
     qdr_node_t          **routers_by_mask_bit;
     qdr_link_t          **control_links_by_mask_bit;
     qdr_link_t          **data_links_by_mask_bit;
+    uint64_t              cost_epoch;
 
     uint64_t              next_tag;
 
@@ -566,7 +571,7 @@ struct qdr_core_t {
 
 void *router_core_thread(void *arg);
 uint64_t qdr_identifier(qdr_core_t* core);
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost);
+void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost);
 void  qdr_route_table_setup_CT(qdr_core_t *core);
 void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org