You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by rb...@apache.org on 2001/08/05 20:41:38 UTC

cvs commit: httpd-2.0/server/mpm/worker fdqueue.c fdqueue.h worker.c

rbb         01/08/05 11:41:38

  Modified:    server/mpm/worker fdqueue.c fdqueue.h worker.c
  Log:
  Get the worker MPM working again.  This should fix the serialization
  problems, and it makes us initialize the queue only once.
  
  Revision  Changes    Path
  1.3       +63 -59    httpd-2.0/server/mpm/worker/fdqueue.c
  
  Index: fdqueue.c
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- fdqueue.c	2001/07/31 15:35:28	1.2
  +++ fdqueue.c	2001/08/05 18:41:38	1.3
  @@ -57,112 +57,116 @@
    */
   
   #include "fdqueue.h"
  -#include "apr_pools.h"
   
  -/* Assumption: queue itself is allocated by the user */
   /* Assumption: increment and decrement are atomic on int */
   
  -int ap_queue_size(FDQueue *queue) {
  -    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
  -}
  -
  -int ap_queue_full(FDQueue *queue) {
  -    return(queue->blanks <= 0);
  -}
  -
  -int ap_block_on_queue(FDQueue *queue) {
  -#if 0
  +int ap_increase_blanks(FDQueue *queue) 
  +{
       if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  -#endif
  -    if (ap_queue_full(queue)) {
  -        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
  -    }
  -#if 0
  +    queue->blanks++;
       if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
           return FD_QUEUE_FAILURE;
       }
  -#endif
  -    return FD_QUEUE_SUCCESS;
  -}
  -
  -static int increase_blanks(FDQueue *queue) {
  -    queue->blanks++;
       return FD_QUEUE_SUCCESS;
   }
   
  -static apr_status_t ap_queue_destroy(void *data) {
  +static apr_status_t ap_queue_destroy(void *data) 
  +{
       FDQueue *queue = data;
       /* Ignore errors here, we can't do anything about them anyway */
  -    pthread_cond_destroy(&queue->not_empty);
  -    pthread_cond_destroy(&queue->not_full);
  +    pthread_cond_destroy(&(queue->not_empty));
  +    pthread_cond_destroy(&(queue->not_full));
       pthread_mutex_destroy(&queue->one_big_mutex);
       return FD_QUEUE_SUCCESS;
   }
   
  -int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) {
  +int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
  +{
       int i;
       int bounds = queue_capacity + 1;
  +    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
  +    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
  +    queue->not_empty = not_empty;
  +    queue->not_full = not_full;
       pthread_mutex_init(&queue->one_big_mutex, NULL);
  -    pthread_cond_init(&queue->not_empty, NULL);
  -    pthread_cond_init(&queue->not_full, NULL);
       queue->head = queue->tail = 0;
       queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
       queue->bounds = bounds;
  -    queue->blanks = queue_capacity;
  +    queue->blanks = 0;
       apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
       for (i=0; i < bounds; ++i)
           queue->data[i].sd = NULL;
       return FD_QUEUE_SUCCESS;
   }
   
  -int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) {
  +int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
  +{
  +    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
  +        return FD_QUEUE_FAILURE;
  +    }
       queue->data[queue->tail].sd = sd;
  -    queue->data[queue->tail].p  = p;
  +    queue->data[queue->tail].p = p;
       queue->tail = (queue->tail + 1) % queue->bounds;
       queue->blanks--;
  -    pthread_cond_signal(&queue->not_empty);
  -#if 0
  -    if (queue->head == (queue->tail + 1) % queue->bounds) {
  -#endif
  -    if (ap_queue_full(queue)) {
  -        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
  +    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
  +        return FD_QUEUE_FAILURE;
       }
  +    pthread_cond_signal(&(queue->not_empty));
       return FD_QUEUE_SUCCESS;
   }
   
  -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) {
  -    increase_blanks(queue);
  -    /* We have just removed one from the queue.  By definition, it is
  -     * no longer full.  We can ALWAYS signal the listener thread at
  -     * this point.  However, the original code didn't do it this way,
  -     * so I am leaving the original code in, just commented out.  BTW,
  -     * originally, the increase_blanks wasn't in this function either.
  -     *
  -     if (queue->blanks > 0) {
  -     */
  -    pthread_cond_signal(&queue->not_full);
  -
  -    /*    }    */
  +apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
  +{
  +    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
  +        return FD_QUEUE_FAILURE;
  +    }
       if (queue->head == queue->tail) {
  -        if (block_if_empty) {
  -            pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
  -fprintf(stderr, "Found a non-empty queue  :-)\n");
  -        }
  +        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
       } 
       
       *sd = queue->data[queue->head].sd;
  -    *p  = queue->data[queue->head].p;
  +    *p = queue->data[queue->head].p;
       queue->data[queue->head].sd = NULL;
  -    if (*sd != NULL) {
  +    queue->data[queue->head].p = NULL;
  +    if (sd != NULL) {
           queue->head = (queue->head + 1) % queue->bounds;
       }
  +    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
  +        return FD_QUEUE_FAILURE;
  +    }
  +    if (queue->blanks > 0) {
  +        pthread_cond_signal(&(queue->not_full));
  +    }
       return APR_SUCCESS;
   }
   
  +int ap_queue_size(FDQueue *queue) 
  +{
  +    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
  +}
  +
  +int ap_queue_full(FDQueue *queue) 
  +{
  +    return(queue->blanks <= 0);
  +}
  +
  +int ap_block_on_queue(FDQueue *queue) 
  +{
  +    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
  +        return FD_QUEUE_FAILURE;
  +    }
  +    if (ap_queue_full(queue)) {
  +        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
  +    }
  +    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
  +        return FD_QUEUE_FAILURE;
  +    }
  +    return FD_QUEUE_SUCCESS;
  +}
  +
   void ap_queue_signal_all_wakeup(FDQueue *queue)
   {
  -fprintf(stderr, "trying to broadcast to all workers\n");
  -    pthread_cond_broadcast(&queue->not_empty);
  +    pthread_cond_broadcast(&(queue->not_empty));
   }
  
  
  
  1.3       +2 -1      httpd-2.0/server/mpm/worker/fdqueue.h
  
  Index: fdqueue.h
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- fdqueue.h	2001/08/04 11:40:16	1.2
  +++ fdqueue.h	2001/08/05 18:41:38	1.3
  @@ -87,10 +87,11 @@
   
   int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
   int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
  -apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty);
  +apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
   int ap_queue_size(FDQueue *queue);
   int ap_queue_full(FDQueue *queue);
   int ap_block_on_queue(FDQueue *queue);
   void ap_queue_signal_all_wakeup(FDQueue *queue);
  +int ap_increase_blanks(FDQueue *queue);
   
   #endif /* FDQUEUE_H */
  
  
  
  1.8       +10 -10    httpd-2.0/server/mpm/worker/worker.c
  
  Index: worker.c
  ===================================================================
  RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- worker.c	2001/08/05 18:08:49	1.7
  +++ worker.c	2001/08/05 18:41:38	1.8
  @@ -510,7 +510,6 @@
   /* Sets workers_may_exit if we received a character on the pipe_of_death */
   static void check_pipe_of_death(void)
   {
  -fprintf(stderr, "looking at pipe of death\n");
       apr_lock_acquire(pipe_of_death_mutex);
       if (!workers_may_exit) {
           apr_status_t ret;
  @@ -684,7 +683,8 @@
       free(ti);
   
       while (!workers_may_exit) {
  -        ap_queue_pop(worker_queue, &csd, &ptrans, 1);
  +        ap_queue_pop(worker_queue, &csd, &ptrans);
  +        ap_increase_blanks(worker_queue);
           process_socket(ptrans, csd, process_slot, thread_slot);
           requests_this_child--;
           apr_pool_clear(ptrans);
  @@ -729,21 +729,21 @@
       apr_thread_t **threads = ts->threads;
       apr_threadattr_t *thread_attr = ts->threadattr;
       int child_num_arg = ts->child_num_arg;
  -    int i;
       int my_child_num = child_num_arg;
       proc_info *my_info = NULL;
       apr_status_t rv;
  +    int i = 0;
       int threads_created = 0;
       apr_thread_t *listener;
   
  +    my_info = (proc_info *)malloc(sizeof(proc_info));
  +    my_info->pid = my_child_num;
  +    my_info->tid = i;
  +    my_info->sd = 0;
  +    apr_pool_create(&my_info->tpool, pchild);
  +    apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
       while (1) {
  -        my_info = (proc_info *)malloc(sizeof(proc_info));
  -        my_info->pid = my_child_num;
  -        my_info->tid = i;
  -        my_info->sd = 0;
  -        apr_pool_create(&my_info->tpool, pchild);
  -	apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
  -        for (i=0; i < ap_threads_per_child; i++) {
  +        for (i=1; i < ap_threads_per_child; i++) {
               int status = ap_scoreboard_image->servers[child_num_arg][i].status;
   
               if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {