You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by da...@apache.org on 2009/03/25 02:10:35 UTC

svn commit: r758111 - in /webservices/axis2/branches/c/new_thread_pool-25march2009/util: include/axutil_thread_pool.h include/platforms/unix/axutil_thread_unix.h src/Makefile.am src/platforms/unix/thread_unix.c src/thread_pool.c

Author: damitha
Date: Wed Mar 25 01:10:34 2009
New Revision: 758111

URL: http://svn.apache.org/viewvc?rev=758111&view=rev
Log:
Adding new thread pool code

Modified:
    webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h
    webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h
    webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am
    webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c
    webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c

Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/axutil_thread_pool.h Wed Mar 25 01:10:34 2009
@@ -28,6 +28,9 @@
 #include <axutil_allocator.h>
 #include <axutil_thread.h>
 
+/* maximum number of threads allowed in the pool */
+#define MAXT_IN_POOL 200
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -48,8 +51,8 @@
      * @param data arguments to be passed to the function
      * @return pointer to a thread in ready state.
      */
-    AXIS2_EXTERN axutil_thread_t *AXIS2_CALL
-    axutil_thread_pool_get_thread(
+    AXIS2_EXTERN axis2_status_t AXIS2_CALL
+    axutil_thread_pool_dispatch(
         axutil_thread_pool_t * pool,
         axutil_thread_start_t func,
         void *data);
@@ -95,11 +98,13 @@
     /**
     * Initializes (creates) an thread_pool.
     * @param allocator user defined allocator for the memory allocation.
+    * @param size Size of the thread pool.
     * @return initialized thread_pool. NULL on error.
     */
     AXIS2_EXTERN axutil_thread_pool_t *AXIS2_CALL
     axutil_thread_pool_init(
-        axutil_allocator_t * allocator);
+        axutil_allocator_t * allocator,
+        int size);
 
     /**
      * This function can be used to initialize the environment in case of

Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/include/platforms/unix/axutil_thread_unix.h Wed Mar 25 01:10:34 2009
@@ -56,4 +56,10 @@
     pthread_mutex_t mutex;
 };
 
+struct axutil_thread_cond_t
+{
+    axutil_allocator_t *allocator;
+    pthread_cond_t cond;
+};
+
 #endif                          /* AXIS2_THREAD_UNIX_H */

Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/Makefile.am Wed Mar 25 01:10:34 2009
@@ -18,6 +18,7 @@
                         network_handler.c \
                         file.c\
                         uuid_gen.c\
+						pool_queue.c\
                         thread_pool.c \
                         property.c \
                         types.c \

Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/platforms/unix/thread_unix.c Wed Mar 25 01:10:34 2009
@@ -86,7 +86,7 @@
 }
 
 static void *
-dummy_worker(
+axutil_thread_dummy_worker(
     void *opaque)
 {
     axutil_thread_t *thread = (axutil_thread_t *) opaque;
@@ -129,10 +129,11 @@
         temp = NULL;
     }
 
-    if ((stat = pthread_create(new->td, temp, dummy_worker, new)) == 0)
+    if ((stat = pthread_create(new->td, temp, axutil_thread_dummy_worker, new)) == 0)
     {
         return new;
     }
+
     return NULL;
 }
 
@@ -345,3 +346,59 @@
     AXIS2_FREE(mutex->allocator, mutex);
     return AXIS2_SUCCESS;
 }
+
+AXIS2_EXTERN axutil_thread_cond_t *AXIS2_CALL
+axutil_thread_cond_create(
+    axutil_allocator_t * allocator,
+    unsigned int flags)
+{
+    axutil_thread_cond_t *new_cond = NULL;
+
+    new_cond = AXIS2_MALLOC(allocator, sizeof(axutil_thread_cond_t));
+    new_cond->allocator = allocator;
+
+    if (pthread_cond_init(&(new_cond->cond), NULL) != 0)
+    {
+        AXIS2_FREE(allocator, new_cond);
+        return NULL;
+    }
+    return new_cond;
+}
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_cond_wait(
+    axutil_thread_cond_t *cond,
+    axutil_thread_mutex_t * mutex)
+{
+    if (pthread_cond_wait(&(cond->cond), &(mutex->mutex)) != 0)
+    {
+        return AXIS2_FAILURE;
+    }
+    return AXIS2_SUCCESS;
+}
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_cond_signal(
+    axutil_thread_cond_t *cond)
+{
+    if (pthread_cond_signal(&(cond->cond)) != 0)
+    {
+        return AXIS2_FAILURE;
+    }
+    return AXIS2_SUCCESS;
+}
+
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_cond_destroy(
+    axutil_thread_cond_t * cond)
+{
+    if (0 != pthread_cond_destroy(&(cond->cond)))
+    {
+        return AXIS2_FAILURE;
+    }
+    AXIS2_FREE(cond->allocator, cond);
+    return AXIS2_SUCCESS;
+}
+
+

