You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/05/04 04:08:31 UTC

[2/2] qpid-dispatch git commit: DISPATCH-57 - Fixed balanced forwarding so it a) won't involve "full" links, and b) will honor the valid-origin for messages when choosing a path to the destination, and c) uses the router cost as a threshold for using inter-router links.

DISPATCH-57 - Fixed balanced forwarding so it a) won't involve "full" links, and b) will honor
the valid-origin for messages when choosing a path to the destination, and c) uses the router
cost as a threshold for using inter-router links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8fa96352
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8fa96352
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8fa96352

Branch: refs/heads/master
Commit: 8fa9635252abac997266f2388ba5a5844f34e88b
Parents: 704003c
Author: Ted Ross <tr...@redhat.com>
Authored: Tue May 3 21:59:48 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue May 3 22:05:48 2016 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         |  10 ++-
 src/router_core/forwarder.c           | 126 ++++++++++++++++++++---------
 src/router_core/route_tables.c        |   1 -
 src/router_core/router_core_private.h |  12 ++-
 src/router_core/transfer.c            |   7 ++
 5 files changed, 111 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 69379b1..55638eb 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -276,7 +276,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->name = (char*) malloc(strlen(name) + 1);
     strcpy(link->name, name);
     link->link_direction = dir;
-    link->capacity       = dir == QD_INCOMING ? conn->link_capacity : 0;
+    link->capacity       = conn->link_capacity;
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;
 
@@ -691,12 +691,16 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
     // deleted.
     //
     if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
-        qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion) {
+        qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion &&
+        addr->tracked_deliveries == 0) {
         qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
         DEQ_REMOVE(core->addrs, addr);
         qd_hash_handle_free(addr->hash_handle);
         qd_bitmask_free(addr->rnodes);
-        qd_bitmask_free(addr->closest_remotes);
+        if      (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST)
+            qd_bitmask_free(addr->closest_remotes);
+        else if (addr->treatment == QD_TREATMENT_ANYCAST_BALANCED)
+            free(addr->outstanding_deliveries);
         free_qdr_address_t(addr);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b82a6e5..836bc02 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -371,18 +371,18 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     }
 
     //
-    // Forward to remote routers with subscribers using the appropriate
-    // link for the traffic class: control or data
-    //
-    qdr_node_t *next_node;
-
-    //
     // If the cached list of closest remotes is stale (i.e. cost data has changed),
     // recompute the closest remote routers.
     //
     if (addr->cost_epoch != core->cost_epoch)
         qdr_forward_find_closest_remotes_CT(core, addr);
 
