You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2019/07/17 15:55:13 UTC

[qpid-dispatch] branch master updated: DISPATCH-1372 alloc_pool intrusive linked list can be replaced by a linked stack

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 94180b8  DISPATCH-1372 alloc_pool intrusive linked list can be replaced by a linked stack
94180b8 is described below

commit 94180b80d603ffa3a3ce33fad85eda3659357b8f
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Jun 21 09:05:03 2019 +0200

    DISPATCH-1372 alloc_pool intrusive linked list can be replaced by a linked stack
    
    Signed-off-by: Kenneth Giusti <kg...@apache.org>
    
    This closes #525
---
 src/alloc_pool.c | 212 +++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 173 insertions(+), 39 deletions(-)

diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index 28e8da3..c63c0dc 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -30,8 +30,10 @@
 
 const char *QD_ALLOCATOR_TYPE = "allocator";
 
-typedef struct qd_alloc_type_t qd_alloc_type_t;
-typedef struct qd_alloc_item_t qd_alloc_item_t;
+typedef struct qd_alloc_type_t          qd_alloc_type_t;
+typedef struct qd_alloc_item_t          qd_alloc_item_t;
+typedef struct qd_alloc_chunk_t         qd_alloc_chunk_t;
+typedef struct qd_alloc_linked_stack_t  qd_alloc_linked_stack_t;
 
 struct qd_alloc_type_t {
     DEQ_LINKS(qd_alloc_type_t);
@@ -44,7 +46,6 @@ DEQ_DECLARE(qd_alloc_type_t, qd_alloc_type_list_t);
 #define PATTERN_BACK  0xbabecafe
 
 struct qd_alloc_item_t {
-    DEQ_LINKS(qd_alloc_item_t);
     uint32_t              sequence;
 #ifdef QD_MEMORY_DEBUG
     qd_alloc_type_desc_t *desc;
@@ -52,12 +53,147 @@ struct qd_alloc_item_t {
 #endif
 };
 
-DEQ_DECLARE(qd_alloc_item_t, qd_alloc_item_list_t);
+//128 has been chosen because many CPUs arch use an
+//adiacent line prefetching optimization that load
+//2*cache line bytes in batch
+#define CHUNK_SIZE 128/sizeof(void*)
 
+struct qd_alloc_chunk_t {
+    qd_alloc_chunk_t     *prev;                 //do not use DEQ_LINKS here: field position could affect access cost
+    qd_alloc_item_t      *items[CHUNK_SIZE];
+    qd_alloc_chunk_t     *next;
+};
+
+struct qd_alloc_linked_stack_t {
+    //the base
+    qd_alloc_chunk_t     *top_chunk;
+    uint32_t              top;                  //qd_alloc_item* top_item = top_chunk->items[top+1] <-> top > 0
+    uint64_t              size;
+    qd_alloc_chunk_t      base_chunk;
+};
+
+static inline void init_stack(qd_alloc_linked_stack_t *stack)
+{
+    stack->top_chunk = &stack->base_chunk;
+    stack->top_chunk->next = NULL;
+    stack->top = 0;
+    stack->size = 0;
+}
+
+static inline void prev_chunk_stack(qd_alloc_linked_stack_t *const stack)
+{
+    const uint32_t chunk_size = CHUNK_SIZE;
+    assert(stack->top == 0);
+    assert(stack->size != 0);
+    assert(stack->top_chunk != &stack->base_chunk);
+    qd_alloc_chunk_t *prev = stack->top_chunk->prev;
+    //TODO(franz):  stack->top_chunk could be passed externally and walked its nexts
+    //              to recycle the last chunk.
+    //              Just need to pay attention to null out released_chunk->prev->next
+    //              to make it unreachable from the stack
+    stack->top_chunk = prev;
+    stack->top = chunk_size;
+}
+
+static inline qd_alloc_item_t *pop_stack(qd_alloc_linked_stack_t *const stack)
+{
+    if (stack->top == 0) {
+        if (stack->size == 0) {
+            assert(stack->top_chunk == &stack->base_chunk);
+            return NULL;
+        }
+        prev_chunk_stack(stack);
+    }
+    stack->top--;
+    assert(stack->top >= 0 && stack->top < CHUNK_SIZE);
+    stack->size--;
+    assert(stack->size >= 0);
+    qd_alloc_item_t *item = stack->top_chunk->items[stack->top];
+    assert(item != NULL);
+    return item;
+}
+
+static inline void free_stack_chunks(qd_alloc_linked_stack_t *stack)
+{
+    assert(stack->size == 0);
+    //the assumption here is that next is always correctly set
+    qd_alloc_chunk_t *chunk = stack->base_chunk.next;
+    while (chunk != NULL) {
+        qd_alloc_chunk_t *next = chunk->next;
+        free(chunk);
+        chunk = next;
+    }
+}
+
+static inline bool next_chunk_stack(qd_alloc_linked_stack_t *const stack)
+{
+    assert(stack->top == CHUNK_SIZE);
+    qd_alloc_chunk_t *top = stack->top_chunk->next;
+    if (top == NULL) {
+        top = NEW(qd_alloc_chunk_t);
+        if (top == NULL) {
+            return false;
+        }
+        stack->top_chunk->next = top;
+        top->prev = stack->top_chunk;
+        top->next = NULL;
+    }
+    assert(top->prev == stack->top_chunk);
+    assert(stack->top_chunk->next == top);
+    stack->top_chunk = top;
+    stack->top = 0;
+    return true;
+}
+
+static inline bool push_stack(qd_alloc_linked_stack_t *stack, qd_alloc_item_t *item)
+{
+    const uint32_t chunk_size = CHUNK_SIZE;
+    if (stack->top == chunk_size) {
+        if (!next_chunk_stack(stack)) {
+            return false;
+        }
+    }
+    stack->size++;
+    stack->top_chunk->items[stack->top] = item;
+    stack->top++;
+    return true;
+}
+
+static inline int unordered_move_stack(qd_alloc_linked_stack_t *from, qd_alloc_linked_stack_t *to, uint32_t length)
+{
+    length = from->size < length ? from->size : length;
+    if (length == 0) {
+        return 0;
+    }
+    uint32_t remaining = length;
+    const uint32_t chunk_size = CHUNK_SIZE;
+    while (remaining > 0) {
+        //top will tell us how much data we could memcpy
+        uint32_t to_copy = remaining;
+        if (from->top == 0) {
+            prev_chunk_stack(from);
+        }
+        to_copy = from->top < to_copy ? from->top : to_copy;
+        if (to->top == chunk_size) {
+            if (!next_chunk_stack(to)) {
+                return length - remaining;
+            }
+        }
+        uint32_t remaining_to = chunk_size - to->top;
+        to_copy = remaining_to < to_copy ? remaining_to : to_copy;
+        from->top -= to_copy;
+        memcpy(&to->top_chunk->items[to->top], &from->top_chunk->items[from->top], to_copy * sizeof(qd_alloc_item_t *));
+        to->top += to_copy;
+        to->size += to_copy;
+        from->size -= to_copy;
+        remaining -= to_copy;
+    }
+    return length;
+}
 
 struct qd_alloc_pool_t {
     DEQ_LINKS(qd_alloc_pool_t);
-    qd_alloc_item_list_t free_list;
+    qd_alloc_linked_stack_t free_list;
 };
 
 qd_alloc_config_t qd_alloc_default_config_big   = {16,  32, 0};
@@ -85,7 +221,7 @@ static void qd_alloc_init(qd_alloc_type_desc_t *desc)
 
         desc->global_pool = NEW(qd_alloc_pool_t);
         DEQ_ITEM_INIT(desc->global_pool);
-        DEQ_INIT(desc->global_pool->free_list);
+        init_stack(&desc->global_pool->free_list);
         desc->lock = sys_mutex();
         DEQ_INIT(desc->tpool_list);
 #if QD_MEMORY_STATS
@@ -125,7 +261,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
     if (*tpool == 0) {
         NEW_CACHE_ALIGNED(qd_alloc_pool_t, *tpool);
         DEQ_ITEM_INIT(*tpool);
-        DEQ_INIT((*tpool)->free_list);
+        init_stack(&(*tpool)->free_list);
         sys_mutex_lock(desc->lock);
         DEQ_INSERT_TAIL(desc->tpool_list, *tpool);
         sys_mutex_unlock(desc->lock);
@@ -138,9 +274,8 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
     // list and return it.  Since everything we've touched is thread-local,
     // there is no need to acquire a lock.
     //
-    qd_alloc_item_t *item = DEQ_HEAD(pool->free_list);
+    qd_alloc_item_t *item = pop_stack(&pool->free_list);
     if (item) {
-        DEQ_REMOVE_HEAD(pool->free_list);
 #ifdef QD_MEMORY_DEBUG
         item->desc   = desc;
         item->header = PATTERN_FRONT;
@@ -159,19 +294,21 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
         //
         // Rebalance a full batch from the global free list to the thread list.
         //
+        const int moved = unordered_move_stack(&desc->global_pool->free_list, &pool->free_list,
+                                               desc->config->transfer_batch_size);
+        assert(moved == desc->config->transfer_batch_size);
 #if QD_MEMORY_STATS
         desc->stats->batches_rebalanced_to_threads++;
-        desc->stats->held_by_threads += desc->config->transfer_batch_size;
+        desc->stats->held_by_threads += moved;
 #endif
-        for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
-            item = DEQ_HEAD(desc->global_pool->free_list);
-            DEQ_REMOVE_HEAD(desc->global_pool->free_list);
-            DEQ_INSERT_TAIL(pool->free_list, item);
-        }
     } else {
         //
         // Allocate a full batch from the heap and put it on the thread list.
         //
+        //TODO(franz):
+        //  -   would be better to allocate in batches == transfer_batch_size
+        //      and put a small (== sizeof(transfer_batch_size)) ref_count to help the final free
+        //  -   could be beneficial directly to delink a chunk?
         for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
             size_t size = sizeof(qd_alloc_item_t) + desc->total_size
 #ifdef QD_MEMORY_DEBUG
@@ -181,8 +318,10 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
             ALLOC_CACHE_ALIGNED(size, item);
             if (item == 0)
                 break;
-            DEQ_ITEM_INIT(item);
-            DEQ_INSERT_TAIL(pool->free_list, item);
+            if (!push_stack(&pool->free_list, item)) {
+                free(item);
+                break;
+            }
             item->sequence = 0;
 #if QD_MEMORY_STATS
             desc->stats->held_by_threads++;
@@ -192,9 +331,8 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool)
     }
     sys_mutex_unlock(desc->lock);
 
-    item = DEQ_HEAD(pool->free_list);
+    item = pop_stack(&pool->free_list);
     if (item) {
-        DEQ_REMOVE_HEAD(pool->free_list);
 #ifdef QD_MEMORY_DEBUG
         item->desc = desc;
         item->header = PATTERN_FRONT;
@@ -213,7 +351,6 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
 {
     if (!p) return;
     qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
-    int              idx;
 
 #ifdef QD_MEMORY_DEBUG
     assert (desc->header  == PATTERN_FRONT);
@@ -232,7 +369,7 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
     if (*tpool == 0) {
         *tpool = NEW(qd_alloc_pool_t);
         DEQ_ITEM_INIT(*tpool);
-        DEQ_INIT((*tpool)->free_list);
+        init_stack(&(*tpool)->free_list);
         sys_mutex_lock(desc->lock);
         DEQ_INSERT_TAIL(desc->tpool_list, *tpool);
         sys_mutex_unlock(desc->lock);
@@ -241,9 +378,11 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
     qd_alloc_pool_t *pool = *tpool;
 
     item->sequence++;
-    DEQ_INSERT_TAIL(pool->free_list, item);
+    if (!push_stack(&pool->free_list, item)) {
+        free(item);
+    }
 
-    if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max)
+    if (DEQ_SIZE(pool->free_list) < desc->config->local_free_list_max)
         return;
 
     //
@@ -251,24 +390,20 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p)
     // rebalanced back to the global list.
     //
     sys_mutex_lock(desc->lock);
+    const int moved = unordered_move_stack(&pool->free_list, &desc->global_pool->free_list,
+                                           desc->config->transfer_batch_size);
+    assert(moved == desc->config->transfer_batch_size);
 #if QD_MEMORY_STATS
     desc->stats->batches_rebalanced_to_global++;
-    desc->stats->held_by_threads -= desc->config->transfer_batch_size;
+    desc->stats->held_by_threads -= moved;
 #endif
-    for (idx = 0; idx < desc->config->transfer_batch_size; idx++) {
-        item = DEQ_HEAD(pool->free_list);
-        DEQ_REMOVE_HEAD(pool->free_list);
-        DEQ_INSERT_TAIL(desc->global_pool->free_list, item);
-    }
-
     //
     // If there's a global_free_list size limit, remove items until the limit is
     // not exceeded.
     //
     if (desc->config->global_free_list_max != 0) {
         while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) {
-            item = DEQ_HEAD(desc->global_pool->free_list);
-            DEQ_REMOVE_HEAD(desc->global_pool->free_list);
+            item = pop_stack(&desc->global_pool->free_list);
             free(item);
 #if QD_MEMORY_STATS
             desc->stats->total_free_to_heap++;
@@ -329,15 +464,15 @@ void qd_alloc_finalize(void)
         //
         // Reclaim the items on the global free pool
         //
-        item = DEQ_HEAD(desc->global_pool->free_list);
+        item = pop_stack(&desc->global_pool->free_list);
         while (item) {
-            DEQ_REMOVE_HEAD(desc->global_pool->free_list);
             free(item);
 #if QD_MEMORY_STATS
             desc->stats->total_free_to_heap++;
 #endif
-            item = DEQ_HEAD(desc->global_pool->free_list);
+            item = pop_stack(&desc->global_pool->free_list);
         }
+        free_stack_chunks(&desc->global_pool->free_list);
         free(desc->global_pool);
         desc->global_pool = 0;
 
@@ -346,17 +481,16 @@ void qd_alloc_finalize(void)
         //
         qd_alloc_pool_t *tpool = DEQ_HEAD(desc->tpool_list);
         while (tpool) {
-            item = DEQ_HEAD(tpool->free_list);
+            item = pop_stack(&tpool->free_list);
             while (item) {
-                DEQ_REMOVE_HEAD(tpool->free_list);
                 free(item);
 #if QD_MEMORY_STATS
                 desc->stats->total_free_to_heap++;
 #endif
-                item = DEQ_HEAD(tpool->free_list);
+                item = pop_stack(&tpool->free_list);
             }
-
             DEQ_REMOVE_HEAD(desc->tpool_list);
+            free_stack_chunks(&tpool->free_list);
             free(tpool);
             tpool = DEQ_HEAD(desc->tpool_list);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org