You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/07/17 15:55:13 UTC
[qpid-dispatch] branch master updated: DISPATCH-1372 alloc_pool
intrusive linked list can be replaced by a linked stack
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 94180b8 DISPATCH-1372 alloc_pool intrusive linked list can be replaced by a linked stack
94180b8 is described below
commit 94180b80d603ffa3a3ce33fad85eda3659357b8f
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Jun 21 09:05:03 2019 +0200
DISPATCH-1372 alloc_pool intrusive linked list can be replaced by a linked stack
Signed-off-by: Kenneth Giusti <kg...@apache.org>
This closes #525
---
src/alloc_pool.c | 212 +++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 173 insertions(+), 39 deletions(-)
diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index 28e8da3..c63c0dc 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -30,8 +30,10 @@
const char *QD_ALLOCATOR_TYPE = "allocator";
-typedef struct qd_alloc_type_t qd_alloc_type_t;
-typedef struct qd_alloc_item_t qd_alloc_item_t;
+typedef struct qd_alloc_type_t qd_alloc_type_t;
+typedef struct qd_alloc_item_t qd_alloc_item_t;
+typedef struct qd_alloc_chunk_t qd_alloc_chunk_t;
+typedef struct qd_alloc_linked_stack_t qd_alloc_linked_stack_t;
struct qd_alloc_type_t {
DEQ_LINKS(qd_alloc_type_t);
@@ -44,7 +46,6 @@ DEQ_DECLARE(qd_alloc_type_t, qd_alloc_type_list_t);
#define PATTERN_BACK 0xbabecafe
struct qd_alloc_item_t {
- DEQ_LINKS(qd_alloc_item_t);
uint32_t sequence;
#ifdef QD_MEMORY_DEBUG
qd_alloc_type_desc_t *desc;
@@ -52,12 +53,147 @@ struct qd_alloc_item_t {
#endif
};
-DEQ_DECLARE(qd_alloc_item_t, qd_alloc_item_list_t);
+//128 has been chosen because many CPUs arch use an
+//adiacent line prefetching optimization that load
+//2*cache line bytes in batch
+#define CHUNK_SIZE 128/sizeof(void*)
+struct qd_alloc_chunk_t {
+ qd_alloc_chunk_t *prev; //do not use DEQ_LINKS here: field position could affect access cost
+ qd_alloc_item_t *items[CHUNK_SIZE];
+ qd_alloc_chunk_t *next;
+};
+
+struct qd_alloc_linked_stack_t {
+ //the base
+ qd_alloc_chunk_t *top_chunk;
+ uint32_t top; //qd_alloc_item* top_item = top_chunk->items[top+1] <-> top > 0
+ uint64_t size;
+ qd_alloc_chunk_t base_chunk;
+};
+
+static inline void init_stack(qd_alloc_linked_stack_t *stack)
+{
+ stack->top_chunk = &stack->base_chunk;
+ stack->top_chunk->next = NULL;
+ stack->top = 0;
+ stack->size = 0;
+}
+
+static inline void prev_chunk_stack(qd_alloc_linked_stack_t *const stack)
+{
+ const uint32_t chunk_size = CHUNK_SIZE;
+ assert(stack->top == 0);
+ assert(stack->size != 0);
+ assert(stack->top_chunk != &stack->base_chunk);
+ qd_alloc_chunk_t *prev = stack->top_chunk->prev;
+ //TODO(franz): stack->top_chunk could be passed externally and walked its nexts
+ // to recycle the last chunk.
+ // Just need to pay attention to null out released_chunk->prev->next
+ // to make it unreachable from the stack
+ stack->top_chunk = prev;
+ stack->top = chunk_size;
+}
+
+static inline qd_alloc_item_t *pop_stack(qd_alloc_linked_stack_t *const stack)
+{
+ if (stack->top == 0) {
+ if (stack->size == 0) {
+ assert(stack->top_chunk == &stack->base_chunk);
+ return NULL;
+ }
+ prev_chunk_stack(stack);
+ }
+ stack->top--;
+ assert(stack->top >= 0 && stack->top < CHUNK_SIZE);
+ stack->size--;
+ assert(stack->size >= 0);
+ qd_alloc_item_t *item = stack->top_chunk->items[stack->top];
+ assert(item != NULL);
+ return item;
+}
+
+static inline void free_stack_chunks(qd_alloc_linked_stack_t *stack)
+{
+ assert(stack->size == 0);
+ //the assumption here is that next is always correctly set
+ qd_alloc_chunk_t *chunk = stack->base_chunk.next;
+ while (chunk != NULL) {
+ qd_alloc_chunk_t *next = chunk->next;
+ free(chunk);
+ chunk = next;
+ }
+}
+
+static inline bool next_chunk_stack(qd_alloc_linked_stack_t *const stack)
+{
+ assert(stack->top == CHUNK_SIZE);
+ qd_alloc_chunk_t *top = stack->top_chunk->next;
+ if (top == NULL) {
+ top = NEW(qd_alloc_chunk_t);
+ if (top == NULL) {
+ return false;
+ }
+ stack->top_chunk->next = top;
+ top->prev = stack->top_chunk;
+ top->next = NULL;
+ }
+ assert(top->prev == stack->top_chunk);
+ assert(stack->top_chunk->next == top);
+ stack->top_chunk = top;
+ stack->top = 0;
+ return true;
+}
+
+static inline bool push_stack(qd_alloc_linked_stack_t *stack, qd_alloc_item_t *item)
+{
+ const uint32_t chunk_size = CHUNK_SIZE;
+ if (stack->top == chunk_size) {
+ if (!next_chunk_stack(stack)) {
+ return false;
+ }
+ }
+ stack->size++;
+ stack->top_chunk->items[stack->top] = item;
+ stack->top++;
+ return true;
+}
+
/**
 * Bulk-move up to 'length' item pointers from one stack to another
 * using memcpy of whole pointer runs.  The relative order of the moved
 * items is NOT preserved (hence "unordered") — acceptable for a
 * free-list, where any item is as good as any other.
 *
 * Returns the number of items actually moved; this is less than the
 * request only when 'from' holds fewer items or when growing 'to'
 * fails on a chunk allocation.
 */
static inline int unordered_move_stack(qd_alloc_linked_stack_t *from, qd_alloc_linked_stack_t *to, uint32_t length)
{
    //clamp the request to what 'from' actually holds
    length = from->size < length ? from->size : length;
    if (length == 0) {
        return 0;
    }
    uint32_t remaining = length;
    const uint32_t chunk_size = CHUNK_SIZE;
    while (remaining > 0) {
        //top will tell us how much data we could memcpy
        uint32_t to_copy = remaining;
        if (from->top == 0) {
            //source chunk exhausted: back up to the previous full chunk
            prev_chunk_stack(from);
        }
        //cannot copy more than what sits in the source's current chunk
        to_copy = from->top < to_copy ? from->top : to_copy;
        if (to->top == chunk_size) {
            if (!next_chunk_stack(to)) {
                //destination could not grow: report the partial move
                return length - remaining;
            }
        }
        //also bounded by the free space left in the destination chunk
        uint32_t remaining_to = chunk_size - to->top;
        to_copy = remaining_to < to_copy ? remaining_to : to_copy;
        //copying pointers (not items): adjust cursors, then memcpy the run
        from->top -= to_copy;
        memcpy(&to->top_chunk->items[to->top], &from->top_chunk->items[from->top], to_copy * sizeof(qd_alloc_item_t *));
        to->top += to_copy;
        to->size += to_copy;
        from->size -= to_copy;
        remaining -= to_copy;
    }
    return length;
}
struct qd_alloc_pool_t {
DEQ_LINKS(qd_alloc_pool_t);
- qd_alloc_item_list_t free_list;
+ qd_alloc_linked_stack_t free_list;
};
qd_alloc_config_t qd_alloc_default_config_big = {16, 32, 0};
@@ -85,7 +221,7 @@ static void qd_alloc_init(qd_alloc_type_desc_t *desc)
desc->global_pool = NEW(qd_alloc_pool_t);
DEQ_ITEM_INIT(desc->global_pool);
- DEQ_INIT(desc->global_pool->free_list);
+ init_stack(&desc->global_pool->free_list);
desc->lock = sys_mutex();
DEQ_INIT(desc->tpool_list);
#if QD_MEMORY_STATS
@@ -125,7 +261,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
if (*tpool == 0) {
NEW_CACHE_ALIGNED(qd_alloc_pool_t, *tpool);
DEQ_ITEM_INIT(*tpool);
- DEQ_INIT((*tpool)->free_list);
+ init_stack(&(*tpool)->free_list);
sys_mutex_lock(desc->lock);
DEQ_INSERT_TAIL(desc->tpool_list, *tpool);
sys_mutex_unlock(desc->lock);
@@ -138,9 +274,8 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
// list and return it. Since everything we've touched is thread-local,
// there is no need to acquire a lock.
//
- qd_alloc_item_t *item = DEQ_HEAD(pool->free_list);
+ qd_alloc_item_t *item = pop_stack(&pool->free_list);
if (item) {
- DEQ_REMOVE_HEAD(pool->free_list);
#ifdef QD_MEMORY_DEBUG
item->desc = desc;
item->header = PATTERN_FRONT;
@@ -159,19 +294,21 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
//
// Rebalance a full batch from the global free list to the thread list.
//
+ const int moved = unordered_move_stack(&desc->global_pool->free_list, &pool->free_list,
+ desc->config->transfer_batch_size);
+ assert(moved == desc->config->transfer_batch_size);
#if QD_MEMORY_STATS
desc->stats->batches_rebalanced_to_threads++;
- desc->stats->held_by_threads += desc->config->transfer_batch_size;
+ desc->stats->held_by_threads += moved;
#endif
- for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
- item = DEQ_HEAD(desc->global_pool->free_list);
- DEQ_REMOVE_HEAD(desc->global_pool->free_list);
- DEQ_INSERT_TAIL(pool->free_list, item);
- }
} else {
//
// Allocate a full batch from the heap and put it on the thread list.
//
+ //TODO(franz):
+ // - would be better to allocate in batches == transfer_batch_size
+ // and put a small (== sizeof(transfer_batch_size)) ref_count to help the final free
+ // - could be beneficial directly to delink a chunk?
for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
size_t size = sizeof(qd_alloc_item_t) + desc->total_size
#ifdef QD_MEMORY_DEBUG
@@ -181,8 +318,10 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
ALLOC_CACHE_ALIGNED(size, item);
if (item == 0)
break;
- DEQ_ITEM_INIT(item);
- DEQ_INSERT_TAIL(pool->free_list, item);
+ if (!push_stack(&pool->free_list, item)) {
+ free(item);
+ break;
+ }
item->sequence = 0;
#if QD_MEMORY_STATS
desc->stats->held_by_threads++;
@@ -192,9 +331,8 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
}
sys_mutex_unlock(desc->lock);
- item = DEQ_HEAD(pool->free_list);
+ item = pop_stack(&pool->free_list);
if (item) {
- DEQ_REMOVE_HEAD(pool->free_list);
#ifdef QD_MEMORY_DEBUG
item->desc = desc;
item->header = PATTERN_FRONT;
@@ -213,7 +351,6 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
{
if (!p) return;
qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
- int idx;
#ifdef QD_MEMORY_DEBUG
assert (desc->header == PATTERN_FRONT);
@@ -232,7 +369,7 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
if (*tpool == 0) {
*tpool = NEW(qd_alloc_pool_t);
DEQ_ITEM_INIT(*tpool);
- DEQ_INIT((*tpool)->free_list);
+ init_stack(&(*tpool)->free_list);
sys_mutex_lock(desc->lock);
DEQ_INSERT_TAIL(desc->tpool_list, *tpool);
sys_mutex_unlock(desc->lock);
@@ -241,9 +378,11 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
qd_alloc_pool_t *pool = *tpool;
item->sequence++;
- DEQ_INSERT_TAIL(pool->free_list, item);
+ if (!push_stack(&pool->free_list, item)) {
+ free(item);
+ }
- if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
+ if (DEQ_SIZE(pool->free_list) < desc->config->local_free_list_max)
return;
//
@@ -251,24 +390,20 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
// rebalanced back to the global list.
//
sys_mutex_lock(desc->lock);
+ const int moved = unordered_move_stack(&pool->free_list, &desc->global_pool->free_list,
+ desc->config->transfer_batch_size);
+ assert(moved == desc->config->transfer_batch_size);
#if QD_MEMORY_STATS
desc->stats->batches_rebalanced_to_global++;
- desc->stats->held_by_threads -= desc->config->transfer_batch_size;
+ desc->stats->held_by_threads -= moved;
#endif
- for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
- item = DEQ_HEAD(pool->free_list);
- DEQ_REMOVE_HEAD(pool->free_list);
- DEQ_INSERT_TAIL(desc->global_pool->free_list, item);
- }
-
//
// If there's a global_free_list size limit, remove items until the limit is
// not exceeded.
//
if (desc->config->global_free_list_max != 0) {
while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) {
- item = DEQ_HEAD(desc->global_pool->free_list);
- DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+ item = pop_stack(&desc->global_pool->free_list);
free(item);
#if QD_MEMORY_STATS
desc->stats->total_free_to_heap++;
@@ -329,15 +464,15 @@ void qd_alloc_finalize(void)
//
// Reclaim the items on the global free pool
//
- item = DEQ_HEAD(desc->global_pool->free_list);
+ item = pop_stack(&desc->global_pool->free_list);
while (item) {
- DEQ_REMOVE_HEAD(desc->global_pool->free_list);
free(item);
#if QD_MEMORY_STATS
desc->stats->total_free_to_heap++;
#endif
- item = DEQ_HEAD(desc->global_pool->free_list);
+ item = pop_stack(&desc->global_pool->free_list);
}
+ free_stack_chunks(&desc->global_pool->free_list);
free(desc->global_pool);
desc->global_pool = 0;
@@ -346,17 +481,16 @@ void qd_alloc_finalize(void)
//
qd_alloc_pool_t *tpool = DEQ_HEAD(desc->tpool_list);
while (tpool) {
- item = DEQ_HEAD(tpool->free_list);
+ item = pop_stack(&tpool->free_list);
while (item) {
- DEQ_REMOVE_HEAD(tpool->free_list);
free(item);
#if QD_MEMORY_STATS
desc->stats->total_free_to_heap++;
#endif
- item = DEQ_HEAD(tpool->free_list);
+ item = pop_stack(&tpool->free_list);
}
-
DEQ_REMOVE_HEAD(desc->tpool_list);
+ free_stack_chunks(&tpool->free_list);
free(tpool);
tpool = DEQ_HEAD(desc->tpool_list);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org