You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/29 15:22:05 UTC

qpid-dispatch git commit: DISPATCH-57 - Implemented "balanced" delivery based on lowest number of unsettled deliveries.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 73e238660 -> 8475aca8f


DISPATCH-57 - Implemented "balanced" delivery based on lowest number of unsettled deliveries.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8475aca8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8475aca8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8475aca8

Branch: refs/heads/master
Commit: 8475aca8f61e52128df3f932db1d880129196ea9
Parents: 73e2386
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Mar 29 09:19:46 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Mar 29 09:19:46 2016 -0400

----------------------------------------------------------------------
 src/router_core/forwarder.c           | 70 ++++++++++++++++++++++++++++--
 src/router_core/router_core_private.h |  7 ---
 2 files changed, 67 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8475aca8/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8e41995..52789b1 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -340,8 +340,6 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
         }
     }
 
-
-
     return 0;
 }
 
@@ -353,7 +351,73 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
                             bool             exclude_inprocess,
                             bool             control)
 {
-    return qdr_forward_closest_CT(core, addr, msg, in_delivery, exclude_inprocess, control);
+    qdr_link_t *out_link = 0;
+    uint32_t    link_backlog;
+    bool        transit = false;
+
+    //
+    // Find all the possible outbound links for this delivery, searching for the one with the
+    // smallest backlog.
+    //
+
+    //
+    // Start with the local links
+    //
+    qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+    while (link_ref) {
+        qdr_link_t *link    = link_ref->link;
+        uint32_t    backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+
+        if (!out_link || link_backlog > backlog) {
+            out_link     = link;
+            link_backlog = backlog;
+        }
+
+        link_ref = DEQ_NEXT(link_ref);
+    }
+
+    if (!out_link || link_backlog > 0) {
+        //
+        // If we haven't already found a link with zero backlog, check the
+        // remotes as well.
+        //
+        int         router_bit;
+        int         c;
+        qdr_node_t *next_node;
+
+        for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) {
+            qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
+            if (rnode) {
+                if (rnode->next_hop)
+                    next_node = rnode->next_hop;
+                else
+                    next_node = rnode;
+
+                qdr_link_t *link = control ? next_node->peer_control_link : next_node->peer_data_link;
+                if (link) {
+                    uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+                    if (backlog < link_backlog) {
+                        out_link     = link;
+                        link_backlog = backlog;
+                        transit      = true;
+                    }
+                }
+            }
+        }
+    }
+
+    if (out_link) {
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+        qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+        if (transit)
+            addr->deliveries_transit++;
+        else
+            addr->deliveries_egress++;
+        return 1;
+    }
+
+    return 0;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8475aca8/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 29de0f6..1a61620 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -314,7 +314,6 @@ struct qdr_address_t {
     qd_address_treatment_t     treatment;
     qdr_forwarder_t           *forwarder;
     int                        ref_count;     ///< Number of link-routes + auto-links referencing this address
-    bool                       toggle;
     bool                       block_deletion;
     bool                       local;
 
@@ -557,12 +556,6 @@ struct qdr_core_t {
     qdr_forwarder_t      *forwarders[QD_TREATMENT_LINK_BALANCED + 1];
 };
 
-typedef enum {
-    PASSTHROUGH,
-    TAP,
-    BYPASS
-} qdr_waypoint_mode_t;
-
 void *router_core_thread(void *arg);
 uint64_t qdr_identifier(qdr_core_t* core);
 void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org