You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/03/11 20:05:41 UTC

[qpid-dispatch] branch master updated: DISPATCH-1281 - Batch the cleanup of freed messages so they share a single unit of general-work per core-thread action sweep. This closes #457

This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new e87577b  DISPATCH-1281 - Batch the cleanup of freed messages so they share a single unit of general-work per core-thread action sweep. This closes #457
e87577b is described below

commit e87577bb401490800716fd482d0b9eaf169d2fae
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Mon Mar 11 16:04:41 2019 -0400

    DISPATCH-1281 - Batch the cleanup of freed messages so they share a single unit of general-work per core-thread action sweep.
    This closes #457
---
 src/router_core/router_core.c         |  1 +
 src/router_core/router_core_private.h | 34 +++++++++++++++++++++++++---------
 src/router_core/router_core_thread.c  | 26 +++++++++++++++++++++++++-
 src/router_core/transfer.c            | 20 +++++++-------------
 4 files changed, 58 insertions(+), 23 deletions(-)

diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c2dbdaa..99fe99b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -32,6 +32,7 @@ ALLOC_DEFINE(qdr_delivery_ref_t);
 ALLOC_DEFINE(qdr_link_t);
 ALLOC_DEFINE(qdr_router_ref_t);
 ALLOC_DEFINE(qdr_link_ref_t);
+ALLOC_DEFINE(qdr_delivery_cleanup_t);
 ALLOC_DEFINE(qdr_general_work_t);
 ALLOC_DEFINE(qdr_link_work_t);
 ALLOC_DEFINE(qdr_connection_ref_t);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0865c78..894d19e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -182,6 +182,20 @@ ALLOC_DECLARE(qdr_action_t);
 DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
 
 //
+//
+//
+typedef struct qdr_delivery_cleanup_t qdr_delivery_cleanup_t;
+
+struct qdr_delivery_cleanup_t {
+    DEQ_LINKS(qdr_delivery_cleanup_t);
+    qd_message_t  *msg;
+    qd_iterator_t *iter;
+};
+
+ALLOC_DECLARE(qdr_delivery_cleanup_t);
+DEQ_DECLARE(qdr_delivery_cleanup_t, qdr_delivery_cleanup_list_t);
+
+//
 // General Work
 //
 // The following types are used to post work to the IO threads for
@@ -194,15 +208,16 @@ typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t
 
 struct qdr_general_work_t {
     DEQ_LINKS(qdr_general_work_t);
-    qdr_general_work_handler_t  handler;
-    qdr_field_t                *field;
-    int                         maskbit;
-    int                         inter_router_cost;
-    qdr_receive_t               on_message;
-    void                       *on_message_context;
-    qd_message_t               *msg;
-    uint64_t                    in_conn_id;
-    int                         treatment;
+    qdr_general_work_handler_t   handler;
+    qdr_field_t                 *field;
+    int                          maskbit;
+    int                          inter_router_cost;
+    qd_message_t                *msg;
+    qdr_receive_t                on_message;
+    void                        *on_message_context;
+    uint64_t                     in_conn_id;
+    int                          treatment;
+    qdr_delivery_cleanup_list_t  delivery_cleanup_list;
 };
 
 ALLOC_DECLARE(qdr_general_work_t);
@@ -819,6 +834,7 @@ struct qdr_core_t {
     qdr_exchange_list_t   exchanges;
     qdr_forwarder_t      *forwarders[QD_TREATMENT_LINK_BALANCED + 1];
 
+    qdr_delivery_cleanup_list_t  delivery_cleanup_list;  ///< List of delivery cleanup items to be processed in an IO thread
 
     // Overall delivery counters
     uint64_t           presettled_deliveries;
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index f3b0a36..c82d3f2 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -68,6 +68,22 @@ static void qdr_activate_connections_CT(qdr_core_t *core)
 }
 
 
+static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
+{
+    qdr_delivery_cleanup_t *cleanup = DEQ_HEAD(work->delivery_cleanup_list);
+
+    while (cleanup) {
+        DEQ_REMOVE_HEAD(work->delivery_cleanup_list);
+        if (cleanup->msg)
+            qd_message_free(cleanup->msg);
+        if (cleanup->iter)
+            qd_iterator_free(cleanup->iter);
+        free_qdr_delivery_cleanup_t(cleanup);
+        cleanup = DEQ_HEAD(work->delivery_cleanup_list);
+    }
+}
+
+
 void qdr_modules_init(qdr_core_t *core)
 {
     //
@@ -84,7 +100,6 @@ void qdr_modules_init(qdr_core_t *core)
 
         module = DEQ_NEXT(module);
     }
-
 }
 
 
@@ -154,6 +169,15 @@ void *router_core_thread(void *arg)
         // Activate all connections that were flagged for activation during the above processing
         //
         qdr_activate_connections_CT(core);
+
+        //
+        // Schedule the cleanup of deliveries freed during this core-thread pass
+        //
+        if (DEQ_SIZE(core->delivery_cleanup_list) > 0) {
+            qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
+            DEQ_MOVE(core->delivery_cleanup_list, work->delivery_cleanup_list);
+            qdr_post_general_work_CT(core, work);
+        }
     }
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 534cc11..2c0fe43 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -550,25 +550,19 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 }
 
 
-static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
-{
-    if (work->msg)
-        qd_message_free(work->msg);
-    if (work->on_message_context)
-        qd_iterator_free((qd_iterator_t *)work->on_message_context);
-}
-
-
 static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery)
 {
     assert(sys_atomic_get(&delivery->ref_count) == 0);
     qdr_link_t *link = delivery->link;
 
     if (delivery->msg || delivery->to_addr) {
-        qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
-        work->msg                = delivery->msg;
-        work->on_message_context = delivery->to_addr;
-        qdr_post_general_work_CT(core, work);
+        qdr_delivery_cleanup_t *cleanup = new_qdr_delivery_cleanup_t();
+
+        DEQ_ITEM_INIT(cleanup);
+        cleanup->msg  = delivery->msg;
+        cleanup->iter = delivery->to_addr;
+
+        DEQ_INSERT_TAIL(core->delivery_cleanup_list, cleanup);
     }
 
     if (delivery->tracking_addr) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org