You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/05/04 04:08:31 UTC
[2/2] qpid-dispatch git commit: DISPATCH-57 - Fixed balanced
forwarding so it a) wont involve "full" links,
and b) will honor the valid-origin for messages when choosing a path to the
destination, and c) uses the router cost as a threshold for using inte
DISPATCH-57 - Fixed balanced forwarding so it a) wont involve "full" links, and b) will honor
the valid-origin for messages when choosing a path to the destination, and c) uses the router
cost as a threshold for using inter-router links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8fa96352
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8fa96352
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8fa96352
Branch: refs/heads/master
Commit: 8fa9635252abac997266f2388ba5a5844f34e88b
Parents: 704003c
Author: Ted Ross <tr...@redhat.com>
Authored: Tue May 3 21:59:48 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue May 3 22:05:48 2016 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 10 ++-
src/router_core/forwarder.c | 126 ++++++++++++++++++++---------
src/router_core/route_tables.c | 1 -
src/router_core/router_core_private.h | 12 ++-
src/router_core/transfer.c | 7 ++
5 files changed, 111 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 69379b1..55638eb 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -276,7 +276,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->name = (char*) malloc(strlen(name) + 1);
strcpy(link->name, name);
link->link_direction = dir;
- link->capacity = dir == QD_INCOMING ? conn->link_capacity : 0;
+ link->capacity = conn->link_capacity;
link->admin_enabled = true;
link->oper_status = QDR_LINK_OPER_DOWN;
@@ -691,12 +691,16 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
// deleted.
//
if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
- qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion) {
+ qd_bitmask_cardinality(addr->rnodes) == 0 && addr->ref_count == 0 && !addr->block_deletion &&
+ addr->tracked_deliveries == 0) {
qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
DEQ_REMOVE(core->addrs, addr);
qd_hash_handle_free(addr->hash_handle);
qd_bitmask_free(addr->rnodes);
- qd_bitmask_free(addr->closest_remotes);
+ if (addr->treatment == QD_TREATMENT_ANYCAST_CLOSEST)
+ qd_bitmask_free(addr->closest_remotes);
+ else if (addr->treatment == QD_TREATMENT_ANYCAST_BALANCED)
+ free(addr->outstanding_deliveries);
free_qdr_address_t(addr);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index b82a6e5..836bc02 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -371,18 +371,18 @@ int qdr_forward_closest_CT(qdr_core_t *core,
}
//
- // Forward to remote routers with subscribers using the appropriate
- // link for the traffic class: control or data
- //
- qdr_node_t *next_node;
-
- //
// If the cached list of closest remotes is stale (i.e. cost data has changed),
// recompute the closest remote routers.
//
if (addr->cost_epoch != core->cost_epoch)
qdr_forward_find_closest_remotes_CT(core, addr);
+ //
+ // Forward to remote routers with subscribers using the appropriate
+ // link for the traffic class: control or data
+ //
+ qdr_node_t *next_node;
+
if (addr->next_remote >= 0) {
qdr_node_t *rnode = core->routers_by_mask_bit[addr->next_remote];
if (rnode) {
@@ -416,13 +416,29 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
bool exclude_inprocess,
bool control)
{
- qdr_link_t *out_link = 0;
- uint32_t link_backlog = UINT32_MAX;
- bool transit = false;
+ //
+ // Control messages should never use balanced treatment.
+ //
+ assert(!control);
+
+ //
+ // If this is the first time through here, allocate the array for outstanding delivery counts.
+ //
+ if (addr->outstanding_deliveries == 0) {
+ addr->outstanding_deliveries = NEW_ARRAY(int, qd_bitmask_width());
+ for (int i = 0; i < qd_bitmask_width(); i++)
+ addr->outstanding_deliveries[i] = 0;
+ }
+
+ qdr_link_t *chosen_link = 0;
+ int chosen_link_bit = -1;
+ uint32_t link_value = UINT32_MAX;
+ bool transit = false;
//
// Find all the possible outbound links for this delivery, searching for the one with the
- // smallest backlog.
+ // smallest eligible value. Value = outstanding_deliveries + minimum_downrange_cost.
+ // A link is ineligible if the outstanding_deliveries is equal to the link's capacity.
//
//
@@ -430,51 +446,81 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
//
qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
while (link_ref) {
- qdr_link_t *link = link_ref->link;
- uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+ qdr_link_t *link = link_ref->link;
+ uint32_t value = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
+ bool eligible = link->capacity > value;
- if (!out_link || link_backlog > backlog) {
- out_link = link;
- link_backlog = backlog;
+ //
+ // If this is the best eligible link thus far, choose it.
+ //
+ if (eligible && link_value > value) {
+ chosen_link = link;
+ link_value = value;
}
link_ref = DEQ_NEXT(link_ref);
}
- if (!out_link || link_backlog > 0) {
+ //
+ // If we haven't already found a link with zero (best possible) value, check the
+ // inter-router links as well.
+ //
+ if (!chosen_link || link_value > 0) {
//
- // If we haven't already found a link with zero backlog, check the
- // remotes as well.
+ // Get the mask bit associated with the ingress router for the message.
+ // This will be compared against the "valid_origin" masks for each
+ // candidate destination router.
//
- int router_bit;
- int c;
- qdr_node_t *next_node;
-
- for (QD_BITMASK_EACH(addr->rnodes, router_bit, c)) {
- qdr_node_t *rnode = core->routers_by_mask_bit[router_bit];
- if (rnode) {
- if (rnode->next_hop)
- next_node = rnode->next_hop;
- else
- next_node = rnode;
+ int origin = 0;
+ qd_field_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;
+
+ if (ingress_iter) {
+ qd_address_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
+ qdr_address_t *origin_addr;
+ qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr);
+ if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
+ qd_bitmask_first_set(origin_addr->rnodes, &origin);
+ }
- qdr_link_t *link = control ? next_node->peer_control_link : next_node->peer_data_link;
- if (link) {
- uint32_t backlog = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
- if (backlog < link_backlog) {
- out_link = link;
- link_backlog = backlog;
- transit = true;
- }
+ int c;
+ int node_bit;
+ for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
+ qdr_node_t *rnode = core->routers_by_mask_bit[node_bit];
+ qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
+ qdr_link_t *link = next_node->peer_data_link;
+ int link_bit = link->conn->mask_bit;
+ int value = addr->outstanding_deliveries[link_bit];
+ if (value < link->capacity && qd_bitmask_value(rnode->valid_origins, origin)) {
+ //
+ // Link is eligible, adjust the value by the bias (node cost).
+ //
+ value += rnode->cost;
+ if (link_value > value) {
+ chosen_link = link;
+ chosen_link_bit = link_bit;
+ link_value = value;
+ transit = true;
}
}
}
}
- if (out_link) {
- qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery);
+ if (chosen_link) {
+ qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
+ qdr_forward_deliver_CT(core, chosen_link, out_delivery);
+
+ //
+ // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
+ //
+ if (!in_delivery->settled && chosen_link_bit >= 0) {
+ addr->outstanding_deliveries[chosen_link_bit]++;
+ out_delivery->tracking_addr = addr;
+ addr->tracked_deliveries++;
+ }
+ //
+ // Bump the appropriate counter based on where we sent the delivery.
+ //
if (transit)
addr->deliveries_transit++;
else
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 8f59d39..d8fd860 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -389,7 +389,6 @@ static void qdr_del_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca
DEQ_REMOVE(core->addrs, oaddr);
qd_hash_handle_free(oaddr->hash_handle);
core->routers_by_mask_bit[router_maskbit] = 0;
- qd_bitmask_free(oaddr->closest_remotes);
qd_bitmask_free(oaddr->rnodes);
free_qdr_address_t(oaddr);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index a3e0b17..36e3ec0 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -210,6 +210,7 @@ struct qdr_delivery_t {
uint8_t tag[32];
int tag_length;
qd_bitmask_t *link_exclusion;
+ qdr_address_t *tracking_addr;
};
ALLOC_DECLARE(qdr_delivery_t);
@@ -320,11 +321,20 @@ struct qdr_address_t {
int ref_count; ///< Number of link-routes + auto-links referencing this address
bool block_deletion;
bool local;
+ uint32_t tracked_deliveries;
+ uint64_t cost_epoch;
- uint64_t cost_epoch;
+ //
+ // State for "closest" treatment
+ //
qd_bitmask_t *closest_remotes;
int next_remote;
+ //
+ // State for "balanced" treatment
+ //
+ int *outstanding_deliveries;
+
/**@name Statistics */
///@{
uint64_t deliveries_ingress;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8fa96352/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ffe11c9..c79b089 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -295,6 +295,13 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
if (link->link_direction == QD_OUTGOING)
sys_mutex_unlock(conn->work_lock);
+ if (dlv->tracking_addr) {
+ int link_bit = link->conn->mask_bit;
+ dlv->tracking_addr->outstanding_deliveries[link_bit]--;
+ dlv->tracking_addr->tracked_deliveries--;
+ dlv->tracking_addr = 0;
+ }
+
//
// If this is an incoming link and it is not link-routed, issue
// one replacement credit on the link.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org