You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celix.apache.org by bp...@apache.org on 2015/12/14 14:01:42 UTC
[2/2] celix git commit: CELIX-77: update to latest thpool.c version (including patch to support osx)
CELIX-77: update to latest thpool.c version (including patch to support osx)
Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/c5bc7496
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/c5bc7496
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/c5bc7496
Branch: refs/heads/develop
Commit: c5bc7496c6db8ed6296bb973b051c8cd49a4568f
Parents: 6a2050f
Author: Bjoern Petri <bp...@apache.org>
Authored: Mon Dec 14 14:01:05 2015 +0100
Committer: Bjoern Petri <bp...@apache.org>
Committed: Mon Dec 14 14:01:05 2015 +0100
----------------------------------------------------------------------
utils/private/src/thpool.c | 72 +++++++++++------------------------------
1 file changed, 19 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/celix/blob/c5bc7496/utils/private/src/thpool.c
----------------------------------------------------------------------
diff --git a/utils/private/src/thpool.c b/utils/private/src/thpool.c
index 845c291..5a58218 100644
--- a/utils/private/src/thpool.c
+++ b/utils/private/src/thpool.c
@@ -15,10 +15,7 @@
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
-#include <time.h>
-#ifdef LINUX
-#include <sys/prctl.h>
-#endif
+#include <time.h>
#include "thpool.h"
#ifdef THPOOL_DEBUG
@@ -27,16 +24,11 @@
#define THPOOL_DEBUG 0
#endif
-#define MAX_NANOSEC 999999999
-#define CEIL(X) ((X-(int)(X)) > 0 ? (int)(X+1) : (int)(X))
-
static volatile int threads_keepalive;
static volatile int threads_on_hold;
-
-
/* ========================== STRUCTURES ============================ */
@@ -80,6 +72,7 @@ typedef struct thpool_{
volatile int num_threads_alive; /* threads currently alive */
volatile int num_threads_working; /* threads currently working */
pthread_mutex_t thcount_lock; /* used for thread count etc */
+ pthread_cond_t threads_all_idle; /* signal to thpool_wait */
jobqueue* jobqueue_p; /* pointer to the job queue */
} thpool_;
@@ -152,6 +145,7 @@ struct thpool_* thpool_init(int num_threads){
}
pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
+ pthread_cond_init(&thpool_p->threads_all_idle, NULL);
/* Thread init */
int n;
@@ -193,46 +187,11 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){
/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
-
- /* Continuous polling */
- double timeout = 1.0;
- time_t start, end;
- double tpassed = 0.0;
- time (&start);
- while (tpassed < timeout &&
- (thpool_p->jobqueue_p->len || thpool_p->num_threads_working))
- {
- time (&end);
- tpassed = difftime(end,start);
- }
-
- /* Exponential polling */
- long init_nano = 1; /* MUST be above 0 */
- long new_nano;
- double multiplier = 1.01;
- int max_secs = 20;
-
- struct timespec polling_interval;
- polling_interval.tv_sec = 0;
- polling_interval.tv_nsec = init_nano;
-
- while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working)
- {
- nanosleep(&polling_interval, NULL);
- if ( polling_interval.tv_sec < max_secs ){
- new_nano = CEIL(polling_interval.tv_nsec * multiplier);
- polling_interval.tv_nsec = new_nano % MAX_NANOSEC;
- if ( new_nano > MAX_NANOSEC ) {
- polling_interval.tv_sec ++;
- }
- }
- else break;
- }
-
- /* Fall back to max polling */
- while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working){
- sleep(max_secs);
+ pthread_mutex_lock(&thpool_p->thcount_lock);
+ while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working) {
+ pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
}
+ pthread_mutex_unlock(&thpool_p->thcount_lock);
}
@@ -337,11 +296,17 @@ static void thread_hold () {
* @return nothing
*/
static void* thread_do(struct thread* thread_p){
-#ifdef LINUX
+
/* Set thread name for profiling and debuging */
char thread_name[128] = {0};
sprintf(thread_name, "thread-pool-%d", thread_p->id);
- prctl(PR_SET_NAME, thread_name);
+
+#if defined(__linux__)
+ pthread_setname_np(thread_p->pthread, thread_name);
+#elif defined(__APPLE__) && defined(__MACH__)
+ pthread_setname_np(thread_name);
+#else
+ fprintf(stderr, "thread_do(): pthread_setname_np is not supported on this system");
#endif
/* Assure all threads have been created before starting serving */
@@ -349,10 +314,8 @@ static void* thread_do(struct thread* thread_p){
/* Register signal handler */
struct sigaction act;
-
sigemptyset(&act.sa_mask);
- sigaddset(&act.sa_mask, SIGUSR1);
-
+ act.sa_flags = 0;
act.sa_handler = thread_hold;
if (sigaction(SIGUSR1, &act, NULL) == -1) {
fprintf(stderr, "thread_do(): cannot handle SIGUSR1");
@@ -389,6 +352,9 @@ static void* thread_do(struct thread* thread_p){
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working--;
+ if (!thpool_p->num_threads_working) {
+ pthread_cond_signal(&thpool_p->threads_all_idle);
+ }
pthread_mutex_unlock(&thpool_p->thcount_lock);
}