You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/29 15:22:05 UTC
qpid-dispatch git commit: DISPATCH-57 - Implemented "balanced"
delivery based on lowest number of unsettled deliveries.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 73e238660 -> 8475aca8f
DISPATCH-57 - Implemented "balanced" delivery based on lowest number of unsettled deliveries.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8475aca8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8475aca8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8475aca8
Branch: refs/heads/master
Commit: 8475aca8f61e52128df3f932db1d880129196ea9
Parents: 73e2386
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Mar 29 09:19:46 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Mar 29 09:19:46 2016 -0400
----------------------------------------------------------------------
src/router_core/forwarder.c | 70 ++++++++++++++++++++++++++++--
src/router_core/router_core_private.h | 7 ---
2 files changed, 67 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8475aca8/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 8e41995..52789b1 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -340,8 +340,6 @@ int qdr_forward_closest_CT(qdr_core_t *core,
}
}
-
-
return 0;
}
@@ -353,7 +351,73 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
bool exclude_inprocess,
bool control)
{
- return qdr_forward_closest_CT(core, addr, msg, in_delivery, exclude_inprocess, control);
+    qdr_link_t *out_link     = 0;
+    uint32_t    link_backlog = 0;      // Backlog of out_link; only meaningful while out_link is non-null
+    bool        transit      = false;  // Set when the chosen link leads to a remote router
+
+    //
+    // Find all the possible outbound links for this delivery, searching for the one with the
+    // smallest backlog (undelivered plus unsettled deliveries on the link).
+    //
+
+    //
+    // Start with the local links
+    //
+    qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+    while (link_ref) {
+        qdr_link_t *link    = link_ref->link;
+        uint32_t    backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+
+        if (!out_link || link_backlog > backlog) {
+            out_link     = link;
+            link_backlog = backlog;
+        }
+
+        link_ref = DEQ_NEXT(link_ref);
+    }
+
+    if (!out_link || link_backlog > 0) {
+        //
+        // If we haven't already found a link with zero backlog, check the remotes as well.
+        // With no local candidate, the first usable remote link is taken unconditionally.
+        //
+        int         router_bit;
+        int         c;
+        qdr_node_t *next_node;
+
+        for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) {
+            qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
+            if (rnode) {
+                if (rnode->next_hop)
+                    next_node = rnode->next_hop;
+                else
+                    next_node = rnode;
+
+                qdr_link_t *link = control ? next_node->peer_control_link : next_node->peer_data_link;
+                if (link) {
+                    uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+                    if (!out_link || backlog < link_backlog) {  // !out_link: link_backlog not yet valid
+                        out_link     = link;
+                        link_backlog = backlog;
+                        transit      = true;
+                    }
+                }
+            }
+        }
+    }
+
+    if (out_link) {
+        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
+        qdr_forward_deliver_CT(core, out_link, out_delivery);
+
+        if (transit)
+            addr->deliveries_transit++;
+        else
+            addr->deliveries_egress++;
+        return 1;
+    }
+
+    return 0;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8475aca8/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 29de0f6..1a61620 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -314,7 +314,6 @@ struct qdr_address_t {
qd_address_treatment_t treatment;
qdr_forwarder_t *forwarder;
int ref_count; ///< Number of link-routes + auto-links referencing this address
- bool toggle;
bool block_deletion;
bool local;
@@ -557,12 +556,6 @@ struct qdr_core_t {
qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1];
};
-typedef enum {
- PASSTHROUGH,
- TAP,
- BYPASS
-} qdr_waypoint_mode_t;
-
void *router_core_thread(void *arg);
uint64_t qdr_identifier(qdr_core_t* core);
void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org