You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/09/18 12:48:02 UTC

[1/2] qpid-dispatch git commit: DISPATCH-1096 - Added a new link-list-class for local (on-stack) lists. Tightened up some critical sections. This closes #375

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 94bef705d -> 92f00baf6


DISPATCH-1096 - Added a new link-list-class for local (on-stack) lists.  Tightened up some critical sections.
This closes #375


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/92f00baf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/92f00baf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/92f00baf

Branch: refs/heads/master
Commit: 92f00baf66533d884cbeba186405cfee480f6992
Parents: a9ace97
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Sep 18 08:42:48 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 18 08:46:43 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 23 ++++++++++++++++++-----
 src/router_core/router_core.c         |  8 ++++++++
 src/router_core/router_core_private.h |  4 +++-
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/92f00baf/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 32fbf48..cbece87 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -218,6 +218,16 @@ int qdr_connection_process(qdr_connection_t *conn)
     DEQ_MOVE(conn->work_list, work_list);
     for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
         DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);
+
+        //
+        // Move the references from CLASS_WORK to CLASS_LOCAL so concurrent action in the core
+        // thread doesn't assume these links are referenced from the connection's list.
+        //
+        ref = DEQ_HEAD(links_with_work[priority]);
+        while (ref) {
+            move_link_ref(ref->link, QDR_LINK_LIST_CLASS_WORK, QDR_LINK_LIST_CLASS_LOCAL);
+            ref = DEQ_NEXT(ref);
+        }
     }
     sys_mutex_unlock(conn->work_lock);
 
@@ -249,23 +259,26 @@ int qdr_connection_process(qdr_connection_t *conn)
             qdr_link_work_t *link_work;
             free_link = false;
 
-            sys_mutex_lock(conn->work_lock);
             ref = DEQ_HEAD(links_with_work[priority]);
             if (ref) {
                 link = ref->link;
-                qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_WORK);
+                qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_LOCAL);
 
+                //
+                // The work lock must be used to protect accesses to the link's work_list and
+                // link_work->processing.
+                //
+                sys_mutex_lock(conn->work_lock);
                 link_work = DEQ_HEAD(link->work_list);
                 if (link_work) {
                     DEQ_REMOVE_HEAD(link->work_list);
                     link_work->processing = true;
                 }
+                sys_mutex_unlock(conn->work_lock);
             } else
                 link = 0;
-            sys_mutex_unlock(conn->work_lock);
 
             if (link) {
-
                 //
                 // Handle disposition/settlement updates
                 //
@@ -848,8 +861,8 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     //
     // Remove the reference to this link in the connection's reference lists
     //
-    qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     sys_mutex_lock(conn->work_lock);
+    qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/92f00baf/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 580cb92..e41a633 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -451,6 +451,14 @@ void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls)
 }
 
 
+void move_link_ref(qdr_link_t *link, int from_cls, int to_cls)
+{
+    assert(link->ref[to_cls] == 0);
+    link->ref[to_cls] = link->ref[from_cls];
+    link->ref[from_cls] = 0;
+}
+
+
 void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn)
 {
     qdr_connection_ref_t *ref = new_qdr_connection_ref_t();

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/92f00baf/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index ee8930c..019449c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -370,7 +370,8 @@ void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref
 #define QDR_LINK_LIST_CLASS_ADDRESS    0
 #define QDR_LINK_LIST_CLASS_WORK       1
 #define QDR_LINK_LIST_CLASS_CONNECTION 2
-#define QDR_LINK_LIST_CLASSES          3
+#define QDR_LINK_LIST_CLASS_LOCAL      3
+#define QDR_LINK_LIST_CLASSES          4
 
 typedef enum {
     QDR_LINK_OPER_UP,
@@ -438,6 +439,7 @@ DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t);
 
 void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
 void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls);
