You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/11/26 19:38:02 UTC

[qpid-dispatch] branch master updated: DISPATCH-1475: prevent I/O thread from accessing link during deletion

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c5ee1c  DISPATCH-1475: prevent I/O thread from accessing link during deletion
8c5ee1c is described below

commit 8c5ee1cdc7cf93b997834696e50017b5a84f8361
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Tue Nov 26 09:53:04 2019 -0500

    DISPATCH-1475: prevent I/O thread from accessing link during deletion
    
    The fix also consistently adds the link to the proper priority slot
    in the connection's links_with_work array.
---
 src/router_core/connections.c | 14 +++++++++-----
 src/router_core/delivery.c    |  6 ++----
 src/router_core/forwarder.c   |  2 +-
 src/router_core/transfer.c    |  6 ++----
 4 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4dc2092..29d9e06 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -679,8 +679,7 @@ void qdr_link_enqueue_work_CT(qdr_core_t      *core,
 
     sys_mutex_lock(conn->work_lock);
     DEQ_INSERT_TAIL(link->work_list, work);
-    // Enqueue work at priority 0.
-    qdr_add_link_ref(conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+    qdr_add_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 
     qdr_connection_activate_CT(core, conn);
@@ -975,7 +974,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     //
     sys_mutex_lock(conn->work_lock);
     qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
-    qdr_del_link_ref(conn->links_with_work + link->priority, link, QDR_LINK_LIST_CLASS_WORK);
+    qdr_del_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
 
     if (link->ref[QDR_LINK_LIST_CLASS_ADDRESS]) {
@@ -1018,8 +1017,13 @@ static void qdr_link_cleanup_protected_CT(qdr_core_t *core, qdr_connection_t *co
     bool do_cleanup = false;
 
     sys_mutex_lock(conn->work_lock);
-    if (link->processing)
+    // prevent an I/O thread from processing this link (DISPATCH-1475)
+    qdr_del_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
+    if (link->processing) {
+        // Cannot cleanup link because I/O thread is currently processing it
+        // Mark it so the I/O thread will notify the core when processing is complete
         link->ready_to_free = true;
+    }
     else
         do_cleanup = true;
     sys_mutex_unlock(conn->work_lock);
@@ -1368,7 +1372,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
     for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
         link_ref = DEQ_HEAD(conn->links_with_work[priority]);
         while (link_ref) {
-            qdr_del_link_ref(conn->links_with_work + priority, link_ref->link, QDR_LINK_LIST_CLASS_WORK);
+            qdr_del_link_ref(&conn->links_with_work[priority], link_ref->link, QDR_LINK_LIST_CLASS_WORK);
             link_ref = DEQ_HEAD(conn->links_with_work[priority]);
         }
     }
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 5e3cc52..fb11e02 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -1013,8 +1013,7 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv)
         if (!!work && !!peer_link) {
             sys_mutex_lock(peer_link->conn->work_lock);
             if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
-                // Adding this work at priority 0.
-                qdr_add_link_ref(peer_link->conn->links_with_work, peer_link, QDR_LINK_LIST_CLASS_WORK);
+                qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK);
                 sys_mutex_unlock(peer_link->conn->work_lock);
 
                 //
@@ -1116,8 +1115,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
         qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list");
         qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv);
-        // Adding this work at priority 0.
-        qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+        qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
         activate = true;
     }
     sys_mutex_unlock(link->conn->work_lock);
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 04575f3..ab73501 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -234,7 +234,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
         work->value     = 1;
         DEQ_INSERT_TAIL(out_link->work_list, work);
     }
-    qdr_add_link_ref(out_link->conn->links_with_work + out_link->priority, out_link, QDR_LINK_LIST_CLASS_WORK);
+    qdr_add_link_ref(&out_link->conn->links_with_work[out_link->priority], out_link, QDR_LINK_LIST_CLASS_WORK);
 
     out_dlv->link_work = work;
     sys_mutex_unlock(out_link->conn->work_lock);
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 1331f0c..2794e92 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -320,8 +320,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
         sys_mutex_lock(link->conn->work_lock);
 
         if (DEQ_SIZE(link->undelivered) > 0) {
-            // Adding this work at priority 0.
-            qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+            qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
             activate = true;
         }
 
@@ -371,8 +370,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
             if (work)
                 DEQ_INSERT_TAIL(link->work_list, work);
             if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) {
-                // Adding this work at priority 0.
-                qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK);
+                qdr_add_link_ref(&link->conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
                 activate = true;
             }
             sys_mutex_unlock(link->conn->work_lock);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org