You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/18 12:48:02 UTC
[1/2] qpid-dispatch git commit: DISPATCH-1096 - Added a new
link-list-class for local (on-stack) lists. Tightened up some critical
sections. This closes #375
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 94bef705d -> 92f00baf6
DISPATCH-1096 - Added a new link-list-class for local (on-stack) lists. Tightened up some critical sections.
This closes #375
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/92f00baf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/92f00baf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/92f00baf
Branch: refs/heads/master
Commit: 92f00baf66533d884cbeba186405cfee480f6992
Parents: a9ace97
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Sep 18 08:42:48 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 18 08:46:43 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 23 ++++++++++++++++++-----
src/router_core/router_core.c | 8 ++++++++
src/router_core/router_core_private.h | 4 +++-
3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/92f00baf/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 32fbf48..cbece87 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -218,6 +218,16 @@ int qdr_connection_process(qdr_connection_t *conn)
DEQ_MOVE(conn->work_list, work_list);
for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);
+
+ //
+ // Move the references from CLASS_WORK to CLASS_LOCAL so concurrent action in the core
+ // thread doesn't assume these links are referenced from the connection's list.
+ //
+ ref = DEQ_HEAD(links_with_work[priority]);
+ while (ref) {
+ move_link_ref(ref->link, QDR_LINK_LIST_CLASS_WORK, QDR_LINK_LIST_CLASS_LOCAL);
+ ref = DEQ_NEXT(ref);
+ }
}
sys_mutex_unlock(conn->work_lock);
@@ -249,23 +259,26 @@ int qdr_connection_process(qdr_connection_t *conn)
qdr_link_work_t *link_work;
free_link = false;
- sys_mutex_lock(conn->work_lock);
ref = DEQ_HEAD(links_with_work[priority]);
if (ref) {
link = ref->link;
- qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_WORK);
+ qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_LOCAL);
+ //
+ // The work lock must be used to protect accesses to the link's work_list and
+ // link_work->processing.
+ //
+ sys_mutex_lock(conn->work_lock);
link_work = DEQ_HEAD(link->work_list);
if (link_work) {
DEQ_REMOVE_HEAD(link->work_list);
link_work->processing = true;
}
+ sys_mutex_unlock(conn->work_lock);
} else
link = 0;
- sys_mutex_unlock(conn->work_lock);
if (link) {
-
//
// Handle disposition/settlement updates
//
@@ -848,8 +861,8 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
// Remove the reference to this link in the connection's reference lists
//
- qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
sys_mutex_lock(conn->work_lock);
+ qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
sys_mutex_unlock(conn->work_lock);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/92f00baf/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 580cb92..e41a633 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -451,6 +451,14 @@ void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
}
+void move_link_ref(qdr_link_t *link, int from_cls, int to_cls)
+{
+ assert(link->ref[to_cls] == 0);
+ link->ref[to_cls] = link->ref[from_cls];
+ link->ref[from_cls] = 0;
+}
+
+
void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn)
{
qdr_connection_ref_t *ref = new_qdr_connection_ref_t();
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/92f00baf/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index ee8930c..019449c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -370,7 +370,8 @@ void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref
#define QDR_LINK_LIST_CLASS_ADDRESS 0
#define QDR_LINK_LIST_CLASS_WORK 1
#define QDR_LINK_LIST_CLASS_CONNECTION 2
-#define QDR_LINK_LIST_CLASSES 3
+#define QDR_LINK_LIST_CLASS_LOCAL 3
+#define QDR_LINK_LIST_CLASSES 4
typedef enum {
QDR_LINK_OPER_UP,
@@ -438,6 +439,7 @@ DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+void move_link_ref(qdr_link_t *link, int from_cls, int to_cls);
struct qdr_connection_ref_t {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH 1096 - store priority in
links
Posted by tr...@apache.org.
DISPATCH 1096 - store priority in links
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a9ace970
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a9ace970
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a9ace970
Branch: refs/heads/master
Commit: a9ace9708ec8c48ca1868eb5998befe81640bfb0
Parents: 94bef70
Author: Michael Goulish <mg...@redhat.com>
Authored: Thu Sep 13 10:33:02 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 18 08:46:43 2018 -0400
----------------------------------------------------------------------
src/router_core/connections.c | 18 ++++++------------
src/router_core/forwarder.c | 14 +++++++-------
src/router_core/router_core_private.h | 3 ++-
src/router_core/transfer.c | 3 +--
4 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2723c6c..32fbf48 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -817,9 +817,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
if (link->link_type == QD_LINK_CONTROL)
core->control_links_by_mask_bit[conn->mask_bit] = 0;
if (link->link_type == QD_LINK_ROUTER)
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
- if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority])
- core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
+ core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
}
//
@@ -852,9 +850,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
//
qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
sys_mutex_lock(conn->work_lock);
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
- qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK);
- }
+ qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
sys_mutex_unlock(conn->work_lock);
//
@@ -1438,11 +1434,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER) {
+ // As inter-router links are attached to this connection, they
+ // are assigned priorities in the order in which they are attached.
int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
if (next_slot >= QDR_N_PRIORITIES) {
qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf.");
return;
}
+ link->priority = next_slot;
core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link;
}
@@ -1483,12 +1482,7 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
{
if (conn->role == QDR_ROLE_INTER_ROUTER)
- for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
- if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) {
- core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
- break;
- }
- }
+ core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
//
// TODO - This needs to be refactored in terms of a non-inter-router link type
//
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4364c47..8ccb049 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -175,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
}
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv)
{
sys_mutex_lock(out_link->conn->work_lock);
@@ -208,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
work->value = 1;
DEQ_INSERT_TAIL(out_link->work_list, work);
}
- qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK);
+ qdr_add_link_ref(out_link->conn->links_with_work + out_link->priority, out_link, QDR_LINK_LIST_CLASS_WORK);
out_dlv->link_work = work;
sys_mutex_unlock(out_link->conn->work_lock);
@@ -285,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
while (link_ref) {
qdr_link_t *out_link = link_ref->link;
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
fanout++;
if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
addr->deliveries_egress++;
@@ -360,7 +360,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core,
core->data_links_by_mask_bit[link_bit].links[priority];
if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
- qdr_forward_deliver_CT(core, dest_link, out_delivery, priority);
+ qdr_forward_deliver_CT(core, dest_link, out_delivery);
fanout++;
addr->deliveries_transit++;
if (dest_link->link_type == QD_LINK_ROUTER)
@@ -485,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
if (link_ref) {
out_link = link_ref->link;
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg));
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
//
// If there are multiple local subscribers, rotate the list of link references
@@ -536,7 +536,7 @@ int qdr_forward_closest_CT(qdr_core_t *core,
out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
if (out_link) {
out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
- qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+ qdr_forward_deliver_CT(core, out_link, out_delivery);
addr->deliveries_transit++;
if (out_link->link_type == QD_LINK_ROUTER)
core->deliveries_transit++;
@@ -685,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core,
if (chosen_link) {
qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
- qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg));
+ qdr_forward_deliver_CT(core, chosen_link, out_delivery);
//
// If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 7d9b7e1..ee8930c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -422,6 +422,7 @@ struct qdr_link_t {
uint64_t released_deliveries;
uint64_t modified_deliveries;
uint64_t *ingress_histogram;
+ uint8_t priority;
};
ALLOC_DECLARE(qdr_link_t);
@@ -846,7 +847,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
void qdr_connection_free(qdr_connection_t *conn);
void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ae1dbd9..68645ea 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1001,8 +1001,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
peer->tag_length = action->args.connection.tag_length;
memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
- // Adding this work at priority 0.
- qdr_forward_deliver_CT(core, link->connected_link, peer, 0);
+ qdr_forward_deliver_CT(core, link->connected_link, peer);
link->total_deliveries++;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org