You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2019/03/11 20:05:41 UTC
[qpid-dispatch] branch master updated: DISPATCH-1281 - Batch the
cleanup of freed messages so they share a single unit of general-work per
core-thread action sweep. This closes #457
This is an automated email from the ASF dual-hosted git repository.
tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new e87577b DISPATCH-1281 - Batch the cleanup of freed messages so they share a single unit of general-work per core-thread action sweep. This closes #457
e87577b is described below
commit e87577bb401490800716fd482d0b9eaf169d2fae
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Mon Mar 11 16:04:41 2019 -0400
DISPATCH-1281 - Batch the cleanup of freed messages so they share a single unit of general-work per core-thread action sweep.
This closes #457
---
src/router_core/router_core.c | 1 +
src/router_core/router_core_private.h | 34 +++++++++++++++++++++++++---------
src/router_core/router_core_thread.c | 26 +++++++++++++++++++++++++-
src/router_core/transfer.c | 20 +++++++-------------
4 files changed, 58 insertions(+), 23 deletions(-)
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index c2dbdaa..99fe99b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -32,6 +32,7 @@ ALLOC_DEFINE(qdr_delivery_ref_t);
ALLOC_DEFINE(qdr_link_t);
ALLOC_DEFINE(qdr_router_ref_t);
ALLOC_DEFINE(qdr_link_ref_t);
+ALLOC_DEFINE(qdr_delivery_cleanup_t);
ALLOC_DEFINE(qdr_general_work_t);
ALLOC_DEFINE(qdr_link_work_t);
ALLOC_DEFINE(qdr_connection_ref_t);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0865c78..894d19e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -182,6 +182,20 @@ ALLOC_DECLARE(qdr_action_t);
DEQ_DECLARE(qdr_action_t, qdr_action_list_t);
//
+//
+//
+typedef struct qdr_delivery_cleanup_t qdr_delivery_cleanup_t;
+
+struct qdr_delivery_cleanup_t {
+ DEQ_LINKS(qdr_delivery_cleanup_t);
+ qd_message_t *msg;
+ qd_iterator_t *iter;
+};
+
+ALLOC_DECLARE(qdr_delivery_cleanup_t);
+DEQ_DECLARE(qdr_delivery_cleanup_t, qdr_delivery_cleanup_list_t);
+
+//
// General Work
//
// The following types are used to post work to the IO threads for
@@ -194,15 +208,16 @@ typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t
struct qdr_general_work_t {
DEQ_LINKS(qdr_general_work_t);
- qdr_general_work_handler_t handler;
- qdr_field_t *field;
- int maskbit;
- int inter_router_cost;
- qdr_receive_t on_message;
- void *on_message_context;
- qd_message_t *msg;
- uint64_t in_conn_id;
- int treatment;
+ qdr_general_work_handler_t handler;
+ qdr_field_t *field;
+ int maskbit;
+ int inter_router_cost;
+ qd_message_t *msg;
+ qdr_receive_t on_message;
+ void *on_message_context;
+ uint64_t in_conn_id;
+ int treatment;
+ qdr_delivery_cleanup_list_t delivery_cleanup_list;
};
ALLOC_DECLARE(qdr_general_work_t);
@@ -819,6 +834,7 @@ struct qdr_core_t {
qdr_exchange_list_t exchanges;
qdr_forwarder_t *forwarders[QD_TREATMENT_LINK_BALANCED + 1];
+ qdr_delivery_cleanup_list_t delivery_cleanup_list; ///< List of delivery cleanup items to be processed in an IO thread
// Overall delivery counters
uint64_t presettled_deliveries;
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index f3b0a36..c82d3f2 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -68,6 +68,22 @@ static void qdr_activate_connections_CT(qdr_core_t *core)
}
+static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
+{
+ qdr_delivery_cleanup_t *cleanup = DEQ_HEAD(work->delivery_cleanup_list);
+
+ while (cleanup) {
+ DEQ_REMOVE_HEAD(work->delivery_cleanup_list);
+ if (cleanup->msg)
+ qd_message_free(cleanup->msg);
+ if (cleanup->iter)
+ qd_iterator_free(cleanup->iter);
+ free_qdr_delivery_cleanup_t(cleanup);
+ cleanup = DEQ_HEAD(work->delivery_cleanup_list);
+ }
+}
+
+
void qdr_modules_init(qdr_core_t *core)
{
//
@@ -84,7 +100,6 @@ void qdr_modules_init(qdr_core_t *core)
module = DEQ_NEXT(module);
}
-
}
@@ -154,6 +169,15 @@ void *router_core_thread(void *arg)
// Activate all connections that were flagged for activation during the above processing
//
qdr_activate_connections_CT(core);
+
+ //
+ // Schedule the cleanup of deliveries freed during this core-thread pass
+ //
+ if (DEQ_SIZE(core->delivery_cleanup_list) > 0) {
+ qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
+ DEQ_MOVE(core->delivery_cleanup_list, work->delivery_cleanup_list);
+ qdr_post_general_work_CT(core, work);
+ }
}
qd_log(core->log, QD_LOG_INFO, "Router Core thread exited");
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 534cc11..2c0fe43 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -550,25 +550,19 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv)
}
-static void qdr_do_message_to_addr_free(qdr_core_t *core, qdr_general_work_t *work)
-{
- if (work->msg)
- qd_message_free(work->msg);
- if (work->on_message_context)
- qd_iterator_free((qd_iterator_t *)work->on_message_context);
-}
-
-
static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery)
{
assert(sys_atomic_get(&delivery->ref_count) == 0);
qdr_link_t *link = delivery->link;
if (delivery->msg || delivery->to_addr) {
- qdr_general_work_t *work = qdr_general_work(qdr_do_message_to_addr_free);
- work->msg = delivery->msg;
- work->on_message_context = delivery->to_addr;
- qdr_post_general_work_CT(core, work);
+ qdr_delivery_cleanup_t *cleanup = new_qdr_delivery_cleanup_t();
+
+ DEQ_ITEM_INIT(cleanup);
+ cleanup->msg = delivery->msg;
+ cleanup->iter = delivery->to_addr;
+
+ DEQ_INSERT_TAIL(core->delivery_cleanup_list, cleanup);
}
if (delivery->tracking_addr) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org