Modified: webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c?rev=758111&r1=758110&r2=758111&view=diff
==============================================================================
--- webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c (original)
+++ webservices/axis2/branches/c/new_thread_pool-25march2009/util/src/thread_pool.c Wed Mar 25 01:10:34 2009
@@ -18,27 +18,164 @@
 #include <axutil_thread_pool.h>
 #include <axutil_env.h>
 #include <axutil_error_default.h>
+#include <axutil_pool_queue.h>
+#include <stdio.h>
+
+typedef enum 
+{
+    ALL_RUN, ALL_EXIT
+} poolstate_t;
 
 struct axutil_thread_pool
 {
     axutil_allocator_t *allocator;
+    /* Mutex to synchronize the read/write operations */
+    axutil_thread_mutex_t *mutex;
+    int poolsize;
+    axutil_thread_cond_t *job_posted; /* dispatcher: "Hey guys, there's a job!" */
+    axutil_thread_cond_t *job_taken; /* a worker: "Got it!" */
+    poolstate_t state; /* Threads check this before getting job. */
+    int arrsz; /* Number of entries in array. */
+    int live; /* Number of live threads in pool (when pool is being destroyed, live<=arrsz) */
+    axutil_pool_queue_head_t *queue; /* queue of work orders */
+    struct timeval created; /* When the threadpool was created. */
+    axutil_thread_t **array; /* threads themselves. */
 };
 
+/* Define the life of a working thread.*/
+
+static void * axutil_thread_pool_do_work(
+        axutil_thread_t *axis_thd,
+        void * owning_pool) 
+{
+    /* Convert pointer to owning pool to proper type. */
+    axutil_thread_pool_t *pool = (axutil_thread_pool_t *) owning_pool;
+  
+    /* Remember my creation sequence number */
+    /*int myid = pool->live;*/
+
+    /* When we get a posted job, we copy it into these local vars. */
+    axutil_thread_start_t myjob;
+    void *myarg;  
+    axutil_thread_start_t mycleaner;
+    void *mycleanarg;
+
+    /*pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
+    pthread_cleanup_push(pthread_mutex_unlock, (void *) &pool->mutex);*/
+
+    /* Grab mutex so we can begin waiting for a job */
+    axutil_thread_mutex_lock(pool->mutex);
+  
+    /* Main loop: wait for job posting, do job(s) ... forever */
+    for( ; ; ) 
+    {
+        while(axutil_pool_queue_is_job_available(pool->queue) == 0)
+        {
+	        axutil_thread_cond_wait(pool->job_posted, pool->mutex);
+        }
+  
+        /* We've just woken up and we have the mutex.  Check pool's state */
+        if (ALL_EXIT == pool->state)
+        {
+            break;
+        }
+    
+        /* while we find work to do */
+        axutil_pool_queue_get_work_order(pool->queue, &myjob, &myarg, &mycleaner, &mycleanarg);
+        axutil_thread_cond_signal(pool->job_taken);
+    
+        /* Yield mutex so other jobs can be posted */
+        axutil_thread_mutex_unlock(pool->mutex);
+    
+        /* Run the job we've taken */
+        /*if(mycleaner)
+        {
+            pthread_cleanup_push(mycleaner,mycleanarg);
+            myjob(axis_thd, myarg);
+            pthread_cleanup_pop(1);
+        }
+        else*/
+        {
+	        myjob(axis_thd, myarg);
+        }
+    
+        /* Grab mutex so we can grab posted job, or (if no job is posted) begin waiting for next 
+         * posting. 
+         */
+        axutil_thread_mutex_lock(pool->mutex);
+    }
+  
+    /* If we get here, we broke from loop because state is ALL_EXIT. */
+    --pool->live;
+  
+    /* We're not really taking a job ... but this signals the destroyer that one thread has exited, 
+     * so it can keep on destroying. pthread_cond_signal(&pool->job_taken);
+     */
+  
+    axutil_thread_mutex_unlock(pool->mutex);
+
+    /*pthread_cleanup_pop(1);*/
+    return NULL;
+}  
+
+
 AXIS2_EXTERN axutil_thread_pool_t *AXIS2_CALL
 axutil_thread_pool_init(
-    axutil_allocator_t *allocator)
+    axutil_allocator_t *allocator,
+    int poolsize)
 {
     axutil_thread_pool_t *pool = NULL;
+    int i = 0;
+
+    if ((poolsize <= 0) || (poolsize > MAXT_IN_POOL))
+    {
+        return NULL;
+    }
 
-    pool =
-        (axutil_thread_pool_t *) AXIS2_MALLOC(allocator,
-                                              sizeof(axutil_thread_pool_t));
+    pool = (axutil_thread_pool_t *) AXIS2_MALLOC(allocator, sizeof(axutil_thread_pool_t));
 
     if (!pool)
     {
         return NULL;
     }
     pool->allocator = allocator;
+    pool->mutex = axutil_thread_mutex_create(allocator, AXIS2_THREAD_MUTEX_DEFAULT);
+    pool->job_posted = axutil_thread_cond_create(allocator, AXIS2_THREAD_MUTEX_DEFAULT);
+    pool->job_taken = axutil_thread_cond_create(allocator, AXIS2_THREAD_MUTEX_DEFAULT);
+    pool->arrsz = poolsize;
+    pool->state = ALL_RUN;
+    pool->queue = axutil_pool_queue_make_queue(poolsize);
+    gettimeofday(pool->created, NULL);
+
+    /* create the array of threads within the pool */
+    pool->array = (axutil_thread_t **) AXIS2_MALLOC(allocator, pool->arrsz * sizeof(axutil_thread_t *));
+    if (!pool->array) 
+    {
+        AXIS2_FREE(allocator, pool);
+        pool = NULL;
+        return NULL;
+    }
+
+    /* bring each thread to life (update counters in loop so threads can
+     * access pool->live to find out their ID#
+     */
+    for (i = 0; i < pool->arrsz; ++i) 
+    {
+        axutil_thread_t *axis2_thd = NULL;
+
+        axis2_thd = axutil_thread_create(allocator, NULL, axutil_thread_pool_do_work, (void *) pool);
+        if(!axis2_thd)
+        {
+            AXIS2_FREE(allocator, pool);
+            return NULL;
+        }
+
+        pool->array[i] = axis2_thd;
+
+        pool->live = i+1;
+
+        axutil_thread_detach(pool->array[i]); /* automatic cleanup when thread exits. */
+    }
 
     return pool;
 }
