You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by bp...@apache.org on 2015/12/11 11:56:58 UTC
celix git commit: CELIX-77: add threadpool support
Repository: celix
Updated Branches:
refs/heads/develop b56c47d14 -> 1e73e4d1c
CELIX-77: add threadpool support
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1e73e4d1
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1e73e4d1
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1e73e4d1
Branch: refs/heads/develop
Commit: 1e73e4d1cad60bb1af4cd9f31382af2cd22687db
Parents: b56c47d
Author: Bjoern Petri <bp...@apache.org>
Authored: Fri Dec 11 11:56:34 2015 +0100
Committer: Bjoern Petri <bp...@apache.org>
Committed: Fri Dec 11 11:56:34 2015 +0100
----------------------------------------------------------------------
utils/CMakeLists.txt | 96 ++---
utils/private/src/thpool.c | 562 +++++++++++++++++++++++++++
utils/private/test/thread_pool_test.cpp | 118 ++++++
utils/public/docs/Design.md | 52 +++
utils/public/docs/FAQ.md | 33 ++
utils/public/docs/README.md | 62 +++
utils/public/include/thpool.h | 164 ++++++++
7 files changed, 1043 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt
index 93ff353..62a953f 100644
--- a/utils/CMakeLists.txt
+++ b/utils/CMakeLists.txt
@@ -24,28 +24,31 @@ if (UTILS)
include_directories("private/include")
include_directories("public/include")
add_library(celix_utils SHARED
- private/src/array_list.c
- public/include/array_list.h
- private/include/array_list_private.h
+ private/src/array_list.c
+ public/include/array_list.h
+ private/include/array_list_private.h
- private/src/hash_map.c
- public/include/hash_map.h
- private/include/hash_map_private.h
+ private/src/hash_map.c
+ public/include/hash_map.h
+ private/include/hash_map_private.h
- private/src/linked_list.c
- private/src/linked_list_iterator.c
- public/include/linked_list.h
- public/include/linked_list_iterator.h
- private/include/linked_list_private.h
+ private/src/linked_list.c
+ private/src/linked_list_iterator.c
+ public/include/linked_list.h
+ public/include/linked_list_iterator.h
+ private/include/linked_list_private.h
- public/include/exports.h
-
- private/src/celix_threads.c
- public/include/celix_threads.h
- )
+ public/include/exports.h
+
+ private/src/celix_threads.c
+ public/include/celix_threads.h
+
+ private/src/thpool.c
+ public/include/thpool.h
+ )
IF(UNIX)
- target_link_libraries(celix_utils m pthread)
+ target_link_libraries(celix_utils m pthread)
ENDIF(UNIX)
install(TARGETS celix_utils DESTINATION lib COMPONENT framework)
@@ -54,34 +57,39 @@ if (UTILS)
celix_subproject(UTILS-TESTS "Option to build the utilities library tests" "OFF")
- if (ENABLE_TESTING AND UTILS-TESTS)
- find_package(CppUTest REQUIRED)
+ if (ENABLE_TESTING AND UTILS-TESTS)
+ find_package(CppUTest REQUIRED)
+
+ include_directories(${CUNIT_INCLUDE_DIRS})
+ include_directories(${CPPUTEST_INCLUDE_DIR})
+ include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+ include_directories("${PROJECT_SOURCE_DIR}/utils/private/include")
+
+ add_executable(hash_map_test private/test/hash_map_test.cpp)
+ target_link_libraries(hash_map_test celix_utils ${CPPUTEST_LIBRARY} pthread)
+
+ add_executable(array_list_test private/test/array_list_test.cpp)
+ target_link_libraries(array_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
+
+ add_executable(celix_threads_test private/test/celix_threads_test.cpp)
+ target_link_libraries(celix_threads_test celix_utils ${CPPUTEST_LIBRARY} ${CPPUTEST_EXT_LIBRARY} pthread)
+ add_executable(linked_list_test private/test/linked_list_test.cpp)
+ target_link_libraries(linked_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
+
+ add_executable(thread_pool_test private/test/thread_pool_test.cpp)
+ target_link_libraries(thread_pool_test celix_utils ${CPPUTEST_LIBRARY} pthread)
- include_directories(${CUNIT_INCLUDE_DIRS})
- include_directories(${CPPUTEST_INCLUDE_DIR})
- include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
- include_directories("${PROJECT_SOURCE_DIR}/utils/private/include")
-
- add_executable(hash_map_test private/test/hash_map_test.cpp)
- target_link_libraries(hash_map_test celix_utils ${CPPUTEST_LIBRARY} pthread)
-
- add_executable(array_list_test private/test/array_list_test.cpp)
- target_link_libraries(array_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
-
- add_executable(celix_threads_test private/test/celix_threads_test.cpp)
- target_link_libraries(celix_threads_test celix_utils ${CPPUTEST_LIBRARY} ${CPPUTEST_EXT_LIBRARY} pthread)
-
- add_executable(linked_list_test private/test/linked_list_test.cpp)
- target_link_libraries(linked_list_test celix_utils ${CPPUTEST_LIBRARY} pthread)
-
- add_test(NAME run_array_list_test COMMAND array_list_test)
- add_test(NAME run_hash_map_test COMMAND hash_map_test)
- add_test(NAME run_celix_threads_test COMMAND celix_threads_test)
- add_test(NAME run_linked_list_test COMMAND linked_list_test)
- SETUP_TARGET_FOR_COVERAGE(array_list_test array_list_test ${CMAKE_BINARY_DIR}/coverage/array_list_test/array_list_test)
- SETUP_TARGET_FOR_COVERAGE(hash_map hash_map_test ${CMAKE_BINARY_DIR}/coverage/hash_map_test/hash_map_test)
- SETUP_TARGET_FOR_COVERAGE(celix_threads_test celix_threads_test ${CMAKE_BINARY_DIR}/coverage/celix_threads_test/celix_threads_test)
- SETUP_TARGET_FOR_COVERAGE(linked_list_test linked_list_test ${CMAKE_BINARY_DIR}/coverage/linked_list_test/linked_list_test)
+ add_test(NAME run_array_list_test COMMAND array_list_test)
+ add_test(NAME run_hash_map_test COMMAND hash_map_test)
+ add_test(NAME run_celix_threads_test COMMAND celix_threads_test)
+ add_test(NAME run_thread_pool_test COMMAND thread_pool_test)
+ add_test(NAME run_linked_list_test COMMAND linked_list_test)
+
+ SETUP_TARGET_FOR_COVERAGE(array_list_test array_list_test ${CMAKE_BINARY_DIR}/coverage/array_list_test/array_list_test)
+ SETUP_TARGET_FOR_COVERAGE(hash_map hash_map_test ${CMAKE_BINARY_DIR}/coverage/hash_map_test/hash_map_test)
+ SETUP_TARGET_FOR_COVERAGE(celix_threads_test celix_threads_test ${CMAKE_BINARY_DIR}/coverage/celix_threads_test/celix_threads_test)
+ SETUP_TARGET_FOR_COVERAGE(thread_pool_test thread_pool_test ${CMAKE_BINARY_DIR}/coverage/thread_pool_test/thread_pool_test)
+ SETUP_TARGET_FOR_COVERAGE(linked_list_test linked_list_test ${CMAKE_BINARY_DIR}/coverage/linked_list_test/linked_list_test)
endif(ENABLE_TESTING AND UTILS-TESTS)
endif (UTILS)
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/private/src/thpool.c
----------------------------------------------------------------------
diff --git a/utils/private/src/thpool.c b/utils/private/src/thpool.c
new file mode 100644
index 0000000..f81350e
--- /dev/null
+++ b/utils/private/src/thpool.c
@@ -0,0 +1,562 @@
+/* ********************************
+ * Author: Johan Hanssen Seferidis
+ * License: MIT
+ * Description: Library providing a threading pool where you can add
+ * work. For usage, check the thpool.h file or README.md
+ *
+ *//** @file thpool.h *//*
+ *
+ ********************************/
+
+
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/prctl.h>
+
+#include "thpool.h"
+
+#ifdef THPOOL_DEBUG
+#define THPOOL_DEBUG 1
+#else
+#define THPOOL_DEBUG 0
+#endif
+
+#define MAX_NANOSEC 999999999
+#define CEIL(X) ((X-(int)(X)) > 0 ? (int)(X+1) : (int)(X))
+
+static volatile int threads_keepalive;
+static volatile int threads_on_hold;
+
+
+
+
+
+/* ========================== STRUCTURES ============================ */
+
+
+/* Binary semaphore */
+typedef struct bsem {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int v;
+} bsem;
+
+
+/* Job: one unit of work, stored as a node of the singly linked job queue */
+typedef struct job{
+ struct job* prev; /* job queued right after this one; becomes the front once this job is pulled */
+ void* (*function)(void* arg); /* function to run for this job */
+ void* arg; /* argument handed to function */
+} job;
+
+
+/* Job queue */
+typedef struct jobqueue{
+ pthread_mutex_t rwmutex; /* used for queue r/w access */
+ job *front; /* pointer to front of queue */
+ job *rear; /* pointer to rear of queue */
+ bsem *has_jobs; /* flag as binary semaphore */
+ int len; /* number of jobs in queue */
+} jobqueue;
+
+
+/* Thread: bookkeeping record for one worker of the pool */
+typedef struct thread{
+ int id; /* friendly id */
+ pthread_t pthread; /* pthread handle of the worker thread */
+ struct thpool_* thpool_p; /* access to thpool */
+} thread;
+
+
+/* Threadpool */
+typedef struct thpool_{
+ thread** threads; /* pointer to threads */
+ volatile int num_threads_alive; /* threads currently alive */
+ volatile int num_threads_working; /* threads currently working */
+ pthread_mutex_t thcount_lock; /* used for thread count etc */
+ jobqueue* jobqueue_p; /* pointer to the job queue */
+} thpool_;
+
+
+
+
+
+/* ========================== PROTOTYPES ============================ */
+
+
+static void thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
+static void* thread_do(struct thread* thread_p);
+static void thread_hold();
+static void thread_destroy(struct thread* thread_p);
+
+static int jobqueue_init(thpool_* thpool_p);
+static void jobqueue_clear(thpool_* thpool_p);
+static void jobqueue_push(thpool_* thpool_p, struct job* newjob_p);
+static struct job* jobqueue_pull(thpool_* thpool_p);
+static void jobqueue_destroy(thpool_* thpool_p);
+
+static void bsem_init(struct bsem *bsem_p, int value);
+static void bsem_reset(struct bsem *bsem_p);
+static void bsem_post(struct bsem *bsem_p);
+static void bsem_post_all(struct bsem *bsem_p);
+static void bsem_wait(struct bsem *bsem_p);
+
+
+
+
+
+/* ========================== THREADPOOL ============================ */
+
+
+/* Initialise thread pool: allocates the pool, its job queue and num_threads
+ * detached workers (negative counts are treated as 0), then spins until every
+ * worker has registered itself. Returns the pool, or NULL on allocation failure. */
+struct thpool_* thpool_init(int num_threads){
+    threads_on_hold = 0;
+    threads_keepalive = 1;
+
+    if (num_threads < 0){
+        num_threads = 0;
+    }
+
+    /* Make new thread pool */
+    thpool_* thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
+    if (thpool_p == NULL){
+        fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
+        return NULL;
+    }
+    thpool_p->num_threads_alive = 0;
+    thpool_p->num_threads_working = 0;
+
+    /* Initialise the job queue */
+    if (jobqueue_init(thpool_p) == -1){
+        fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
+        free(thpool_p);
+        return NULL;
+    }
+
+    /* Make threads in pool: the array holds one pointer per worker, not whole structs */
+    thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread*));
+    if (thpool_p->threads == NULL){
+        fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n");
+        jobqueue_destroy(thpool_p);
+        free(thpool_p->jobqueue_p);
+        free(thpool_p);
+        return NULL;
+    }
+
+    pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
+
+    /* Thread init */
+    int n;
+    for (n=0; n<num_threads; n++){
+        thread_init(thpool_p, &thpool_p->threads[n], n);
+        if (THPOOL_DEBUG)
+            printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
+    }
+
+    /* Wait for threads to initialize (each worker increments num_threads_alive) */
+    while (thpool_p->num_threads_alive != num_threads) {}
+
+    return thpool_p;
+}
+
+
+/* Queue a job that runs function_p(arg_p) on a pool thread; 0 on success, -1 on allocation failure */
+int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){
+    job* work_p = (struct job*)malloc(sizeof(struct job));
+
+    if (work_p == NULL){
+        fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
+        return -1;
+    }
+
+    /* record what to run and with which argument */
+    work_p->function = function_p;
+    work_p->arg = arg_p;
+
+    /* enqueue while holding the queue's r/w mutex */
+    jobqueue* queue_p = thpool_p->jobqueue_p;
+    pthread_mutex_lock(&queue_p->rwmutex);
+    jobqueue_push(thpool_p, work_p);
+    pthread_mutex_unlock(&queue_p->rwmutex);
+
+    return 0;
+}
+
+
+/* Wait until the job queue is empty and no thread is working, polling in three phases */
+void thpool_wait(thpool_* thpool_p){
+
+ /* Phase 1: busy-poll without sleeping for up to `timeout` seconds */
+ double timeout = 1.0;
+ time_t start, end;
+ double tpassed = 0.0;
+ time (&start);
+ while (tpassed < timeout &&
+ (thpool_p->jobqueue_p->len || thpool_p->num_threads_working))
+ {
+ time (&end);
+ tpassed = difftime(end,start);
+ }
+
+ /* Phase 2: sleep between polls; tv_nsec grows by `multiplier` per round, each overflow adds a second */
+ long init_nano = 1; /* MUST be above 0 */
+ long new_nano;
+ double multiplier = 1.01;
+ int max_secs = 20;
+
+ struct timespec polling_interval;
+ polling_interval.tv_sec = 0;
+ polling_interval.tv_nsec = init_nano;
+
+ while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working)
+ {
+ nanosleep(&polling_interval, NULL);
+ if ( polling_interval.tv_sec < max_secs ){
+ new_nano = CEIL(polling_interval.tv_nsec * multiplier);
+ polling_interval.tv_nsec = new_nano % MAX_NANOSEC;
+ if ( new_nano > MAX_NANOSEC ) {
+ polling_interval.tv_sec ++;
+ }
+ }
+ else break;
+ }
+
+ /* Phase 3: the interval reached max_secs and work remains - poll once every max_secs seconds */
+ while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working){
+ sleep(max_secs);
+ }
+}
+
+
+/* Destroy the threadpool: stop all workers, then free the queue, the thread records and the pool */
+void thpool_destroy(thpool_* thpool_p){
+
+ volatile int threads_total = thpool_p->num_threads_alive; /* snapshot: the count is 0 by the time records are freed */
+
+ /* End each thread's infinite loop */
+ threads_keepalive = 0;
+
+ /* For up to one second, keep waking idle threads so they see the cleared flag and exit */
+ double TIMEOUT = 1.0;
+ time_t start, end;
+ double tpassed = 0.0;
+ time (&start);
+ while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
+ bsem_post_all(thpool_p->jobqueue_p->has_jobs);
+ time (&end);
+ tpassed = difftime(end,start);
+ }
+
+ /* Threads still alive after the timeout: wake them and re-check once per second */
+ while (thpool_p->num_threads_alive){
+ bsem_post_all(thpool_p->jobqueue_p->has_jobs);
+ sleep(1);
+ }
+
+ /* Job queue cleanup: jobqueue_destroy() frees the contents, the struct is freed here */
+ jobqueue_destroy(thpool_p);
+ free(thpool_p->jobqueue_p);
+
+ /* Free every thread record, then the pointer array and the pool itself */
+ int n;
+ for (n=0; n < threads_total; n++){
+ thread_destroy(thpool_p->threads[n]);
+ }
+ free(thpool_p->threads);
+ free(thpool_p);
+}
+
+
+/* Pause all threads in threadpool by sending SIGUSR1 to each one (thread_do installs thread_hold as its handler) */
+void thpool_pause(thpool_* thpool_p) {
+ int n;
+ for (n=0; n < thpool_p->num_threads_alive; n++){
+ pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
+ }
+}
+
+
+/* Resume paused threads by clearing the file-wide threads_on_hold flag; thpool_p itself is not consulted */
+void thpool_resume(thpool_* thpool_p) {
+ threads_on_hold = 0;
+}
+
+
+
+
+
+/* ============================ THREAD ============================== */
+
+
+/* Initialize a thread in the thread pool
+ *
+ * Allocates the thread record, stores it in *thread_p and starts a detached
+ * pthread running thread_do() on it. Exits the process if allocation fails.
+ *
+ * @param thpool_p pool the new thread belongs to
+ * @param thread_p address of the pointer that receives the new thread record
+ * @param id       id to be given to the thread
+ */
+static void thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
+    *thread_p = (struct thread*)malloc(sizeof(struct thread));
+    if (*thread_p == NULL){  /* test the allocation itself, not the out-parameter */
+        fprintf(stderr, "thpool_init(): Could not allocate memory for thread\n");
+        exit(1);
+    }
+    (*thread_p)->thpool_p = thpool_p;
+    (*thread_p)->id = id;
+
+    pthread_create(&(*thread_p)->pthread, NULL, (void *(*)(void *))thread_do, (*thread_p));
+    pthread_detach((*thread_p)->pthread);
+}
+
+
+/* SIGUSR1 handler: raises the shared threads_on_hold flag and keeps the calling thread sleeping until it is cleared */
+static void thread_hold () {
+ threads_on_hold = 1;
+ while (threads_on_hold){
+ sleep(1);
+ }
+}
+
+
+/* What each thread is doing
+*
+* In principle this is an endless loop. The only time this loop gets interrupted is once
+* thpool_destroy() is invoked or the program exits.
+*
+* @param thread_p thread that will run this function
+* @return always NULL
+*/
+static void* thread_do(struct thread* thread_p){
+    /* Set thread name for profiling and debugging */
+    char thread_name[128] = {0};
+    sprintf(thread_name, "thread-pool-%d", thread_p->id);
+    prctl(PR_SET_NAME, thread_name);
+    thpool_* thpool_p = thread_p->thpool_p;
+
+    /* Register signal handler; sa_mask and sa_flags must not be left indeterminate */
+    struct sigaction act;
+    sigemptyset(&act.sa_mask);
+    act.sa_flags = 0;
+    act.sa_handler = thread_hold;
+    if (sigaction(SIGUSR1, &act, NULL) == -1) {
+        fprintf(stderr, "thread_do(): cannot handle SIGUSR1");
+    }
+
+    /* Mark thread as alive (initialized) */
+    pthread_mutex_lock(&thpool_p->thcount_lock);
+    thpool_p->num_threads_alive += 1;
+    pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+    while(threads_keepalive){
+
+        bsem_wait(thpool_p->jobqueue_p->has_jobs);
+
+        if (threads_keepalive){
+
+            pthread_mutex_lock(&thpool_p->thcount_lock);
+            thpool_p->num_threads_working++;
+            pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+            /* Read job from queue and execute it */
+            void*(*func_buff)(void* arg);
+            void* arg_buff;
+            job* job_p;
+            pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
+            job_p = jobqueue_pull(thpool_p);
+            pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
+            if (job_p) {
+                func_buff = job_p->function;
+                arg_buff = job_p->arg;
+                func_buff(arg_buff);
+                free(job_p);
+            }
+
+            pthread_mutex_lock(&thpool_p->thcount_lock);
+            thpool_p->num_threads_working--;
+            pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+        }
+    }
+    pthread_mutex_lock(&thpool_p->thcount_lock);
+    thpool_p->num_threads_alive --;
+    pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+    return NULL;
+}
+
+
+/* Frees the memory of a thread record; nothing else is done to the pthread here */
+static void thread_destroy (thread* thread_p){
+ free(thread_p);
+}
+
+
+
+
+
+/* ============================ JOB QUEUE =========================== */
+
+
+/* Initialize queue: allocates the queue and its has_jobs semaphore.
+ * Returns 0 on success, -1 on allocation failure (jobqueue_p is then NULL). */
+static int jobqueue_init(thpool_* thpool_p){
+    thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue));
+    if (thpool_p->jobqueue_p == NULL){
+        return -1;
+    }
+    thpool_p->jobqueue_p->len = 0;
+    thpool_p->jobqueue_p->front = NULL;
+    thpool_p->jobqueue_p->rear = NULL;
+
+    thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
+    if (thpool_p->jobqueue_p->has_jobs == NULL){
+        free(thpool_p->jobqueue_p);   /* do not leak the half-built queue */
+        thpool_p->jobqueue_p = NULL;
+        return -1;
+    }
+    pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
+    bsem_init(thpool_p->jobqueue_p->has_jobs, 0);
+    return 0;
+}
+
+
+/* Drop every pending job without running it and reset the queue to its empty state */
+static void jobqueue_clear(thpool_* thpool_p){
+    jobqueue* queue_p = thpool_p->jobqueue_p;
+
+    /* jobqueue_pull() unlinks the front job and lowers len */
+    while (queue_p->len != 0){
+        free(jobqueue_pull(thpool_p));
+    }
+    queue_p->front = NULL;
+    queue_p->rear = NULL;
+    bsem_reset(queue_p->has_jobs);
+    queue_p->len = 0;
+}
+
+
+/* Append an already allocated job at the rear of the queue and post
+ * has_jobs so that a waiting thread is woken.
+ *
+ * Notice: Caller MUST hold a mutex
+ */
+static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
+    jobqueue* queue_p = thpool_p->jobqueue_p;
+
+    newjob->prev = NULL;
+
+    if (queue_p->len == 0){
+        /* empty queue: the new job is both front and rear */
+        queue_p->front = newjob;
+        queue_p->rear = newjob;
+    }
+    else {
+        /* link behind the current rear, then advance rear */
+        queue_p->rear->prev = newjob;
+        queue_p->rear = newjob;
+    }
+    queue_p->len++;
+
+    bsem_post(queue_p->has_jobs);
+}
+
+
+/* Detach and return the job at the front of the queue; when len is 0 the
+ * queue is left untouched and the current front pointer is returned as is.
+ *
+ * Notice: Caller MUST hold a mutex
+ */
+static struct job* jobqueue_pull(thpool_* thpool_p){
+    jobqueue* queue_p = thpool_p->jobqueue_p;
+    job* first_p = queue_p->front;
+
+    if (queue_p->len == 0){
+        /* nothing queued: leave the queue untouched */
+        return first_p;
+    }
+
+    if (queue_p->len == 1){
+        /* the only job leaves: queue becomes empty */
+        queue_p->front = NULL;
+        queue_p->rear = NULL;
+        queue_p->len = 0;
+        return first_p;
+    }
+
+    /* several jobs queued: the follower becomes the new front */
+    queue_p->front = first_p->prev;
+    queue_p->len--;
+
+    /* jobs remain, so post has_jobs again for the next waiting thread */
+    bsem_post(queue_p->has_jobs);
+    return first_p;
+}
+
+
+/* Clears the queue and frees its has_jobs semaphore; the jobqueue struct itself is left for the caller to free */
+static void jobqueue_destroy(thpool_* thpool_p){
+ jobqueue_clear(thpool_p);
+ free(thpool_p->jobqueue_p->has_jobs);
+}
+
+
+
+
+
+/* ======================== SYNCHRONISATION ========================= */
+
+
+/* Init semaphore to 1 or 0; any other value prints an error and terminates the process via exit(1) */
+static void bsem_init(bsem *bsem_p, int value) {
+ if (value < 0 || value > 1) {
+ fprintf(stderr, "bsem_init(): Binary semaphore can take only values 1 or 0");
+ exit(1);
+ }
+ pthread_mutex_init(&(bsem_p->mutex), NULL);
+ pthread_cond_init(&(bsem_p->cond), NULL);
+ bsem_p->v = value;
+}
+
+
+/* Reset semaphore to 0 by running bsem_init() on it again (its mutex and cond are initialised again too) */
+static void bsem_reset(bsem *bsem_p) {
+ bsem_init(bsem_p, 0);
+}
+
+
+/* Set the semaphore to 1 and signal its condition variable (wakes at least one waiting thread) */
+static void bsem_post(bsem *bsem_p) {
+ pthread_mutex_lock(&bsem_p->mutex);
+ bsem_p->v = 1;
+ pthread_cond_signal(&bsem_p->cond);
+ pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Set the semaphore to 1 and broadcast on its condition variable (wakes every waiting thread) */
+static void bsem_post_all(bsem *bsem_p) {
+ pthread_mutex_lock(&bsem_p->mutex);
+ bsem_p->v = 1;
+ pthread_cond_broadcast(&bsem_p->cond);
+ pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Block until the semaphore value is 1, then set it back to 0 before returning */
+static void bsem_wait(bsem* bsem_p) {
+ pthread_mutex_lock(&bsem_p->mutex);
+ while (bsem_p->v != 1) {
+ pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
+ }
+ bsem_p->v = 0;
+ pthread_mutex_unlock(&bsem_p->mutex);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/private/test/thread_pool_test.cpp
----------------------------------------------------------------------
diff --git a/utils/private/test/thread_pool_test.cpp b/utils/private/test/thread_pool_test.cpp
new file mode 100644
index 0000000..5dae4c8
--- /dev/null
+++ b/utils/private/test/thread_pool_test.cpp
@@ -0,0 +1,118 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership. The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * thread_pool_test.cpp
+ *
+ * \date Sep 15, 2015
+ * \author Menno van der Graaf & Alexander
+ * \copyright Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "CppUTest/TestHarness.h"
+#include "CppUTest/TestHarness_c.h"
+#include "CppUTest/CommandLineTestRunner.h"
+
+extern "C" {
+#include "celix_threads.h"
+#include "thpool.h"
+}
+
+celix_thread_mutex_t mutex;
+int sum=0;
+
+
+void * increment(void *) {
+ celixThreadMutex_lock(&mutex);
+ sum ++;
+ celixThreadMutex_unlock(&mutex);
+ return NULL;
+}
+
+int main(int argc, char** argv) {
+ return RUN_ALL_TESTS(argc, argv);
+}
+
+
+//----------------------TEST THREAD FUNCTION DECLARATIONS----------------------
+
+//----------------------TESTGROUP DEFINES----------------------
+
+TEST_GROUP(thread_pool) {
+ threadpool myPool;
+
+ void setup(void) {
+ }
+
+ void teardown(void) {
+ }
+};
+
+
+//----------------------THREAD_POOL TESTS----------------------
+
+TEST(thread_pool, create) {
+
+ myPool = thpool_init(5); // pool of 5 threads
+ CHECK((myPool != NULL));
+ thpool_destroy(myPool);
+}
+
+TEST(thread_pool, do_work) {
+
+ myPool = thpool_init(5); // pool of 5 threads
+ celixThreadMutex_create(&mutex, NULL);
+ CHECK((myPool != NULL));
+ int n;
+ sum = 0;
+ int num_jobs = 1000;
+ for (n = 0; n < num_jobs; n++){
+ thpool_add_work(myPool, increment, NULL);
+ }
+ thpool_wait(myPool);
+ thpool_destroy(myPool);
+ CHECK_EQUAL(1000, sum);
+ celixThreadMutex_destroy(&mutex);
+}
+
+TEST(thread_pool, do_work_with_pause) {
+
+ myPool = thpool_init(5); // pool of 5 threads
+ celixThreadMutex_create(&mutex, NULL);
+ CHECK((myPool != NULL));
+ int n;
+ sum = 0;
+ int num_jobs = 500000;
+ for (n = 0; n < num_jobs; n++){
+ thpool_add_work(myPool, increment, NULL);
+ }
+ sleep(1);
+ thpool_pause(myPool);
+ for (n = 0; n < num_jobs; n++){
+ thpool_add_work(myPool, increment, NULL);
+ }
+ thpool_resume(myPool);
+ thpool_wait(myPool);
+ thpool_destroy(myPool);
+ CHECK_EQUAL(1000000, sum);
+ celixThreadMutex_destroy(&mutex);
+}
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/Design.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/Design.md b/utils/public/docs/Design.md
new file mode 100644
index 0000000..00fe1b4
--- /dev/null
+++ b/utils/public/docs/Design.md
@@ -0,0 +1,52 @@
+## High level
+
+ Description: Library providing a threading pool where you can add work on the fly. The number
+ of threads in the pool is adjustable when creating the pool. In most cases
+ this should equal the number of threads supported by your cpu.
+
+ For an example on how to use the threadpool, check the main.c file or just read
+ the documentation found in the README.md file.
+
+ In this header file a detailed overview of the functions and the threadpool's logical
+ scheme is presented in case you wish to tweak or alter something.
+
+
+
+ _______________________________________________________
+ / \
+ | JOB QUEUE | job1 | job2 | job3 | job4 | .. |
+ | |
+ | threadpool | thread1 | thread2 | .. |
+ \_______________________________________________________/
+
+
+ Description: Jobs are added to the job queue. Once a thread in the pool
+ is idle, it is assigned with the first job from the queue(and
+ erased from the queue). It's each thread's job to read from
+ the queue serially(using lock) and executing each job
+ until the queue is empty.
+
+
+ Scheme:
+
+ thpool______ jobqueue____ ______
+ | | | | .----------->|_job0_| Newly added job
+ | | | rear ----------' |_job1_|
+ | jobqueue----------------->| | |_job2_|
+ | | | front ----------. |__..__|
+ |___________| |___________| '----------->|_jobn_| Job for thread to take
+
+
+ job0________
+ | |
+ | function---->
+ | |
+ | arg------->
+ | | job1________
+ | next-------------->| |
+ |___________| | |..
+
+
+## Synchronisation
+
+*Mutexes* and *binary semaphores* are the main tools to achieve synchronisation between threads.
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/FAQ.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/FAQ.md b/utils/public/docs/FAQ.md
new file mode 100644
index 0000000..584a699
--- /dev/null
+++ b/utils/public/docs/FAQ.md
@@ -0,0 +1,33 @@
+
+### Why isn't pthread_exit() used to exit a thread?
+`thread_do` used to use pthread_exit(). However that resulted in
+hard times of testing for memory leaks. The reason is that on pthread_exit()
+not all memory is freed by pthread (probably for future threads or false
+belief that the application is terminating). For these reasons a simple return
+is used.
+
+Interestingly using `pthread_exit()` results in much more memory being allocated.
+
+
+### Why do you use sleep() after calling thpool_destroy()?
+This is needed only in the tests. The reason is that if you call thpool_destroy
+and then exit immediately, maybe the program will exit before all the threads
+had the time to deallocate. In that way it is impossible to check for memory
+leaks.
+
+In production you don't have to worry about this since if you call exit,
+immediately after you destroyed the pool, the threads will be freed
+anyway by the OS. If you either way destroy the pool in the middle of your
+program it doesn't matter again since the program will not exit immediately
+and thus threads will have more than enough time to terminate.
+
+
+
+### Why does wait() use all my CPU?
+Normally `wait()` will spike CPU usage to full when called. This is normal as long as it doesn't last for more than 1 second. The reason this happens is that `wait()` goes through various phases of polling (what is called smart polling).
+
+ * Initially there is no interval between polling and hence the 100% use of your CPU.
+ * After that the polling interval grows exponentially.
+ * Finally after x seconds, if there is still work, polling falls back to a very big interval.
+
+The reason `wait()` works in this way, is that the function is mostly used when someone wants to wait for some calculation to finish. So if the calculation is assumed to take a long time then we don't want to poll too often. Still we want to poll fast in case the calculation is a simple one. To solve these two problems, this seemingly awkward behaviour is present.
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/README.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/README.md b/utils/public/docs/README.md
new file mode 100644
index 0000000..0a07ebc
--- /dev/null
+++ b/utils/public/docs/README.md
@@ -0,0 +1,62 @@
+![Build status](http://178.62.170.124:3000/pithikos/c-thread-pool/badge/?branch=master)
+
+# C Thread Pool
+
+This is a minimal but fully functional threadpool implementation.
+
+ * ANSI C and POSIX compliant
+ * Number of threads can be chosen on initialization
+ * Minimal but powerful interface
+ * Full documentation
+
+The threadpool is under MIT license. Notice that this project took a considerable amount of work and sacrifice of my free time and the reason I give it for free (even for commercial use) is so when you become rich and wealthy you don't forget about us open-source creatures of the night. Cheers!
+
+
+## v2 Changes
+
+This is an updated and heavily refactored version of my original threadpool. The main things taken into consideration in this new version are:
+
+ * Synchronisation control from the user (pause/resume/wait)
+ * Thorough testing for memory leaks and race conditions
+ * Cleaner and more opaque API
+ * Smart polling - polling interval changes on-the-fly
+
+
+## Run an example
+
+The library is not precompiled so you have to compile it with your project. The thread pool
+uses POSIX threads so if you compile with gcc on Linux you have to use the flag `-pthread` like this:
+
+ gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example
+
+
+Then run the executable like this:
+
+ ./example
+
+
+## Basic usage
+
+1. Include the header in your source file: `#include "thpool.h"`
+2. Create a thread pool with number of threads you want: `threadpool thpool = thpool_init(4);`
+3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, (void*)arg_p);`
+
+The workers(threads) will start their work automatically as fast as there is new work
+in the pool. If you want to wait for all added work to be finished before continuing
+you can use `thpool_wait(thpool);`. If you want to destroy the pool you can use
+`thpool_destroy(thpool);`.
+
+
+
+## API
+
+For a deeper look into the documentation check in the [thpool.h](https://github.com/Pithikos/C-Thread-Pool/blob/master/thpool.h) file. Below is a fast practical overview.
+
+| Function example | Description |
+|---------------------------------|---------------------------------------------------------------------|
+| ***thpool_init(4)*** | Will return a new threadpool with `4` threads. |
+| ***thpool_add_work(thpool, (void*)function_p, (void*)arg_p)*** | Will add new work to the pool. Work is simply a function. You can pass a single argument to the function if you wish. If not, `NULL` should be passed. |
+| ***thpool_wait(thpool)*** | Will wait for all jobs (both in queue and currently running) to finish. |
+| ***thpool_destroy(thpool)*** | This will destroy the threadpool. If jobs are currently being executed, then it will wait for them to finish. |
+| ***thpool_pause(thpool)*** | All threads in the threadpool will pause no matter if they are idle or executing work. |
+| ***thpool_resume(thpool)*** | If the threadpool is paused, then all threads will resume from where they were. |
http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/include/thpool.h
----------------------------------------------------------------------
diff --git a/utils/public/include/thpool.h b/utils/public/include/thpool.h
new file mode 100644
index 0000000..ab3063b
--- /dev/null
+++ b/utils/public/include/thpool.h
@@ -0,0 +1,164 @@
+/**********************************
+ * @author Johan Hanssen Seferidis
+ * License: MIT
+ *
+ **********************************/
+
+#ifndef _THPOOL_
+#define _THPOOL_
+
+
+
+
+
+/* =================================== API ======================================= */
+
+
+typedef struct thpool_* threadpool;
+
+
+/**
+ * @brief Initialize threadpool
+ *
+ * Initializes a threadpool. This function will not return until all
+ * threads have initialized successfully.
+ *
+ * @example
+ *
+ * ..
+ * threadpool thpool; //First we declare a threadpool
+ * thpool = thpool_init(4); //then we initialize it to 4 threads
+ * ..
+ *
+ * @param num_threads number of threads to be created in the threadpool
+ * @return threadpool created threadpool on success,
+ * NULL on error
+ */
+threadpool thpool_init(int num_threads);
+
+
+/**
+ * @brief Add work to the job queue
+ *
+ * Takes an action and its argument and adds it to the threadpool's job queue.
+ * If you want to add a function that takes more than one argument as work,
+ * one way to do so is to pass a pointer to a structure.
+ *
+ * NOTICE: You have to cast both the function and argument to not get warnings.
+ *
+ * @example
+ *
+ * void print_num(int num){
+ * printf("%d\n", num);
+ * }
+ *
+ * int main() {
+ * ..
+ * int a = 10;
+ * thpool_add_work(thpool, (void*)print_num, (void*)a);
+ * ..
+ * }
+ *
+ * @param threadpool threadpool to which the work will be added
+ * @param function_p pointer to function to add as work
+ * @param arg_p pointer to an argument
+ * @return 0 on success, -1 if memory for the new job could not be allocated
+ */
+int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p);
+
+
+/**
+ * @brief Wait for all queued jobs to finish
+ *
+ * Will wait for all jobs - both queued and currently running to finish.
+ * Once the queue is empty and all work has completed, the calling thread
+ * (probably the main program) will continue.
+ *
+ * Smart polling is used in wait. The polling interval is initially 0 - meaning
+ * that the condition is checked continuously (busy-waiting). If after 1 second
+ * the threads haven't finished, the polling interval starts growing exponentially
+ * until it reaches max_secs seconds. Then it falls back to polling once every
+ * max_secs seconds, assuming that heavy processing is being used in the threadpool.
+ *
+ * @example
+ *
+ * ..
+ * threadpool thpool = thpool_init(4);
+ * ..
+ * // Add a bunch of work
+ * ..
+ * thpool_wait(thpool);
+ * puts("All added work has finished");
+ * ..
+ *
+ * @param threadpool the threadpool to wait for
+ * @return nothing
+ */
+void thpool_wait(threadpool);
+
+
+/**
+ * @brief Pauses all threads immediately
+ *
+ * The threads will be paused no matter if they are idle or working.
+ * The threads return to their previous states once thpool_resume
+ * is called.
+ *
+ * While the thread is being paused, new work can be added.
+ *
+ * @example
+ *
+ * threadpool thpool = thpool_init(4);
+ * thpool_pause(thpool);
+ * ..
+ * // Add a bunch of work
+ * ..
+ * thpool_resume(thpool); // Let the threads start their magic
+ *
+ * @param threadpool the threadpool where the threads should be paused
+ * @return nothing
+ */
+void thpool_pause(threadpool);
+
+
+/**
+ * @brief Unpauses all threads if they are paused
+ *
+ * @example
+ * ..
+ * thpool_pause(thpool);
+ * sleep(10); // Delay execution 10 seconds
+ * thpool_resume(thpool);
+ * ..
+ *
+ * @param threadpool the threadpool where the threads should be unpaused
+ * @return nothing
+ */
+void thpool_resume(threadpool);
+
+
+/**
+ * @brief Destroy the threadpool
+ *
+ * This will wait for the currently active threads to finish and then 'kill'
+ * the whole threadpool to free up memory.
+ *
+ * @example
+ * int main() {
+ * threadpool thpool1 = thpool_init(2);
+ * threadpool thpool2 = thpool_init(2);
+ * ..
+ * thpool_destroy(thpool1);
+ * ..
+ * return 0;
+ * }
+ *
+ * @param threadpool the threadpool to destroy
+ * @return nothing
+ */
+void thpool_destroy(threadpool);
+
+
+
+
+#endif