Posted to dev@apr.apache.org by "William A. Rowe, Jr." <wr...@rowe-clan.net> on 2007/01/04 01:15:52 UTC

Re: [PATCH] apr_threadpool

Henry,

thank you for your submission.

http://svn.apache.org/viewvc?view=rev&revision=492362

Please remember, the API is flexible until 1.3.0 (or 2.0.0) is tagged and
released, at which point the new files become subject to the (rather strict)
versioning rules.

Yours,

Bill
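
For anyone reading the patch quoted below: the task queue is indexed by
priority segment, where TASK_PRIORITY_SEG divides the 0 to 255 priority range
into TASK_PRIORITY_SEGS (4) buckets of 64. What follows is only a minimal
standalone sketch of that arithmetic, not code from the patch.
priority_segment() and print_segments() are hypothetical helper names, and the
priority values are copied from the APR_THREAD_TASK_PRIORITY_* defines.

```c
#include <stdio.h>

/* Same arithmetic as TASK_PRIORITY_SEG in the patch, but applied to a plain
 * value instead of a task struct (hypothetical helper, not an APR function). */
int priority_segment(unsigned int priority)
{
    return (int)((priority & 0xFF) / 64);
}

/* Print the segment of each named priority level from apr_thread_pool.h. */
void print_segments(void)
{
    static const struct { const char *name; unsigned int value; } levels[] = {
        { "LOWEST", 0 }, { "LOW", 63 }, { "NORMAL", 127 },
        { "HIGH", 191 }, { "HIGHEST", 255 }
    };
    size_t i;

    for (i = 0; i < sizeof(levels) / sizeof(levels[0]); i++) {
        printf("%-8s priority %3u -> segment %d\n", levels[i].name,
               levels[i].value, priority_segment(levels[i].value));
    }
}
```

If I read the macro right, LOWEST (0) and LOW (63) share segment 0, while
NORMAL, HIGH and HIGHEST fall into segments 1, 2 and 3. Ordering inside a
segment still follows the priority value, since add_if_empty() walks the
segment while the queued task's priority is higher than the new task's.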

Henry Jen wrote:
> Henry Jen wrote:
>> Hi,
>>
>> Attached please find the patch for the thread pool implementation. I am
>> looking forward to seeing it committed.
>>
> 
> I just realized that I sent the wrong patch, which did not drop the
> copyright notice. Attached is the correct patch. :-)
> 
> Just want to make sure the consensus is that the code is ready for commit
> and is now simply waiting for some committer's love.
> 
> Cheers,
> Henry
> 
> 
> 
> 
> ------------------------------------------------------------------------
> 
> Index: aprutil.dsp
> ===================================================================
> --- aprutil.dsp	(revision 453014)
> +++ aprutil.dsp	(working copy)
> @@ -240,6 +240,10 @@
>  # PROP Default_Filter ""
>  # Begin Source File
>  
> +SOURCE=.\misc\apr_thread_pool.c
> +# End Source File
> +# Begin Source File
> +
>  SOURCE=.\misc\apr_date.c
>  # End Source File
>  # Begin Source File
> @@ -512,6 +516,10 @@
>  # End Source File
>  # Begin Source File
>  
> +SOURCE=.\include\apr_thread_pool.h
> +# End Source File
> +# Begin Source File
> +
>  SOURCE=.\include\apr_date.h
>  # End Source File
>  # Begin Source File
> Index: include/apr_thread_pool.h
> ===================================================================
> --- include/apr_thread_pool.h	(revision 0)
> +++ include/apr_thread_pool.h	(revision 0)
> @@ -0,0 +1,240 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed
> + * with this work for additional information regarding copyright
> + * ownership.  The ASF licenses this file to you under the Apache
> + * License, Version 2.0 (the "License"); you may not use this file
> + * except in compliance with the License.  You may obtain a copy of
> + * the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> + * implied.  See the License for the specific language governing
> + * permissions and limitations under the License.
> + */
> +
> +#ifndef APR_THREAD_POOL_H
> +#define APR_THREAD_POOL_H
> +
> +#include "apr.h"
> +#include "apr_thread_proc.h"
> +
> +/**
> + * @file apr_thread_pool.h
> + * @brief APR Thread Pool Library
> + *
> + * @remarks This library implements a thread pool using apr_thread_t. A thread
> + * pool is a set of threads that can be created in advance or on demand until a
> + * maximum number. When a task is scheduled, the thread pool will find an idle
> + * thread to handle the task. In case all existing threads are busy and the
> + * number of tasks in the queue is higher than the adjustable threshold, the
> + * pool will try to create a new thread to serve the task if the maximum number
> + * has not been reached. Otherwise, the task will be put into a queue based on
> + * priority, which can be valued from 0 to 255, with higher values being
> + * served first. If there are tasks with the same priority, the new task is put
> + * at the top or the bottom depending on which function is used to put the task.
> + *
> + * @remarks A thread pool may use up the maximum number of threads at peak
> + * load and leave those threads idle afterwards. A maximum number of idle
> + * threads can be set so that extra idle threads will be terminated to save
> + * system resources.
> + */
> +#if APR_HAS_THREADS
> +
> +#ifdef __cplusplus
> +extern "C"
> +{
> +#if 0
> +};
> +#endif
> +#endif /* __cplusplus */
> +
> +/** Opaque Thread Pool structure. */
> +typedef struct apr_thread_pool apr_thread_pool_t;
> +
> +#define APR_THREAD_TASK_PRIORITY_LOWEST 0
> +#define APR_THREAD_TASK_PRIORITY_LOW 63
> +#define APR_THREAD_TASK_PRIORITY_NORMAL 127
> +#define APR_THREAD_TASK_PRIORITY_HIGH 191
> +#define APR_THREAD_TASK_PRIORITY_HIGHEST 255
> +
> +/**
> + * Create a thread pool
> + * @param me The pointer in which the created apr_thread_pool object is
> + * returned. The returned value will be NULL if the thread pool could not be
> + * created.
> + * @param init_threads The number of threads to be created initially. This
> + * number is also used as the initial maximum number of idle threads.
> + * @param max_threads The maximum number of threads that can be created
> + * @param pool The pool to use
> + * @return APR_SUCCESS if the thread pool was created successfully. Otherwise,
> + * the error code.
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
> +                                                 apr_size_t init_threads,
> +                                                 apr_size_t max_threads,
> +                                                 apr_pool_t * pool);
> +
> +/**
> + * Destroy the thread pool and stop all the threads
> + * @return APR_SUCCESS if all threads are stopped.
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me);
> +
> +/**
> + * Schedule a task to the bottom of the tasks of same priority.
> + * @param me The thread pool
> + * @param func The task function
> + * @param param The parameter for the task function
> + * @param priority The priority of the task.
> + * @param owner Owner of this task.
> + * @return APR_SUCCESS if the task was scheduled successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
> +                                               apr_thread_start_t func,
> +                                               void *param,
> +                                               apr_byte_t priority,
> +                                               void *owner);
> +/**
> + * Schedule a task to be run after a delay
> + * @param me The thread pool
> + * @param func The task function
> + * @param param The parameter for the task function
> + * @param time Time in microseconds
> + * @param owner Owner of this task.
> + * @return APR_SUCCESS if the task was scheduled successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me,
> +                                                   apr_thread_start_t func,
> +                                                   void *param,
> +                                                   apr_interval_time_t time,
> +                                                   void *owner);
> +
> +/**
> + * Schedule a task to the top of the tasks of same priority.
> + * @param me The thread pool
> + * @param func The task function
> + * @param param The parameter for the task function
> + * @param priority The priority of the task.
> + * @param owner Owner of this task.
> + * @return APR_SUCCESS if the task was scheduled successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
> +                                              apr_thread_start_t func,
> +                                              void *param,
> +                                              apr_byte_t priority,
> +                                              void *owner);
> +
> +/**
> + * Cancel tasks submitted by the owner. If any task from the owner is
> + * currently being processed, the function will spin until the task finishes.
> + * @param me The thread pool
> + * @param owner Owner of the task
> + * @return APR_SUCCESS if the task has been cancelled successfully
> + * @note The task function should not call cancel; otherwise the function
> + * may get stuck forever. The function asserts if it detects such a case.
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,
> +                                                       void *owner);
> +
> +/**
> + * Get current number of tasks waiting in the queue
> + * @param me The thread pool
> + * @return Number of tasks in the queue
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of scheduled tasks waiting in the queue
> + * @param me The thread pool
> + * @return Number of scheduled tasks in the queue
> + */
> +APR_DECLARE(apr_size_t)
> +    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of threads
> + * @param me The thread pool
> + * @return Number of total threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of busy threads
> + * @param me The thread pool
> + * @return Number of busy threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me);
> +
> +/**
> + * Get current number of idle threads
> + * @param me The thread pool
> + * @return Number of idling threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me);
> +
> +/**
> + * Access function for the maximum number of idle threads. The number of
> + * current idle threads will be reduced to the new limit.
> + * @param me The thread pool
> + * @param cnt The number
> + * @return The number of threads that were stopped.
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
> +                                                     apr_size_t cnt);
> +
> +/**
> + * Access function for the maximum number of idle threads
> + * @param me The thread pool
> + * @return The current maximum number
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me);
> +
> +/**
> + * Access function for the maximum number of threads.
> + * @param me The thread pool
> + * @param cnt The number
> + * @return The original maximum number of threads
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me,
> +                                                       apr_size_t cnt);
> +
> +/**
> + * Access function for the maximum number of threads
> + * @param me The thread pool
> + * @return The current maximum number
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *
> +                                                       me);
> +
> +/**
> + * Access function for the threshold of tasks in queue to trigger a new thread. 
> + * @param me The thread pool
> + * @param val The new threshold
> + * @return The original threshold
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me,
> +                                                      apr_size_t val);
> +
> +/**
> + * Access function for the threshold of tasks in queue to trigger a new thread. 
> + * @param me The thread pool
> + * @return The current threshold
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me);
> +
> +#ifdef __cplusplus
> +#if 0
> +{
> +#endif
> +}
> +#endif
> +
> +#endif /* APR_HAS_THREADS */
> +
> +#endif /* APR_THREAD_POOL_H */
> +
> +/* vim: set ts=4 sw=4 et cin tw=80: */
> Index: misc/apr_thread_pool.c
> ===================================================================
> --- misc/apr_thread_pool.c	(revision 0)
> +++ misc/apr_thread_pool.c	(revision 0)
> @@ -0,0 +1,817 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed
> + * with this work for additional information regarding copyright
> + * ownership.  The ASF licenses this file to you under the Apache
> + * License, Version 2.0 (the "License"); you may not use this file
> + * except in compliance with the License.  You may obtain a copy of
> + * the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> + * implied.  See the License for the specific language governing
> + * permissions and limitations under the License.
> + */
> +
> +#include <assert.h>
> +#include "apr_thread_pool.h"
> +#include "apr_ring.h"
> +#include "apr_thread_cond.h"
> +#include "apr_portable.h"
> +
> +#if APR_HAS_THREADS
> +
> +#define TASK_PRIORITY_SEGS 4
> +#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
> +
> +typedef struct apr_thread_pool_task
> +{
> +    APR_RING_ENTRY(apr_thread_pool_task) link;
> +    apr_thread_start_t func;
> +    void *param;
> +    void *owner;
> +    union
> +    {
> +        apr_byte_t priority;
> +        apr_time_t time;
> +    } dispatch;
> +} apr_thread_pool_task_t;
> +
> +APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
> +
> +struct apr_thread_list_elt
> +{
> +    APR_RING_ENTRY(apr_thread_list_elt) link;
> +    apr_thread_t *thd;
> +    volatile void *current_owner;
> +    volatile int stop;
> +};
> +
> +APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
> +
> +struct apr_thread_pool
> +{
> +    apr_pool_t *pool;
> +    volatile apr_size_t thd_max;
> +    volatile apr_size_t idle_max;
> +    volatile apr_size_t thd_cnt;
> +    volatile apr_size_t idle_cnt;
> +    volatile apr_size_t task_cnt;
> +    volatile apr_size_t scheduled_task_cnt;
> +    volatile apr_size_t threshold;
> +    struct apr_thread_pool_tasks *tasks;
> +    struct apr_thread_pool_tasks *scheduled_tasks;
> +    struct apr_thread_list *busy_thds;
> +    struct apr_thread_list *idle_thds;
> +    apr_thread_mutex_t *lock;
> +    apr_thread_mutex_t *cond_lock;
> +    apr_thread_cond_t *cond;
> +    volatile int terminated;
> +    struct apr_thread_pool_tasks *recycled_tasks;
> +    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
> +};
> +
> +static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
> +                                          apr_size_t init_threads,
> +                                          apr_size_t max_threads)
> +{
> +    apr_status_t rv;
> +    int i;
> +
> +    me->thd_max = max_threads;
> +    me->idle_max = init_threads;
> +    me->threshold = init_threads / 2;
> +    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
> +                                 me->pool);
> +    if (APR_SUCCESS != rv) {
> +        return rv;
> +    }
> +    rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,
> +                                 me->pool);
> +    if (APR_SUCCESS != rv) {
> +        apr_thread_mutex_destroy(me->lock);
> +        return rv;
> +    }
> +    rv = apr_thread_cond_create(&me->cond, me->pool);
> +    if (APR_SUCCESS != rv) {
> +        apr_thread_mutex_destroy(me->lock);
> +        apr_thread_mutex_destroy(me->cond_lock);
> +        return rv;
> +    }
> +    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
> +    if (!me->tasks) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
> +    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
> +    if (!me->scheduled_tasks) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
> +    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
> +    if (!me->recycled_tasks) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
> +    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
> +    if (!me->busy_thds) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
> +    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
> +    if (!me->idle_thds) {
> +        goto CATCH_ENOMEM;
> +    }
> +    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
> +    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
> +    me->terminated = 0;
> +    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
> +        me->task_idx[i] = NULL;
> +    }
> +    goto FINAL_EXIT;
> +  CATCH_ENOMEM:
> +    rv = APR_ENOMEM;
> +    apr_thread_mutex_destroy(me->lock);
> +    apr_thread_mutex_destroy(me->cond_lock);
> +    apr_thread_cond_destroy(me->cond);
> +  FINAL_EXIT:
> +    return rv;
> +}
> +
> +/*
> + * NOTE: This function is not thread safe by itself. Caller should hold the lock
> + */
> +static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
> +{
> +    apr_thread_pool_task_t *task = NULL;
> +    int seg;
> +
> +    /* check for scheduled tasks */
> +    if (me->scheduled_task_cnt > 0) {
> +        task = APR_RING_FIRST(me->scheduled_tasks);
> +        assert(task != NULL);
> +        assert(task !=
> +               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                                 link));
> +        /* if it's time */
> +        if (task->dispatch.time <= apr_time_now()) {
> +            --me->scheduled_task_cnt;
> +            APR_RING_REMOVE(task, link);
> +            return task;
> +        }
> +    }
> +    /* check for normal tasks if we're not returning a scheduled task */
> +    if (me->task_cnt == 0) {
> +        return NULL;
> +    }
> +
> +    task = APR_RING_FIRST(me->tasks);
> +    assert(task != NULL);
> +    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
> +    --me->task_cnt;
> +    seg = TASK_PRIORITY_SEG(task);
> +    if (task == me->task_idx[seg]) {
> +        me->task_idx[seg] = APR_RING_NEXT(task, link);
> +        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
> +                                                   apr_thread_pool_task, link)
> +            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
> +            me->task_idx[seg] = NULL;
> +        }
> +    }
> +    APR_RING_REMOVE(task, link);
> +    return task;
> +}
> +
> +static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
> +{
> +    apr_thread_pool_task_t *task = NULL;
> +
> +    task = APR_RING_FIRST(me->scheduled_tasks);
> +    assert(task != NULL);
> +    assert(task !=
> +           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                             link));
> +    return task->dispatch.time - apr_time_now();
> +}
> +
> +/*
> + * The worker thread function. Take a task from the queue and perform it if
> + * there is any. Otherwise, put itself into the idle thread list and wait
> + * for a signal to wake up.
> + * The thread terminates directly by detaching and exiting when it is asked to
> + * stop after finishing a task. Otherwise, the thread should be in the idle
> + * thread list and should be joined.
> + */
> +static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
> +{
> +    apr_status_t rv = APR_SUCCESS;
> +    apr_thread_pool_t *me = param;
> +    apr_thread_pool_task_t *task = NULL;
> +    apr_interval_time_t wait;
> +    struct apr_thread_list_elt *elt;
> +
> +    elt = apr_pcalloc(me->pool, sizeof(*elt));
> +    if (!elt) {
> +        apr_thread_exit(t, APR_ENOMEM);
> +    }
> +    APR_RING_ELEM_INIT(elt, link);
> +    elt->thd = t;
> +    elt->stop = 0;
> +
> +    apr_thread_mutex_lock(me->lock);
> +    while (!me->terminated && !elt->stop) {
> +        /* if not new element, it is awakened from idle */
> +        if (APR_RING_NEXT(elt, link) != elt) {
> +            --me->idle_cnt;
> +            APR_RING_REMOVE(elt, link);
> +        }
> +        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
> +        task = pop_task(me);
> +        while (NULL != task && !me->terminated) {
> +            elt->current_owner = task->owner;
> +            apr_thread_mutex_unlock(me->lock);
> +            task->func(t, task->param);
> +            apr_thread_mutex_lock(me->lock);
> +            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
> +                                 apr_thread_pool_task, link);
> +            elt->current_owner = NULL;
> +            if (elt->stop) {
> +                break;
> +            }
> +            task = pop_task(me);
> +        }
> +        assert(NULL == elt->current_owner);
> +        APR_RING_REMOVE(elt, link);
> +
> +        /* busy thread has been asked to stop, not joinable */
> +        if ((me->idle_cnt >= me->idle_max
> +             && !(me->scheduled_task_cnt && 0 >= me->idle_max))
> +            || me->terminated || elt->stop) {
> +            --me->thd_cnt;
> +            apr_thread_mutex_unlock(me->lock);
> +            apr_thread_detach(t);
> +            apr_thread_exit(t, APR_SUCCESS);
> +            return NULL;        /* should not be here, safe net */
> +        }
> +
> +        ++me->idle_cnt;
> +        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
> +        wait = (me->scheduled_task_cnt) ? waiting_time(me) : -1;
> +        apr_thread_mutex_unlock(me->lock);
> +        apr_thread_mutex_lock(me->cond_lock);
> +        if (wait >= 0) {
> +            rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
> +        }
> +        else {
> +            rv = apr_thread_cond_wait(me->cond, me->cond_lock);
> +        }
> +        apr_thread_mutex_unlock(me->cond_lock);
> +        apr_thread_mutex_lock(me->lock);
> +    }
> +
> +    /* idle thread has been asked to stop, will be joined */
> +    --me->thd_cnt;
> +    apr_thread_mutex_unlock(me->lock);
> +    apr_thread_exit(t, APR_SUCCESS);
> +    return NULL;                /* should not be here, safe net */
> +}
> +
> +static apr_status_t thread_pool_cleanup(void *me)
> +{
> +    apr_thread_pool_t *_self = me;
> +
> +    _self->terminated = 1;
> +    apr_thread_pool_idle_max_set(_self, 0);
> +    while (_self->thd_cnt) {
> +        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
> +    }
> +    apr_thread_mutex_destroy(_self->lock);
> +    apr_thread_mutex_destroy(_self->cond_lock);
> +    apr_thread_cond_destroy(_self->cond);
> +    return APR_SUCCESS;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
> +                                                 apr_size_t init_threads,
> +                                                 apr_size_t max_threads,
> +                                                 apr_pool_t * pool)
> +{
> +    apr_thread_t *t;
> +    apr_status_t rv = APR_SUCCESS;
> +
> +    *me = apr_pcalloc(pool, sizeof(**me));
> +    if (!*me) {
> +        return APR_ENOMEM;
> +    }
> +
> +    (*me)->pool = pool;
> +
> +    rv = thread_pool_construct(*me, init_threads, max_threads);
> +    if (APR_SUCCESS != rv) {
> +        *me = NULL;
> +        return rv;
> +    }
> +    apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
> +                              apr_pool_cleanup_null);
> +
> +    while (init_threads) {
> +        rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
> +        if (APR_SUCCESS != rv) {
> +            break;
> +        }
> +        ++(*me)->thd_cnt;
> +        --init_threads;
> +    }
> +
> +    return rv;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
> +{
> +    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
> +}
> +
> +/*
> + * NOTE: This function is not thread safe by itself. Caller should hold the lock
> + */
> +static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
> +                                        apr_thread_start_t func,
> +                                        void *param, apr_byte_t priority,
> +                                        void *owner, apr_time_t time)
> +{
> +    apr_thread_pool_task_t *t;
> +
> +    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
> +        t = apr_pcalloc(me->pool, sizeof(*t));
> +        if (NULL == t) {
> +            return NULL;
> +        }
> +    }
> +    else {
> +        t = APR_RING_FIRST(me->recycled_tasks);
> +        APR_RING_REMOVE(t, link);
> +    }
> +
> +    APR_RING_ELEM_INIT(t, link);
> +    t->func = func;
> +    t->param = param;
> +    t->owner = owner;
> +    if (time > 0) {
> +        t->dispatch.time = apr_time_now() + time;
> +    }
> +    else {
> +        t->dispatch.priority = priority;
> +    }
> +    return t;
> +}
> +
> +/*
> + * Test if the task is the only one within the priority segment.
> + * If it is not, return the first element with the same or lower priority.
> + * Otherwise, add the task into the queue and return NULL.
> + *
> + * NOTE: This function is not thread safe by itself. Caller should hold the lock
> + */
> +static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
> +                                            apr_thread_pool_task_t * const t)
> +{
> +    int seg;
> +    int next;
> +    apr_thread_pool_task_t *t_next;
> +
> +    seg = TASK_PRIORITY_SEG(t);
> +    if (me->task_idx[seg]) {
> +        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
> +               me->task_idx[seg]);
> +        t_next = me->task_idx[seg];
> +        while (t_next->dispatch.priority > t->dispatch.priority) {
> +            t_next = APR_RING_NEXT(t_next, link);
> +            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
> +                t_next) {
> +                return t_next;
> +            }
> +        }
> +        return t_next;
> +    }
> +
> +    for (next = seg - 1; next >= 0; next--) {
> +        if (me->task_idx[next]) {
> +            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
> +            break;
> +        }
> +    }
> +    if (0 > next) {
> +        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
> +    }
> +    me->task_idx[seg] = t;
> +    return NULL;
> +}
> +
> +/*
> +*   Schedule a task to run in "time" microseconds. Find the spot in the ring where
> +*   the time fits, then signal the condition so an idle thread wakes up and rechecks it.
> +*/
> +static apr_status_t schedule_task(apr_thread_pool_t * me,
> +                                  apr_thread_start_t func, void *param,
> +                                  void *owner, apr_interval_time_t time)
> +{
> +    apr_thread_pool_task_t *t;
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_t *thd;
> +    apr_status_t rv = APR_SUCCESS;
> +    apr_thread_mutex_lock(me->lock);
> +
> +    t = task_new(me, func, param, 0, owner, time);
> +    if (NULL == t) {
> +        apr_thread_mutex_unlock(me->lock);
> +        return APR_ENOMEM;
> +    }
> +    t_loc = APR_RING_FIRST(me->scheduled_tasks);
> +    while (NULL != t_loc) {
> +        /* if the time is less than the entry insert ahead of it */
> +        if (t->dispatch.time < t_loc->dispatch.time) {
> +            ++me->scheduled_task_cnt;
> +            APR_RING_INSERT_BEFORE(t_loc, t, link);
> +            break;
> +        }
> +        else {
> +            t_loc = APR_RING_NEXT(t_loc, link);
> +            if (t_loc ==
> +                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                                  link)) {
> +                ++me->scheduled_task_cnt;
> +                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
> +                                     apr_thread_pool_task, link);
> +                break;
> +            }
> +        }
> +    }
> +    /* there should be at least one thread for scheduled tasks */
> +    if (0 == me->thd_cnt) {
> +        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
> +        if (APR_SUCCESS == rv) {
> +            ++me->thd_cnt;
> +        }
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +    apr_thread_mutex_lock(me->cond_lock);
> +    apr_thread_cond_signal(me->cond);
> +    apr_thread_mutex_unlock(me->cond_lock);
> +    return rv;
> +}
> +
> +static apr_status_t add_task(apr_thread_pool_t * me, apr_thread_start_t func,
> +                             void *param, apr_byte_t priority, int push,
> +                             void *owner)
> +{
> +    apr_thread_pool_task_t *t;
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_t *thd;
> +    apr_status_t rv = APR_SUCCESS;
> +
> +    apr_thread_mutex_lock(me->lock);
> +
> +    t = task_new(me, func, param, priority, owner, 0);
> +    if (NULL == t) {
> +        apr_thread_mutex_unlock(me->lock);
> +        return APR_ENOMEM;
> +    }
> +
> +    t_loc = add_if_empty(me, t);
> +    if (NULL == t_loc) {
> +        goto FINAL_EXIT;
> +    }
> +
> +    if (push) {
> +        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
> +               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
> +            t_loc = APR_RING_NEXT(t_loc, link);
> +        }
> +    }
> +    APR_RING_INSERT_BEFORE(t_loc, t, link);
> +    if (!push) {
> +        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
> +            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
> +        }
> +    }
> +
> +  FINAL_EXIT:
> +    me->task_cnt++;
> +    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
> +                             me->task_cnt > me->threshold)) {
> +        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
> +        if (APR_SUCCESS == rv) {
> +            ++me->thd_cnt;
> +        }
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +
> +    apr_thread_mutex_lock(me->cond_lock);
> +    apr_thread_cond_signal(me->cond);
> +    apr_thread_mutex_unlock(me->cond_lock);
> +
> +    return rv;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
> +                                               apr_thread_start_t func,
> +                                               void *param,
> +                                               apr_byte_t priority,
> +                                               void *owner)
> +{
> +    return add_task(me, func, param, priority, 1, owner);
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me,
> +                                                   apr_thread_start_t func,
> +                                                   void *param,
> +                                                   apr_interval_time_t time,
> +                                                   void *owner)
> +{
> +    return schedule_task(me, func, param, owner, time);
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
> +                                              apr_thread_start_t func,
> +                                              void *param,
> +                                              apr_byte_t priority,
> +                                              void *owner)
> +{
> +    return add_task(me, func, param, priority, 0, owner);
> +}
> +
> +static apr_status_t remove_scheduled_tasks(apr_thread_pool_t * me,
> +                                           void *owner)
> +{
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_pool_task_t *next;
> +
> +    t_loc = APR_RING_FIRST(me->scheduled_tasks);
> +    while (t_loc !=
> +           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
> +                             link)) {
> +        next = APR_RING_NEXT(t_loc, link);
> +        /* if this is the owner remove it */
> +        if (t_loc->owner == owner) {
> +            --me->scheduled_task_cnt;
> +            APR_RING_REMOVE(t_loc, link);
> +        }
> +        t_loc = next;
> +    }
> +    return APR_SUCCESS;
> +}
> +
> +static apr_status_t remove_tasks(apr_thread_pool_t * me, void *owner)
> +{
> +    apr_thread_pool_task_t *t_loc;
> +    apr_thread_pool_task_t *next;
> +    int seg;
> +
> +    t_loc = APR_RING_FIRST(me->tasks);
> +    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
> +        next = APR_RING_NEXT(t_loc, link);
> +        if (t_loc->owner == owner) {
> +            --me->task_cnt;
> +            seg = TASK_PRIORITY_SEG(t_loc);
> +            if (t_loc == me->task_idx[seg]) {
> +                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
> +                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
> +                                                           apr_thread_pool_task,
> +                                                           link)
> +                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
> +                    me->task_idx[seg] = NULL;
> +                }
> +            }
> +            APR_RING_REMOVE(t_loc, link);
> +        }
> +        t_loc = next;
> +    }
> +    return APR_SUCCESS;
> +}
> +
> +static void wait_on_busy_threads(apr_thread_pool_t * me, void *owner)
> +{
> +    struct apr_thread_list_elt *elt;
> +    apr_thread_mutex_lock(me->lock);
> +    elt = APR_RING_FIRST(me->busy_thds);
> +    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
> +#ifndef NDEBUG
> +        /* make sure the thread is not the one calling tasks_cancel */
> +        apr_os_thread_t *os_thread;
> +        apr_os_thread_get(&os_thread, elt->thd);
> +#ifdef WIN32
> +        /* hack for apr win32 bug */
> +        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
> +#else
> +        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
> +#endif
> +#endif
> +        if (elt->current_owner != owner) {
> +            elt = APR_RING_NEXT(elt, link);
> +            continue;
> +        }
> +        while (elt->current_owner == owner) {
> +            apr_thread_mutex_unlock(me->lock);
> +            apr_sleep(200 * 1000);
> +            apr_thread_mutex_lock(me->lock);
> +        }
> +        elt = APR_RING_FIRST(me->busy_thds);
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +    return;
> +}
> +
> +APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,
> +                                                       void *owner)
> +{
> +    apr_status_t rv = APR_SUCCESS;
> +
> +    apr_thread_mutex_lock(me->lock);
> +    if (me->task_cnt > 0) {
> +        rv = remove_tasks(me, owner);
> +    }
> +    if (me->scheduled_task_cnt > 0) {
> +        rv = remove_scheduled_tasks(me, owner);
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +    wait_on_busy_threads(me, owner);
> +
> +    return rv;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me)
> +{
> +    return me->task_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t)
> +    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me)
> +{
> +    return me->scheduled_task_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me)
> +{
> +    return me->thd_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me)
> +{
> +    return me->thd_cnt - me->idle_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me)
> +{
> +    return me->idle_cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me)
> +{
> +    return me->idle_max;
> +}
> +
> +/*
> + * This function stops surplus threads, keeping at most *cnt of them.
> + * @return list of stopped threads; their number is stored in *cnt
> + * NOTE: Busy threads could become idle while this function runs
> + */
> +static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t * me,
> +                                                apr_size_t * cnt, int idle)
> +{
> +    struct apr_thread_list *thds;
> +    apr_size_t n, n_dbg, i;
> +    struct apr_thread_list_elt *head, *tail, *elt;
> +
> +    apr_thread_mutex_lock(me->lock);
> +    if (idle) {
> +        thds = me->idle_thds;
> +        n = me->idle_cnt;
> +    }
> +    else {
> +        thds = me->busy_thds;
> +        n = me->thd_cnt - me->idle_cnt;
> +    }
> +    if (n <= *cnt) {
> +        apr_thread_mutex_unlock(me->lock);
> +        *cnt = 0;
> +        return NULL;
> +    }
> +    n -= *cnt;
> +
> +    head = APR_RING_FIRST(thds);
> +    for (i = 0; i < *cnt; i++) {
> +        head = APR_RING_NEXT(head, link);
> +    }
> +    tail = APR_RING_LAST(thds);
> +    APR_RING_UNSPLICE(head, tail, link);
> +    if (idle) {
> +        me->idle_cnt = *cnt;
> +    }
> +    apr_thread_mutex_unlock(me->lock);
> +
> +    n_dbg = 0;
> +    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
> +        elt->stop = 1;
> +        n_dbg++;
> +    }
> +    elt->stop = 1;
> +    n_dbg++;
> +    assert(n == n_dbg);
> +    *cnt = n;
> +
> +    APR_RING_PREV(head, link) = NULL;
> +    APR_RING_NEXT(tail, link) = NULL;
> +    return head;
> +}
> +
> +static apr_size_t trim_idle_threads(apr_thread_pool_t * me, apr_size_t cnt)
> +{
> +    apr_size_t n_dbg;
> +    struct apr_thread_list_elt *elt;
> +    apr_status_t rv;
> +
> +    elt = trim_threads(me, &cnt, 1);
> +
> +    apr_thread_mutex_lock(me->cond_lock);
> +    apr_thread_cond_broadcast(me->cond);
> +    apr_thread_mutex_unlock(me->cond_lock);
> +
> +    n_dbg = 0;
> +    while (elt) {
> +        apr_thread_join(&rv, elt->thd);
> +        elt = APR_RING_NEXT(elt, link);
> +        ++n_dbg;
> +    }
> +    assert(cnt == n_dbg);
> +
> +    return cnt;
> +}
> +
> +/* Don't join busy threads, for performance reasons: there is no telling how
> + * long a task will take to complete.
> + */
> +static apr_size_t trim_busy_threads(apr_thread_pool_t * me, apr_size_t cnt)
> +{
> +    trim_threads(me, &cnt, 0);
> +    return cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
> +                                                     apr_size_t cnt)
> +{
> +    me->idle_max = cnt;
> +    cnt = trim_idle_threads(me, cnt);
> +    return cnt;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t * me)
> +{
> +    return me->thd_max;
> +}
> +
> +/*
> + * This function stops surplus worker threads to meet the new limit.
> + * NOTE: Busy threads could become idle while this function runs
> + */
> +APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me,
> +                                                       apr_size_t cnt)
> +{
> +    unsigned int n;
> +
> +    me->thd_max = cnt;
> +    if (0 == cnt || me->thd_cnt <= cnt) {
> +        return 0;
> +    }
> +
> +    n = me->thd_cnt - cnt;
> +    if (n >= me->idle_cnt) {
> +        trim_busy_threads(me, n - me->idle_cnt);
> +        trim_idle_threads(me, 0);
> +    }
> +    else {
> +        trim_idle_threads(me, me->idle_cnt - n);
> +    }
> +    return n;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me)
> +{
> +    return me->threshold;
> +}
> +
> +APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me,
> +                                                      apr_size_t val)
> +{
> +    apr_size_t ov;
> +
> +    ov = me->threshold;
> +    me->threshold = val;
> +    return ov;
> +}
> +
> +#endif /* APR_HAS_THREADS */
> +
> +/* vim: set ts=4 sw=4 et cin tw=80: */


Re: [PATCH] apr_threadpool

Posted by Henry Jen <he...@ztune.net>.
William A. Rowe, Jr. wrote:
> Henry Jen wrote:
>>  
>> +/**
>> + * Get owner of the task currently being executed by the thread.
>> + * @param thd The thread that is executing a task
>> + * @param owner Pointer to receive owner of the task.
>> + * @return APR_SUCCESS if the owner is retrieved successfully
>> + */
>> +APR_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t * thd,
>> +                                                         void **owner);
> 
> the void* containing what(???)  Can we define an owner a bit more deliberately?
> 

It is the pointer passed in when a task is added with push/top/schedule;
it is simply an opaque pointer that identifies the owner of the task.
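
To illustrate, here is a minimal standalone sketch (not the APR code itself)
of how such an opaque owner pointer behaves: it is only ever compared by
address and never dereferenced, so any pointer the caller already holds can
serve as the owner. The names toy_task, toy_queue, toy_push and toy_cancel
are hypothetical and exist only for this example; a flat array stands in for
the ring used in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a queued task; not an APR type. */
struct toy_task {
    void *owner;   /* opaque tag from the caller, compared by address only */
    int   id;
};

#define TOY_MAX 8

struct toy_queue {
    struct toy_task tasks[TOY_MAX];
    size_t count;
};

/* Queue a task tagged with an owner. Returns 0 on success, -1 if full. */
static int toy_push(struct toy_queue *q, int id, void *owner)
{
    assert(q != NULL);
    if (q->count == TOY_MAX) {
        return -1;
    }
    q->tasks[q->count].owner = owner;
    q->tasks[q->count].id = id;
    q->count++;
    return 0;
}

/* Drop every queued task whose owner pointer equals `owner`.
 * Returns the number of tasks removed. */
static size_t toy_cancel(struct toy_queue *q, void *owner)
{
    size_t kept = 0, i, removed;

    assert(q != NULL);
    for (i = 0; i < q->count; i++) {
        if (q->tasks[i].owner != owner) {
            q->tasks[kept++] = q->tasks[i];   /* keep tasks of other owners */
        }
    }
    removed = q->count - kept;
    q->count = kept;
    return removed;
}
```

A caller would typically pass the address of whatever object the tasks
belong to (a session, a connection, ...) and later cancel with that same
address.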

Cheers,
Henry

Re: [PATCH] apr_threadpool

Posted by "William A. Rowe, Jr." <wr...@rowe-clan.net>.
Henry Jen wrote:
>  
> +/**
> + * Get owner of the task currently being executed by the thread.
> + * @param thd The thread that is executing a task
> + * @param owner Pointer to receive owner of the task.
> + * @return APR_SUCCESS if the owner is retrieved successfully
> + */
> +APR_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t * thd,
> +                                                         void **owner);

the void* containing what(???)  Can we define an owner a bit more deliberately?

Re: [PATCH] apr_threadpool

Posted by Henry Jen <he...@ztune.net>.
William A. Rowe, Jr. wrote:
> Henry,
> 
> thank you for your submission.
> 
> http://svn.apache.org/viewvc?view=rev&revision=492362
> 
> Please remember, the API is flexible until 1.3.0 (or 2.0.0) is tagged and
> released, at which point the new files become subject to the (rather strict)
> versioning rules.
> 

Hi Bill,

Happy 2007! This is great; thank you for all the help in getting this committed.

Attached is a small patch that includes two modifications:

1. Fix the assert, which should only prevent a task that belongs to the
owner from calling cancel on that owner.

2. Add an apr_thread_pool_task_owner_get API to retrieve the owner of the
task. This is convenient and can, in many cases, eliminate the need to
create a structure just to carry the parameters.
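
As a rough illustration of the second point, the standalone sketch below
models the idea without using APR. Without an owner lookup, a task that
needs both its owner and its argument has to receive them bundled in a
wrapper structure; with a lookup, the argument can be passed directly.
toy_worker, toy_run and toy_owner_get are hypothetical names for this
example, and the error handling is an assumption; only the current_owner
field name is borrowed from the patch.

```c
#include <assert.h>
#include <stddef.h>

struct toy_worker;
typedef int (*toy_func)(struct toy_worker *w, void *param);

/* Hypothetical worker record; remembers the owner of the running task. */
struct toy_worker {
    void *current_owner;
};

/* Lookup loosely modeled on the proposed API: 0 on success, -1 on error. */
static int toy_owner_get(struct toy_worker *w, void **owner)
{
    assert(owner != NULL);
    if (w == NULL) {
        return -1;
    }
    *owner = w->current_owner;
    return 0;
}

/* Run one task on the worker, recording its owner for the duration. */
static int toy_run(struct toy_worker *w, toy_func func, void *param,
                   void *owner)
{
    int rv;

    w->current_owner = owner;
    rv = func(w, param);
    w->current_owner = NULL;
    return rv;
}

/* The task receives its plain argument and asks the worker for its owner,
 * so no wrapper struct bundling { owner, amount } is needed. */
static int add_to_owner_total(struct toy_worker *w, void *param)
{
    void *owner;

    if (toy_owner_get(w, &owner) != 0 || owner == NULL) {
        return -1;
    }
    *(int *)owner += *(int *)param;
    return 0;
}
```

Here the owner happens to be an int that accumulates a total; in practice
it would be whatever object the caller used when queuing the task.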

Cheers,
Henry