+    //
+    // Forward to remote routers with subscribers using the appropriate
+    // link for the traffic class: control or data
+    //
+    qdr_node_t *next_node;
+
     if (addr->next_remote >= 0) {
         qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote];
         if (rnode) {
@@ -416,13 +416,29 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
                             bool             exclude_inprocess,
                             bool             control)
 {
-    qdr_link_t *out_link = 0;
-    uint32_t    link_backlog = UINT32_MAX;
-    bool        transit = false;
+    //
+    // Control messages should never use balanced treatment.
+    //
+    assert(!control);
+
+    //
+    // If this is the first time through here, allocate the array for outstanding delivery counts.
+    //
+    if (addr->outstanding_deliveries == 0) {
+        addr->outstanding_deliveries = NEW_ARRAY(int, qd_bitmask_width());
+        for (int i = 0; i < qd_bitmask_width(); i++)
+            addr->outstanding_deliveries[i] = 0;
+    }
+
+    qdr_link_t *chosen_link     = 0;
+    int         chosen_link_bit = -1;
+    uint32_t    link_value      = UINT32_MAX;
+    bool        transit         = false;
 
     //
     // Find all the possible outbound links for this delivery, searching for the one with the
-    // smallest backlog.
+    // smallest eligible value.  Value = outstanding_deliveries + minimum_downrange_cost.
+    // A link is ineligible if the outstanding_deliveries is equal to the link's capacity.
     //
 
     //
@@ -430,51 +446,81 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
     //
     qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
     while (link_ref) {
-        qdr_link_t *link    = link_ref->link;
-        uint32_t    backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+        qdr_link_t *link     = link_ref->link;
+        uint32_t    value    = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+        bool        eligible = link->capacity > value;
 
-        if (!out_link || link_backlog > backlog) {
-            out_link     = link;
-            link_backlog = backlog;
+        //
+        // If this is the best eligible link thus far, choose it.
+        //
+        if (eligible && link_value > value) {
+            chosen_link = link;
+            link_value  = value;
         }
 
         link_ref = DEQ_NEXT(link_ref);
     }
 
-    if (!out_link || link_backlog > 0) {
+    //
+    // If we haven't already found a link with zero (best possible) value, check the
+    // inter-router links as well.
+    //
+    if (!chosen_link || link_value > 0) {
         //
-        // If we haven't already found a link with zero backlog, check the
-        // remotes as well.
+        // Get the mask bit associated with the ingress router for the message.
+        // This will be compared against the "valid_origin" masks for each
+        // candidate destination router.
         //
-        int         router_bit;
-        int         c;
-        qdr_node_t *next_node;
-
-        for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) {
-            qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
-            if (rnode) {
-                if (rnode->next_hop)
-                    next_node = rnode->next_hop;
-                else
-                    next_node = rnode;
+        int origin = 0;
+        qd_field_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;
+
+        if (ingress_iter) {
+            qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
+            qdr_address_t *origin_addr;
+            qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr);
+            if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
+                qd_bitmask_first_set(origin_addr->rnodes, &origin);
+        }
 
-                qdr_link_t *link = control ? next_node->peer_control_link : next_node->peer_data_link;
-                if (link) {
-                    uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
-                    if (backlog < link_backlog) {
-                        out_link     = link;
-                        link_backlog = backlog;
-                        transit      = true;
-                    }
+        int c;
+        int node_bit;
+        for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
+            qdr_node_t *rnode     = core->routers_by_mask_bit[node_bit];
+            qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
+            qdr_link_t *link      = next_node->peer_data_link;
+            int         link_bit  = link->conn->mask_bit;
+            int         value     = addr->outstanding_deliveries[link_bit];
+            if (value < link->capacity && qd_bitmask_value(rnode->valid_origins, origin)) {
+                //
+                // Link is eligible, adjust the value by the bias (node cost).
+                //
+                value += rnode->cost;
+                if (link_value > value) {
+                    chosen_link     = link;
+                    chosen_link_bit = link_bit;
+                    link_value      = value;
+                    transit         = true;
                 }
             }
         }
     }
 
-    if (out_link) {
-        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-        qdr_forward_deliver_CT(core, out_link, out_delivery);
+    if (chosen_link) {
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
+        qdr_forward_deliver_CT(core, chosen_link, out_delivery);
+
+        //
+        // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
+        //
+        if (!in_delivery->settled && chosen_link_bit >= 0) {
+            addr->outstanding_deliveries[chosen_link_bit]++;
+            out_delivery->tracking_addr = addr;
+            addr->tracked_deliveries++;
+        }
 
+        //
+        // Bump the appropriate counter based on where we sent the delivery.
+        //
         if (transit)
             addr->deliveries_transit++;
         else

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 8f59d39..d8fd860 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -389,7 +389,6 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
     DEQ_REMOVE(core->addrs, oaddr);
     qd_hash_handle_free(oaddr->hash_handle);
     core->routers_by_mask_bit[router_maskbit] = 0;
-    qd_bitmask_free(oaddr->closest_remotes);
     qd_bitmask_free(oaddr->rnodes);
     free_qdr_address_t(oaddr);
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index a3e0b17..36e3ec0 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -210,6 +210,7 @@ struct qdr_delivery_t {
     uint8_t              tag[32];
     int                  tag_length;
     qd_bitmask_t        *link_exclusion;
+    qdr_address_t       *tracking_addr;
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
@@ -320,11 +321,20 @@ struct qdr_address_t {
     int                        ref_count;     ///< Number of link-routes + auto-links referencing this address
     bool                       block_deletion;
     bool                       local;
+    uint32_t                   tracked_deliveries;
+    uint64_t                   cost_epoch;
 
-    uint64_t      cost_epoch;
+    //
+    // State for "closest" treatment
+    //
     qd_bitmask_t *closest_remotes;
     int           next_remote;
 
+    //
+    // State for "balanced" treatment
+    //
+    int *outstanding_deliveries;
+    
     /**@name Statistics */
     ///@{
     uint64_t deliveries_ingress;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ffe11c9..c79b089 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -295,6 +295,13 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     if (link->link_direction == QD_OUTGOING)
         sys_mutex_unlock(conn->work_lock);
 
+    if (dlv->tracking_addr) {
+        int link_bit = link->conn->mask_bit;
+        dlv->tracking_addr->outstanding_deliveries[link_bit]--;
+        dlv->tracking_addr->tracked_deliveries--;
+        dlv->tracking_addr = 0;
+    }
+
     //
     // If this is an incoming link and it is not link-routed, issue
     // one replacement credit on the link.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org