You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2017/09/22 15:00:10 UTC

qpid-dispatch git commit: DISPATCH-835 - Split the link-cleanup logic into delivery cleanup and cleanup of everything else. Added a CT function to delete links so that link deletion is no longer spread across two threads.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master af96b23fe -> f8f2b0ae7


DISPATCH-835 - Split the link-cleanup logic into delivery cleanup and cleanup of everything else.  Added a CT function to delete links so that link deletion is no longer spread across two threads.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/f8f2b0ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/f8f2b0ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/f8f2b0ae

Branch: refs/heads/master
Commit: f8f2b0ae736e03d31a697ecd02dcdfd774f2bdeb
Parents: af96b23
Author: Ted Ross <tr...@redhat.com>
Authored: Fri Sep 22 10:58:17 2017 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Fri Sep 22 10:58:17 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |   9 +++
 src/router_core/connections.c         | 125 +++++++++++++++++++----------
 src/router_core/router_core_private.h |   1 -
 3 files changed, 90 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f8f2b0ae/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 59b11c0..701cf1c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -528,6 +528,15 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin
 void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
 
 /**
+ * qdr_link_delete
+ *
+ * Request that the router-core delete this link and free all its associated resources.  The request is enqueued as an action on the link's core, so the cleanup and free happen when that action is processed, not inside this call.
+ *
+ * @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
+ */
+void qdr_link_delete(qdr_link_t *link);
+
+/**
  * qdr_link_deliver
  *
  * Deliver a message to the router core for forwarding.  This function is used in cases where

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f8f2b0ae/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index c3244dc..da767bd 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -29,6 +29,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo
 static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
 
 ALLOC_DEFINE(qdr_connection_t);
 ALLOC_DEFINE(qdr_connection_work_t);
@@ -322,7 +323,7 @@ int qdr_connection_process(qdr_connection_t *conn)
             }
 
             if (free_link)
-                free_qdr_link_t(link);
+                qdr_link_delete(link);
         }
     } while (free_link || link);
 
@@ -464,6 +465,15 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error)
 }
 
 
+void qdr_link_delete(qdr_link_t *link)  // queues deletion of 'link'; nothing is freed inside this call
+{
+    qdr_action_t *action = qdr_action(qdr_link_delete_CT, "link_delete");  // action whose handler is qdr_link_delete_CT
+
+    action->args.connection.link = link;  // qdr_link_delete_CT reads the link back from this field
+    qdr_action_enqueue(link->core, action);  // dereferences link, so 'link' must be non-NULL here
+}
+
+
 void qdr_connection_handlers(qdr_core_t                *core,
                              void                      *context,
                              qdr_connection_activate_t  activate,
@@ -588,41 +598,17 @@ static void qdr_generate_link_name(const char *label, char *buffer, size_t lengt
 }
 
 
-static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
+static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
 {
     //
-    // Remove the link from the master list of links
-    //
-    DEQ_REMOVE(core->open_links, link);
-
-    //
-    // If the link has a connected peer, unlink the peer
-    //
-    if (link->connected_link) {
-        link->connected_link->connected_link = 0;
-        link->connected_link = 0;
-    }
-
-    //
-    // If this link is involved in inter-router communication, remove its reference
-    // from the core mask-bit tables
-    //
-    if (link->link_type == QD_LINK_CONTROL)
-        core->control_links_by_mask_bit[conn->mask_bit] = 0;
-    if (link->link_type == QD_LINK_ROUTER)
-        core->data_links_by_mask_bit[conn->mask_bit] = 0;
-
-    //
     // Clean up the lists of deliveries on this link
     //
     qdr_delivery_ref_list_t updated_deliveries;
     qdr_delivery_list_t     undelivered;
     qdr_delivery_list_t     unsettled;
     qdr_delivery_list_t     settled;
-    qdr_link_work_list_t    work_list;
 
     sys_mutex_lock(conn->work_lock);
-    DEQ_MOVE(link->work_list, work_list);
     DEQ_MOVE(link->updated_deliveries, updated_deliveries);
 
     DEQ_MOVE(link->undelivered, undelivered);
@@ -651,17 +637,6 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     sys_mutex_unlock(conn->work_lock);
 
     //
-    // Free the work list
-    //
-    qdr_link_work_t *link_work = DEQ_HEAD(work_list);
-    while (link_work) {
-        DEQ_REMOVE_HEAD(work_list);
-        qdr_error_free(link_work->error);
-        free_qdr_link_work_t(link_work);
-        link_work = DEQ_HEAD(work_list);
-    }
-
-    //
     // Free all the 'updated' references
     //
     qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
@@ -777,6 +752,57 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
         qdr_delivery_decref_CT(core, dlv);
         dlv = DEQ_HEAD(settled);
     }
+}
+
+
+static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link)
+{
+    //
+    // Remove the link from the master list of links
+    //
+    DEQ_REMOVE(core->open_links, link);
+
+    //
+    // If the link has a connected peer, unlink the peer
+    //
+    if (link->connected_link) {
+        link->connected_link->connected_link = 0;
+        link->connected_link = 0;
+    }
+
+    //
+    // If this link is involved in inter-router communication, remove its reference
+    // from the core mask-bit tables
+    //
+    if (link->link_type == QD_LINK_CONTROL)
+        core->control_links_by_mask_bit[conn->mask_bit] = 0;
+    if (link->link_type == QD_LINK_ROUTER)
+        core->data_links_by_mask_bit[conn->mask_bit] = 0;
+
+    //
+    // Clean up the work list
+    //
+    qdr_link_work_list_t work_list;
+
+    sys_mutex_lock(conn->work_lock);
+    DEQ_MOVE(link->work_list, work_list);
+    sys_mutex_unlock(conn->work_lock);
+
+    //
+    // Free the work list
+    //
+    qdr_link_work_t *link_work = DEQ_HEAD(work_list);
+    while (link_work) {
+        DEQ_REMOVE_HEAD(work_list);
+        qdr_error_free(link_work->error);
+        free_qdr_link_work_t(link_work);
+        link_work = DEQ_HEAD(work_list);
+    }
+
+    //
+    // Clean up any remaining deliveries
+    //
+    qdr_link_cleanup_deliveries_CT(core, conn, link);
 
     //
     // Remove the reference to this link in the connection's reference lists
@@ -877,9 +903,6 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
         }
     }
 
-    if (link->detach_count == 2)
-        qdr_link_cleanup_CT(core, link->conn, link);
-
     qdr_link_enqueue_work_CT(core, link, work);
 }
 
@@ -1692,12 +1715,13 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
         }
     }
 
-    //
-    // TODO - If this link is owned by an auto_link, handle the unexpected detach.
-    //
-
     if (link->detach_count == 1) {
         //
+        // Handle the disposition of any deliveries that remain on the link
+        //
+        qdr_link_cleanup_deliveries_CT(core, conn, link);
+        
+        //
         // If the detach occurred via protocol, send a detach back.
         //
         if (dt != QD_LOST)
@@ -1719,3 +1743,16 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
 }
 
 
+static void qdr_link_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard)  // action handler enqueued by qdr_link_delete()
+{
+    if (discard)  // discard set: return without cleaning up or freeing the link
+        return;
+
+    qdr_link_t *link = action->args.connection.link;  // the link stored by qdr_link_delete()
+
+    if (link && link->conn) {  // NOTE(review): a non-NULL link whose conn is NULL is not freed on this path - confirm intended
+        qdr_link_cleanup_CT(core, link->conn, link);  // detach from core/connection state, drain work list and deliveries
+        free_qdr_link_t(link);  // release the link object itself; 'link' is invalid after this
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/f8f2b0ae/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 3a4a9cb..9535708 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -537,7 +537,6 @@ struct qdr_connection_t {
     qdr_connection_work_list_t  work_list;
     sys_mutex_t                *work_lock;
     qdr_link_ref_list_t         links;
-    qdr_link_ref_list_t         links_with_deliveries;
     qdr_link_ref_list_t         links_with_work;
     char                       *tenant_space;
     int                         tenant_space_len;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org