@@ -57,23 +194,53 @@
     }
     AXIS2_FREE(pool->allocator, pool);
     return;
-}
 
-AXIS2_EXTERN axutil_thread_t *AXIS2_CALL
-axutil_thread_pool_get_thread(
+  }
+
+AXIS2_EXTERN axis2_status_t AXIS2_CALL
+axutil_thread_pool_dispatch(
     axutil_thread_pool_t *pool,
     axutil_thread_start_t func,
     void *data)
 {
+    /*int old_cancel;*/
+
     if (!pool)
     {
-        return NULL;
+        return AXIS2_FAILURE;
     }
     if (!pool->allocator)
     {
-        return NULL;
+        return AXIS2_FAILURE;
     }
-    return axutil_thread_create(pool->allocator, NULL, func, data);
+
+    if(pool == (axutil_thread_pool_t *) data)
+    {
+    }
+    else
+    {
+        /*pthread_cleanup_push(pthread_mutex_unlock, (void *) &pool->mutex);*/
+      
+        /* Grab the mutex */
+        axutil_thread_mutex_lock(pool->mutex);
+
+        while(!axutil_pool_queue_can_accept_work(pool->queue))
+        {
+            axutil_thread_cond_signal(pool->job_posted);
+            axutil_thread_cond_wait(pool->job_taken, pool->mutex);
+        }
+      
+        /* Finally, there's room to post a job. Do so and signal workers. */
+        axutil_pool_queue_add_work_order(pool->queue, func, data, NULL, NULL);
+
+        axutil_thread_cond_signal(pool->job_posted);
+      
+        /* Yield mutex so a worker can pick up the job */
+        axutil_thread_mutex_unlock(pool->mutex);
+        /*pthread_cleanup_pop(1);*/
+    }
+
+    return AXIS2_SUCCESS;
 }
 
 AXIS2_EXTERN axis2_status_t AXIS2_CALL