You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by da...@apache.org on 2009/03/25 02:10:35 UTC
svn commit: r758111 - in
/webservices/axis2/branches/c/new_thread_pool-25march2009/util:
include/axutil_thread_pool.h include/platforms/unix/axutil_thread_unix.h
src/Makefile.am src/platforms/unix/thread_unix.c src/thread_pool.c
Author: damitha
Date: Wed Mar 25 01:10:34 2009
New Revision: 758111
URL: http://svn.apache.org/viewvc?rev=758111&view=rev
Log:
Adding new thread pool code
Modified:
webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h
webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h
webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am
webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c
webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c
Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h Wed Mar 25 01:10:34 2009
@@ -28,6 +28,9 @@
#include <axutil_allocator.h>
#include <axutil_thread.h>
+/* maximum number of threads allowed in the pool */
+#define MAXT_IN_POOL 200
+
#ifdef __cplusplus
extern "C"
{
@@ -48,8 +51,8 @@
* @param data arguments to be passed to the function
* @return AXIS2_SUCCESS on success, AXIS2_FAILURE if pool or its allocator is NULL.
*/
- AXIS2_EXTERN axutil_thread_t *AXIS2_CALL
- axutil_thread_pool_get_thread(
+ AXIS2_EXTERN axis2_status_t AXIS2_CALL
+ axutil_thread_pool_dispatch(
axutil_thread_pool_t * pool,
axutil_thread_start_t func,
void *data);
@@ -95,11 +98,13 @@
/**
* Initializes (creates) a thread_pool.
* @param allocator user defined allocator for the memory allocation.
+ * @param size Size of the thread pool.
* @return initialized thread_pool. NULL on error.
*/
AXIS2_EXTERN axutil_thread_pool_t *AXIS2_CALL
axutil_thread_pool_init(
- axutil_allocator_t * allocator);
+ axutil_allocator_t * allocator,
+ int size);
/**
* This function can be used to initialize the environment in case of
Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h Wed Mar 25 01:10:34 2009
@@ -56,4 +56,10 @@
pthread_mutex_t mutex;
};
+struct axutil_thread_cond_t /* Unix condition variable: a pthread_cond_t bundled with the allocator that owns the wrapper */
+{
+ axutil_allocator_t *allocator; /* stored by axutil_thread_cond_create; axutil_thread_cond_destroy uses it to free this struct */
+ pthread_cond_t cond; /* the underlying POSIX condition variable */
+};
+
#endif /* AXIS2_THREAD_UNIX_H */
Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am Wed Mar 25 01:10:34 2009
@@ -18,6 +18,7 @@
network_handler.c \
file.c\
uuid_gen.c\
+ pool_queue.c\
thread_pool.c \
property.c \
types.c \
Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c Wed Mar 25 01:10:34 2009
@@ -86,7 +86,7 @@
}
static void *
-dummy_worker(
+axutil_thread_dummy_worker(
void *opaque)
{
axutil_thread_t *thread = (axutil_thread_t *) opaque;
@@ -129,10 +129,11 @@
temp = NULL;
}
- if ((stat = pthread_create(new->td, temp, dummy_worker, new)) == 0)
+ if ((stat = pthread_create(new->td, temp, axutil_thread_dummy_worker, new)) == 0)
{
return new;
}
+
return NULL;
}
@@ -345,3 +346,59 @@
AXIS2_FREE(mutex->allocator, mutex);
return AXIS2_SUCCESS;
}
+
+/**
+ * Creates a condition variable wrapper around a pthread condition variable
+ * initialised with default attributes.
+ * @param allocator allocator used to allocate the wrapper; it is stored in the
+ *        wrapper so that axutil_thread_cond_destroy can release it
+ * @param flags currently ignored
+ * @return the new condition variable, or NULL if the allocation or
+ *         pthread_cond_init fails
+ */
+AXIS2_EXTERN axutil_thread_cond_t *AXIS2_CALL
+axutil_thread_cond_create(
+    axutil_allocator_t * allocator,
+    unsigned int flags)
+{
+    axutil_thread_cond_t *new_cond = NULL;
+
+    (void) flags;
+
+    new_cond = AXIS2_MALLOC(allocator, sizeof(axutil_thread_cond_t));
+    if (!new_cond)
+    {
+        /* Out of memory: nothing to initialise and nothing to free. */
+        return NULL;
+    }
+    new_cond->allocator = allocator;
+
+    if (pthread_cond_init(&(new_cond->cond), NULL) != 0)
+    {
+        AXIS2_FREE(allocator, new_cond);
+        return NULL;
+    }
+    return new_cond;
+}
+
+/**
+ * Blocks the calling thread on a condition variable via pthread_cond_wait.
+ * @param cond condition variable to wait on
+ * @param mutex mutex handed to pthread_cond_wait together with cond
+ * @return AXIS2_SUCCESS if pthread_cond_wait returns 0, AXIS2_FAILURE otherwise
+ */
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_cond_wait(
+    axutil_thread_cond_t *cond,
+    axutil_thread_mutex_t * mutex)
+{
+    int rc = pthread_cond_wait(&(cond->cond), &(mutex->mutex));
+
+    return (rc == 0) ? AXIS2_SUCCESS : AXIS2_FAILURE;
+}
+
+/**
+ * Signals a condition variable via pthread_cond_signal.
+ * @param cond condition variable to signal
+ * @return AXIS2_SUCCESS if pthread_cond_signal returns 0, AXIS2_FAILURE otherwise
+ */
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_cond_signal(
+    axutil_thread_cond_t *cond)
+{
+    int rc = pthread_cond_signal(&(cond->cond));
+
+    return (rc == 0) ? AXIS2_SUCCESS : AXIS2_FAILURE;
+}
+
+
+/**
+ * Destroys the pthread condition variable and, if that succeeds, frees the
+ * wrapper with the allocator stored in it.
+ * @param cond condition variable to destroy
+ * @return AXIS2_SUCCESS when the wrapper was destroyed and freed;
+ *         AXIS2_FAILURE if pthread_cond_destroy fails (the wrapper is then
+ *         left allocated)
+ */
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_cond_destroy(
+    axutil_thread_cond_t * cond)
+{
+    axis2_status_t status = AXIS2_FAILURE;
+
+    if (pthread_cond_destroy(&(cond->cond)) == 0)
+    {
+        AXIS2_FREE(cond->allocator, cond);
+        status = AXIS2_SUCCESS;
+    }
+    return status;
+}
+
+
Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c Wed Mar 25 01:10:34 2009
@@ -18,27 +18,164 @@
#include <axutil_thread_pool.h>
#include <axutil_env.h>
#include <axutil_error_default.h>
+#include <axutil_pool_queue.h>
+#include <stdio.h>
+
+typedef enum /* Run state of a thread pool; stored in axutil_thread_pool.state */
+{
+ ALL_RUN, ALL_EXIT /* ALL_RUN: set by axutil_thread_pool_init. ALL_EXIT: a worker that sees it leaves its job loop and exits. */
+} poolstate_t;
struct axutil_thread_pool
{
axutil_allocator_t *allocator; /* allocator given to axutil_thread_pool_init; used for the pool, its thread array and its threads */
+ /* Mutex held by workers and the dispatcher around queue access and while waiting on the condition variables below */
+ axutil_thread_mutex_t *mutex;
+ int poolsize; /* NOTE(review): not assigned anywhere in the visible pool code (init stores the size in arrsz) - confirm whether it is still needed */
+ axutil_thread_cond_t *job_posted; /* dispatcher: "Hey guys, there's a job!" - workers wait on it while the queue has no job */
+ axutil_thread_cond_t *job_taken; /* a worker: "Got it!" - the dispatcher waits on it while the queue cannot accept work */
+ poolstate_t state; /* Threads check this before getting job; ALL_RUN after init, ALL_EXIT makes workers exit. */
+ int arrsz; /* Number of entries in array (the pool size requested at init). */
+ int live; /* Number of live threads in pool (when pool is being destroyed, live<=arrsz); decremented by each worker as it exits */
+ axutil_pool_queue_head_t *queue; /* queue of work orders, created by axutil_pool_queue_make_queue */
+ struct timeval created; /* When the threadpool was created. */
+ axutil_thread_t **array; /* threads themselves (arrsz handles). */
};
+/*
+ * Life cycle of one pool worker thread.
+ *
+ * The worker sleeps on pool->job_posted until the queue holds a job or the
+ * pool state becomes ALL_EXIT. For each job it takes the work order from the
+ * queue, signals pool->job_taken, releases the mutex, runs the job, and then
+ * re-acquires the mutex before looking for the next job. When it observes
+ * ALL_EXIT it decrements pool->live and returns.
+ *
+ * The cleanup handler and its argument are fetched with every work order but
+ * are currently not invoked.
+ *
+ * @param axis_thd the thread running this function; passed on to each job
+ * @param owning_pool the axutil_thread_pool_t this worker belongs to
+ * @return always NULL
+ */
+static void * axutil_thread_pool_do_work(
+    axutil_thread_t *axis_thd,
+    void * owning_pool)
+{
+    axutil_thread_pool_t *pool = (axutil_thread_pool_t *) owning_pool;
+
+    /* Filled in by axutil_pool_queue_get_work_order for every job taken. */
+    axutil_thread_start_t myjob = NULL;
+    void *myarg = NULL;
+    axutil_thread_start_t mycleaner = NULL;
+    void *mycleanarg = NULL;
+
+    /* Grab mutex so we can begin waiting for a job */
+    axutil_thread_mutex_lock(pool->mutex);
+
+    /* Main loop: wait for job posting, do job(s) ... until told to exit */
+    for( ; ; )
+    {
+        /* The pool state is part of the wait predicate: a worker woken for
+         * shutdown while the queue is empty must fall through to the
+         * ALL_EXIT check below instead of going back to sleep.
+         */
+        while((ALL_EXIT != pool->state) &&
+              (axutil_pool_queue_is_job_available(pool->queue) == 0))
+        {
+            axutil_thread_cond_wait(pool->job_posted, pool->mutex);
+        }
+
+        /* We've just woken up and we have the mutex. Check pool's state */
+        if (ALL_EXIT == pool->state)
+        {
+            break;
+        }
+
+        /* Take the job and tell the dispatcher there is room in the queue. */
+        axutil_pool_queue_get_work_order(pool->queue, &myjob, &myarg, &mycleaner, &mycleanarg);
+        axutil_thread_cond_signal(pool->job_taken);
+
+        /* Yield mutex so other jobs can be posted while this one runs */
+        axutil_thread_mutex_unlock(pool->mutex);
+
+        myjob(axis_thd, myarg);
+
+        /* Grab mutex so we can grab posted job, or (if no job is posted)
+         * begin waiting for next posting.
+         */
+        axutil_thread_mutex_lock(pool->mutex);
+    }
+
+    /* If we get here, we broke from loop because state is ALL_EXIT. */
+    --pool->live;
+
+    axutil_thread_mutex_unlock(pool->mutex);
+
+    return NULL;
+}
+
+
+/*
+ * Creates a thread pool with `poolsize` detached worker threads, each running
+ * axutil_thread_pool_do_work on the new pool.
+ *
+ * @param allocator allocator used for the pool, its synchronisation objects,
+ *        its thread array and its threads; stored in the pool
+ * @param poolsize number of worker threads; must be in 1..MAXT_IN_POOL
+ * @return the initialised pool, or NULL if poolsize is out of range or any
+ *         allocation / object creation / thread creation fails
+ */
AXIS2_EXTERN axutil_thread_pool_t *AXIS2_CALL
axutil_thread_pool_init(
- axutil_allocator_t *allocator)
+    axutil_allocator_t *allocator,
+    int poolsize)
{
axutil_thread_pool_t *pool = NULL;
+    int i = 0;
+    int j = 0;
+
+    if ((poolsize <= 0) || (poolsize > MAXT_IN_POOL))
+    {
+        return NULL;
+    }
- pool =
- (axutil_thread_pool_t *) AXIS2_MALLOC(allocator,
- sizeof(axutil_thread_pool_t));
+    pool = (axutil_thread_pool_t *) AXIS2_MALLOC(allocator, sizeof(axutil_thread_pool_t));
if (!pool)
{
return NULL;
}
pool->allocator = allocator;
+    /* Give every field a defined value before anything can fail, so the
+     * error path below can tell what has been created.
+     */
+    pool->mutex = NULL;
+    pool->job_posted = NULL;
+    pool->job_taken = NULL;
+    pool->queue = NULL;
+    pool->array = NULL;
+    pool->poolsize = poolsize;
+    pool->arrsz = poolsize;
+    pool->live = 0;
+    pool->state = ALL_RUN;
+    gettimeofday(&pool->created, NULL);
+
+    pool->mutex = axutil_thread_mutex_create(allocator, AXIS2_THREAD_MUTEX_DEFAULT);
+    pool->job_posted = axutil_thread_cond_create(allocator, AXIS2_THREAD_MUTEX_DEFAULT);
+    pool->job_taken = axutil_thread_cond_create(allocator, AXIS2_THREAD_MUTEX_DEFAULT);
+    if (!pool->mutex || !pool->job_posted || !pool->job_taken)
+    {
+        goto fail;
+    }
+
+    /* create the array of threads within the pool */
+    pool->array = (axutil_thread_t **) AXIS2_MALLOC(allocator, pool->arrsz * sizeof(axutil_thread_t *));
+    if (!pool->array)
+    {
+        goto fail;
+    }
+
+    /* The queue is created last because its release function is not visible
+     * here; this keeps the error paths above free of a queue to release.
+     */
+    pool->queue = axutil_pool_queue_make_queue(poolsize);
+    if (!pool->queue)
+    {
+        goto fail;
+    }
+
+    /* bring each thread to life, updating pool->live as we go */
+    for (i = 0; i < pool->arrsz; ++i)
+    {
+        axutil_thread_t *worker = NULL;
+
+        worker = axutil_thread_create(allocator, NULL, axutil_thread_pool_do_work, (void *) pool);
+        if (!worker)
+        {
+            if (i == 0)
+            {
+                /* No worker exists yet, so nothing else references the pool.
+                 * NOTE(review): pool->queue is not released on this path
+                 * because its release function is not visible here.
+                 */
+                goto fail;
+            }
+
+            /* Workers 0..i-1 are detached and already hold a pointer to the
+             * pool, so freeing it here would leave them with a dangling
+             * pointer. Flag shutdown, wake them (best effort) and keep the
+             * pool memory alive.
+             * NOTE(review): confirm that the worker loop exits on ALL_EXIT
+             * when the queue is empty.
+             */
+            axutil_thread_mutex_lock(pool->mutex);
+            pool->state = ALL_EXIT;
+            for (j = 0; j < i; ++j)
+            {
+                axutil_thread_cond_signal(pool->job_posted);
+            }
+            axutil_thread_mutex_unlock(pool->mutex);
+            return NULL;
+        }
+
+        pool->array[i] = worker;
+        pool->live = i + 1;
+
+        axutil_thread_detach(pool->array[i]); /* automatic cleanup when thread exits. */
+    }
return pool;
+
+fail:
+    /* Only reached while no worker thread exists: release whatever was
+     * created so far, in reverse order of creation.
+     */
+    if (pool->array)
+    {
+        AXIS2_FREE(allocator, pool->array);
+    }
+    if (pool->job_taken)
+    {
+        axutil_thread_cond_destroy(pool->job_taken);
+    }
+    if (pool->job_posted)
+    {
+        axutil_thread_cond_destroy(pool->job_posted);
+    }
+    if (pool->mutex)
+    {
+        axutil_thread_mutex_destroy(pool->mutex);
+    }
+    AXIS2_FREE(allocator, pool);
+    return NULL;
}
@@ -57,23 +194,53 @@
}
AXIS2_FREE(pool->allocator, pool);
return;
-}
-AXIS2_EXTERN axutil_thread_t *AXIS2_CALL
-axutil_thread_pool_get_thread(
+ }
+
+/*
+ * Posts a job to the pool's work queue and signals the workers.
+ *
+ * If the queue cannot accept work, the caller blocks on pool->job_taken
+ * (signalling pool->job_posted before each wait) until it can.
+ *
+ * NOTE(review): when `data` is the pool itself nothing is queued and
+ * AXIS2_SUCCESS is still returned; the reason is not evident from this file.
+ *
+ * @param pool pool to post the job to
+ * @param func job function; a worker calls it with its thread and `data`
+ * @param data argument passed to func
+ * @return AXIS2_SUCCESS on success; AXIS2_FAILURE if pool, its allocator or
+ *         func is NULL
+ */
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_pool_dispatch(
axutil_thread_pool_t *pool,
axutil_thread_start_t func,
void *data)
{
if (!pool)
{
- return NULL;
+    return AXIS2_FAILURE;
}
if (!pool->allocator)
{
- return NULL;
+    return AXIS2_FAILURE;
}
- return axutil_thread_create(pool->allocator, NULL, func, data);
+
+    /* A worker calls the job function unconditionally, so a NULL function
+     * must never reach the queue.
+     */
+    if (!func)
+    {
+        return AXIS2_FAILURE;
+    }
+
+    if (pool != (axutil_thread_pool_t *) data)
+    {
+        /* Grab the mutex */
+        axutil_thread_mutex_lock(pool->mutex);
+
+        /* Queue full: nudge a worker, then sleep until one takes a job. */
+        while(!axutil_pool_queue_can_accept_work(pool->queue))
+        {
+            axutil_thread_cond_signal(pool->job_posted);
+            axutil_thread_cond_wait(pool->job_taken, pool->mutex);
+        }
+
+        /* Finally, there's room to post a job. Do so and signal workers. */
+        axutil_pool_queue_add_work_order(pool->queue, func, data, NULL, NULL);
+
+        axutil_thread_cond_signal(pool->job_posted);
+
+        /* Yield mutex so a worker can pick up the job */
+        axutil_thread_mutex_unlock(pool->mutex);
+    }
+
+    return AXIS2_SUCCESS;
}
AXIS2_EXTERN axis2_status_t AXIS2_CALL