You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/18 12:48:03 UTC
[2/2] qpid-dispatch git commit: DISPATCH 1096 - store priority in links
DISPATCH 1096 - store priority in links
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a9ace970
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a9ace970
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a9ace970
Branch: refs/heads/master
Commit: a9ace9708ec8c48ca1868eb5998befe81640bfb0
Parents: 94bef70
Author: Michael Goulish <mg...@redhat.com>
Authored: Thu Sep 13 10:33:02 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 18 08:46:43 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 18 ++++++------------
src/router_core/forwarder.c | 14 +++++++-------
src/router_core/router_core_private.h | 3 ++-
src/router_core/transfer.c | 3 +--
4 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2723c6c..32fbf48 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -817,9 +817,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (link->link_type == QD_LINK_CONTROL)
core->control_links_by_mask_bit[conn->mask_bit] = 0;
if (link->link_type == QD_LINK_ROUTER)
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
- if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority])
- core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
+ core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
}
//
@@ -852,9 +850,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
sys_mutex_lock(conn->work_lock);
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
- qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK);
- }
+ qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
sys_mutex_unlock(conn->work_lock);
//
@@ -1438,11 +1434,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER) {
+ // As inter-router links are attached to this connection, they
+ // are assigned priorities in the order in which they are attached.
int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
if (next_slot >= QDR_N_PRIORITIES) {
qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf.");
return;
}
+ link->priority = next_slot;
core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link;
}
@@ -1483,12 +1482,7 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER)
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
- if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) {
- core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
- break;
- }
- }
+ core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
//
// TODO - This needs to be refactored in terms of a non-inter-router link type
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4364c47..8ccb049 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -175,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
}
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv)
{
sys_mutex_lock(out_link->conn->work_lock);
@@ -208,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
work->value = 1;
DEQ_INSERT_TAIL(out_link->work_list, work);
}
- qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK);
+ qdr_add_link_ref(out_link->conn->links_with_work + out_link->priority, out_link, QDR_LINK_LIST_CLASS_WORK);
out_dlv->link_work = work;
sys_mutex_unlock(out_link->conn->work_lock);
@@ -285,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
while (link_ref) {
qdr_link_t *out_link = link_ref->link;
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
fanout++;
if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
addr->deliveries_egress++;
@@ -360,7 +360,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
core->data_links_by_mask_bit[link_bit].links[priority];
if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
- qdr_forward_deliver_CT(core, dest_link, out_delivery, priority);
+ qdr_forward_deliver_CT(core, dest_link, out_delivery);
fanout++;
addr->deliveries_transit++;
if (dest_link->link_type == QD_LINK_ROUTER)
@@ -485,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
if (link_ref) {
out_link = link_ref->link;
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg));
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
//
// If there are multiple local subscribers, rotate the list of link references
@@ -536,7 +536,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
if (out_link) {
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
addr->deliveries_transit++;
if (out_link->link_type == QD_LINK_ROUTER)
core->deliveries_transit++;
@@ -685,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
if (chosen_link) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
- qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg));
+ qdr_forward_deliver_CT(core, chosen_link, out_delivery);
//
// If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 7d9b7e1..ee8930c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -422,6 +422,7 @@ struct qdr_link_t {
uint64_t released_deliveries;
uint64_t modified_deliveries;
uint64_t *ingress_histogram;
+ uint8_t priority;
};
ALLOC_DECLARE(qdr_link_t);
@@ -846,7 +847,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
void qdr_connection_free(qdr_connection_t *conn);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ae1dbd9..68645ea 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1001,8 +1001,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
peer->tag_length = action->args.connection.tag_length;
memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
- // Adding this work at priority 0.
- qdr_forward_deliver_CT(core, link->connected_link, peer, 0);
+ qdr_forward_deliver_CT(core, link->connected_link, peer);
link->total_deliveries++;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org