+void move_link_ref(qdr_link_t *link, int from_cls, int to_cls);
 
 
 struct qdr_connection_ref_t {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH 1096 - store priority in links

Posted by tr...@apache.org.
DISPATCH 1096 - store priority in links


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a9ace970
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a9ace970
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a9ace970

Branch: refs/heads/master
Commit: a9ace9708ec8c48ca1868eb5998befe81640bfb0
Parents: 94bef70
Author: Michael Goulish <mg...@redhat.com>
Authored: Thu Sep 13 10:33:02 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Sep 18 08:46:43 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 18 ++++++------------
 src/router_core/forwarder.c           | 14 +++++++-------
 src/router_core/router_core_private.h |  3 ++-
 src/router_core/transfer.c            |  3 +--
 4 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2723c6c..32fbf48 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -817,9 +817,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         if (link->link_type == QD_LINK_CONTROL)
             core->control_links_by_mask_bit[conn->mask_bit] = 0;
         if (link->link_type == QD_LINK_ROUTER)
-            for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority)
-                if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority])
-                    core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
+            core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
     }
 
     //
@@ -852,9 +850,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     //
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
     sys_mutex_lock(conn->work_lock);
-    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
-        qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK);
-    }
+    qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 
     //
@@ -1438,11 +1434,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn,
 static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     if (conn->role == QDR_ROLE_INTER_ROUTER) {
+        // As inter-router links are attached to this connection, they
+        // are assigned priorities in the order in which they are attached.
         int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
         if (next_slot >= QDR_N_PRIORITIES) {
             qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf.");
             return;
         }
+        link->priority = next_slot;
         core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link;
     }
 
@@ -1483,12 +1482,7 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd
 static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     if (conn->role == QDR_ROLE_INTER_ROUTER)
-        for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
-            if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) {
-                core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0;
-                break;
-            }
-        }
+        core->data_links_by_mask_bit[conn->mask_bit].links[link->priority] = 0;
     //
     // TODO - This needs to be refactored in terms of a non-inter-router link type
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 4364c47..8ccb049 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -175,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
 }
 
 
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv)
 {
     sys_mutex_lock(out_link->conn->work_lock);
 
@@ -208,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
         work->value     = 1;
         DEQ_INSERT_TAIL(out_link->work_list, work);
     }
-    qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK);
+    qdr_add_link_ref(out_link->conn->links_with_work + out_link->priority, out_link, QDR_LINK_LIST_CLASS_WORK);
 
     out_dlv->link_work = work;
     sys_mutex_unlock(out_link->conn->work_lock);
@@ -285,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         while (link_ref) {
             qdr_link_t     *out_link     = link_ref->link;
             qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-            qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+            qdr_forward_deliver_CT(core, out_link, out_delivery);
             fanout++;
             if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) {
                 addr->deliveries_egress++;
@@ -360,7 +360,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
                 core->data_links_by_mask_bit[link_bit].links[priority];
             if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) {
                 qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg);
-                qdr_forward_deliver_CT(core, dest_link, out_delivery, priority);
+                qdr_forward_deliver_CT(core, dest_link, out_delivery);
                 fanout++;
                 addr->deliveries_transit++;
                 if (dest_link->link_type == QD_LINK_ROUTER)
@@ -485,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     if (link_ref) {
         out_link     = link_ref->link;
         out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-        qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg));
+        qdr_forward_deliver_CT(core, out_link, out_delivery);
 
         //
         // If there are multiple local subscribers, rotate the list of link references
@@ -536,7 +536,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority);
             if (out_link) {
                 out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
-                qdr_forward_deliver_CT(core, out_link, out_delivery, priority);
+                qdr_forward_deliver_CT(core, out_link, out_delivery);
                 addr->deliveries_transit++;
                 if (out_link->link_type == QD_LINK_ROUTER)
                     core->deliveries_transit++;
@@ -685,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
 
     if (chosen_link) {
         qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
-        qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg));
+        qdr_forward_deliver_CT(core, chosen_link, out_delivery);
 
         //
         // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 7d9b7e1..ee8930c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -422,6 +422,7 @@ struct qdr_link_t {
     uint64_t  released_deliveries;
     uint64_t  modified_deliveries;
     uint64_t *ingress_histogram;
+    uint8_t   priority;
 };
 
 ALLOC_DECLARE(qdr_link_t);
@@ -846,7 +847,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
 void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
 qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg);
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority);
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv);
 void qdr_connection_free(qdr_connection_t *conn);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
 qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a9ace970/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index ae1dbd9..68645ea 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1001,8 +1001,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
         peer->tag_length = action->args.connection.tag_length;
         memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
 
-        // Adding this work at priority 0.
-        qdr_forward_deliver_CT(core, link->connected_link, peer, 0);
+        qdr_forward_deliver_CT(core, link->connected_link, peer);
 
         link->total_deliveries++;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org