Posted to dev@httpd.apache.org by Ian Holsman <ia...@cnet.com> on 2001/11/25 01:35:56 UTC

Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

On Fri, 2001-11-23 at 15:01, Aaron Bannert wrote:
> On Fri, Nov 23, 2001 at 11:46:25AM -0800, Brian Pane wrote:
> > Sounds good.  I think the "apr_pool_create_for_thread()" function that
> > I proposed earlier this morning will work well in combination with the
> > "time-space-tradeoff" worker design, so I'll continue with the prototyping
> > on the former.
> 
> Here's an updated version of my worker redesign. The "queue" is really a
> stack, but I didn't change the name for the sake of having a readable
> patch -- if we end up going with this patch I'll rename everything to
> "stack".
> 
Hi Aaron,
I've applied your patch and am benchmarking it now.
One thing I noticed is that on startup I get
[error] (4)Interrupted system call: apr_accept

but the server runs fine regardless.
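
My guess is that a signal is interrupting the blocking accept during
child startup. If that's right, retrying on EINTR -- in plain POSIX
terms something like the hypothetical wrapper below, or an
APR_STATUS_IS_EINTR(rv) check around the accept_func call -- should
silence it:

    #include <errno.h>
    #include <sys/socket.h>

    /* Restart accept(2) whenever a signal interrupts it; any other
     * failure is a real error and goes back to the caller. */
    static int accept_retry(int lfd, struct sockaddr *sa, socklen_t *salen)
    {
        int fd;
        do {
            fd = accept(lfd, sa, salen);
        } while (fd < 0 && errno == EINTR);
        return fd;   /* >= 0 on success, -1 with errno set */
    }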

..Ian
> Some preliminary results: uniprocessor sol8/intel hitting /index.html.en
> (ab -c 10 -n 10000):
> 
> new code 633.6 r/s (this patch)
> mpstat 5:
> CPU minf mjf xcal  intr ithr  csw icsw migr smtx  srw syscl  usr sys  wt idl
>   0    6   0    0  4032 3932 6401 2856    0   45    0 21235   31  61   0   8
>   0    7   0    0  3878 3778 6296 2763    0   41    0 20404   32  57   0  10
> 
> old code 629.6 r/s (cvs HEAD)
> mpstat 5:
> CPU minf mjf xcal  intr ithr  csw icsw migr smtx  srw syscl  usr sys  wt idl
>   0    4   0    0  4073 3973 6302 2526    0   57    0 25240   33  59   0   8
>   0    7   0    0  3704 3604 5757 2421    0   43    2 22985   28  55   0  17
> 
> I don't expect this small sampling to predict future performance, but at
> least it proves that I didn't make any mistakes in bringing this patch
> back up to date. It also shows that at least under these conditions this
> design performs on par with the current worker code.
> 
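> In case the control flow is hard to reconstruct from the hunks alone,
> here is the handoff protocol boiled down to a standalone toy: plain
> pthreads, integers standing in for sockets, hypothetical names -- a
> sketch of the idea, not the patch itself.
> 
>     #include <pthread.h>
>     #include <stdio.h>
> 
>     enum state { IDLE, BUSY, QUIT };
> 
>     typedef struct {
>         pthread_mutex_t mutex;      /* one mutex and one condition */
>         pthread_cond_t  cond;       /* variable per worker         */
>         enum state      state;
>         int             conn;       /* stands in for the socket */
>     } worker_t;
> 
>     #define NWORKERS 3
>     static worker_t *stack[NWORKERS];  /* stack of idle workers */
>     static int tail;
>     static pthread_mutex_t stack_mutex = PTHREAD_MUTEX_INITIALIZER;
>     static pthread_cond_t  not_empty   = PTHREAD_COND_INITIALIZER;
> 
>     static void push(worker_t *w)      /* worker advertises itself */
>     {
>         pthread_mutex_lock(&stack_mutex);
>         stack[tail++] = w;
>         if (tail == 1)                 /* single consumer, so only */
>             pthread_cond_signal(&not_empty); /* signal 0 -> 1      */
>         pthread_mutex_unlock(&stack_mutex);
>     }
> 
>     static worker_t *pop(void)     /* listener takes an idle worker */
>     {
>         worker_t *w;
>         pthread_mutex_lock(&stack_mutex);
>         while (tail == 0)
>             pthread_cond_wait(&not_empty, &stack_mutex);
>         w = stack[--tail];
>         pthread_mutex_unlock(&stack_mutex);
>         return w;
>     }
> 
>     static void *worker(void *arg)
>     {
>         worker_t *me = arg;
>         pthread_mutex_lock(&me->mutex);
>         for (;;) {
>             me->state = IDLE;
>             push(me);
>             while (me->state == IDLE)  /* wait for the listener */
>                 pthread_cond_wait(&me->cond, &me->mutex);
>             if (me->state == QUIT)
>                 break;
>             printf("worker %p got connection %d\n",
>                    (void *)me, me->conn);
>         }
>         pthread_mutex_unlock(&me->mutex);
>         return NULL;
>     }
> 
>     int main(void)
>     {
>         worker_t w[NWORKERS];
>         pthread_t tid[NWORKERS];
>         int i;
> 
>         for (i = 0; i < NWORKERS; i++) {
>             pthread_mutex_init(&w[i].mutex, NULL);
>             pthread_cond_init(&w[i].cond, NULL);
>             pthread_create(&tid[i], NULL, worker, &w[i]);
>         }
>         for (i = 0; i < 5; i++) {      /* the "listener" loop */
>             worker_t *next = pop();
>             pthread_mutex_lock(&next->mutex);
>             next->conn = i;            /* hand over the "socket" */
>             next->state = BUSY;
>             pthread_cond_signal(&next->cond);
>             pthread_mutex_unlock(&next->mutex);
>         }
>         for (i = 0; i < NWORKERS; i++) {  /* shut everyone down */
>             worker_t *next = pop();
>             pthread_mutex_lock(&next->mutex);
>             next->state = QUIT;
>             pthread_cond_signal(&next->cond);
>             pthread_mutex_unlock(&next->mutex);
>         }
>         for (i = 0; i < NWORKERS; i++)
>             pthread_join(tid[i], NULL);
>         return 0;
>     }
> 
> The property that matters is the same as in the patch: a worker holds
> its own mutex from before it pushes itself until cond_wait atomically
> releases it, so by the time the listener acquires that mutex the
> worker is guaranteed to be parked and the signal can't be lost.
> 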
> -aaron
> 
> 
> Index: server/mpm/worker/worker.c
> ===================================================================
> RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
> retrieving revision 1.43
> diff -u -r1.43 worker.c
> --- server/mpm/worker/worker.c	2001/11/22 05:13:29	1.43
> +++ server/mpm/worker/worker.c	2001/11/23 22:32:47
> @@ -68,9 +68,10 @@
>  #include "apr_strings.h"
>  #include "apr_file_io.h"
>  #include "apr_thread_proc.h"
> -#include "apr_signal.h"
>  #include "apr_thread_mutex.h"
> +#include "apr_thread_cond.h"
>  #include "apr_proc_mutex.h"
> +#include "apr_signal.h"
>  #define APR_WANT_STRFUNC
>  #include "apr_want.h"
>  
> @@ -141,6 +142,25 @@
>      apr_threadattr_t *threadattr;
>  } thread_starter;
>  
> +/* State of a particular worker.
> + */
> +typedef enum {
> +    WORKER_ELEM_IDLE,    /* 0 - idle (ready for another connection) */
> +    WORKER_ELEM_BUSY,    /* 1 - busy (currently processing a connection) */
> +    WORKER_ELEM_QUIT     /* 2 - time to quit */
> +} worker_elem_state_e;
> +
> +/* Structure used to keep track of the current state of a particular
> + * worker thread.
> + */
> +typedef struct {
> +    apr_pool_t          *pool;  /* pool to use when calling accept() */
> +    apr_socket_t        *sd;    /* socket returned from accept() */
> +    worker_elem_state_e  state;
> +    apr_thread_mutex_t  *mutex;
> +    apr_thread_cond_t   *cond;
> +} worker_elem_t;
> +
>  /*
>   * The max child slot ever assigned, preserved across restarts.  Necessary
>   * to deal with MaxClients changes across AP_SIG_GRACEFUL restarts.  We 
> @@ -202,8 +222,6 @@
>  static void signal_workers(void)
>  {
>      workers_may_exit = 1;
> -    /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
> -    ap_queue_signal_all_wakeup(worker_queue); */
>      ap_queue_interrupt_all(worker_queue);
>  }
>  
> @@ -562,9 +580,8 @@
>      int process_slot = ti->pid;
>      int thread_slot = ti->tid;
>      apr_pool_t *tpool = apr_thread_pool_get(thd);
> -    void *csd = NULL;
> -    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
> -    int n;
> +    int n, numalive;
> +    worker_elem_t *a_worker;
>      apr_pollfd_t *pollset;
>      apr_status_t rv;
>      ap_listen_rec *lr, *last_lr = ap_listeners;
> @@ -639,10 +656,23 @@
>          }
>      got_fd:
>          if (!workers_may_exit) {
> -            /* create a new transaction pool for each accepted socket */
> -            apr_pool_create(&ptrans, tpool);
> -
> -            rv = lr->accept_func(&csd, lr, ptrans);
> +            if ((rv = ap_queue_pop(worker_queue, (void **)&a_worker))
> +                != APR_SUCCESS) {
> +                signal_workers();
> +            }
> +            if ((rv = apr_thread_mutex_lock(a_worker->mutex))
> +                != APR_SUCCESS) {
> +                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                             "apr_thread_mutex_lock failed. Attempting "
> +                             "to shutdown process gracefully.");
> +                signal_workers();
> +            }
> +            if ((rv = lr->accept_func(&a_worker->sd, lr, a_worker->pool))
> +                != APR_SUCCESS) {
> +                a_worker->sd = NULL;
> +                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
> +                             "apr_accept");
> +            }
>  
>              if (rv == APR_EGENERAL) {
>                  signal_workers();
> @@ -654,17 +684,17 @@
>                               "shutdown process gracefully.");
>                  signal_workers();
>              }
> -            if (csd != NULL) {
> -                rv = ap_queue_push(worker_queue, csd, ptrans);
> -                if (rv) {
> -                    /* trash the connection; we couldn't queue the connected
> -                     * socket to a worker 
> -                     */
> -                    apr_socket_close(csd);
> -                    ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
> -                                 "ap_queue_push failed with error code %d",
> -                                 rv);
> -                }
> +
> +            /* Signal worker that it's time to go. */
> +            a_worker->state = WORKER_ELEM_BUSY;
> +            apr_thread_cond_signal(a_worker->cond);
> +
> +            if ((rv = apr_thread_mutex_unlock(a_worker->mutex))
> +                != APR_SUCCESS) {
> +                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                             "apr_thread_mutex_unlock failed. Attempting to "
> +                             "shutdown process gracefully.");
> +                signal_workers();
>              }
>          }
>          else {
> @@ -679,6 +709,24 @@
>          }
>      }
>  
> +    /* Kill off the workers in a nice way. */
> +    numalive = ap_threads_per_child;
> +    while (numalive > 0) {
> +        if ((rv = ap_queue_pop(worker_queue, (void **)&a_worker))
> +            != APR_SUCCESS) {
> +            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
> +                         "ap_queue_pop failed during shutdown with error "
> +                         "code %d", rv);
> +        }
> +        else {
> +            apr_thread_mutex_lock(a_worker->mutex);
> +            a_worker->state = WORKER_ELEM_QUIT;
> +            apr_thread_cond_signal(a_worker->cond);
> +            apr_thread_mutex_unlock(a_worker->mutex);
> +        }
> +        --numalive;
> +    }
> +
>      ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
>          (request_rec *) NULL);
>      dying = 1;
> @@ -691,35 +739,81 @@
>      return NULL;
>  }
>  
> -static void *worker_thread(apr_thread_t *thd, void * dummy)
> +static void *worker_thread(apr_thread_t *thd, void *dummy)
>  {
>      proc_info * ti = dummy;
>      int process_slot = ti->pid;
>      int thread_slot = ti->tid;
> -    apr_socket_t *csd = NULL;
> -    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
> +    apr_pool_t *tpool = apr_thread_pool_get(thd);
> +    worker_elem_t my_state;
>      apr_status_t rv;
>  
> +    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
> +
>      free(ti);
>  
> -    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
> -    while (!workers_may_exit) {
> +    apr_pool_create(&my_state.pool, tpool);
> +    if ((rv = apr_thread_mutex_create(&my_state.mutex,
> +                                      APR_THREAD_MUTEX_DEFAULT, tpool))
> +        != APR_SUCCESS) {
> +        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                     "apr_thread_mutex_create");
> +    }
> +    if ((rv = apr_thread_cond_create(&my_state.cond, tpool))
> +        != APR_SUCCESS) {
> +        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                     "apr_thread_cond_create");
> +    }
> +
> +    if ((rv = apr_thread_mutex_lock(my_state.mutex)) != APR_SUCCESS) {
> +        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                     "apr_thread_mutex_lock");
> +    }
> +
> +    while (1) {
> +        my_state.sd = NULL;
> +        my_state.state = WORKER_ELEM_IDLE;
> +
>          ap_update_child_status(process_slot, thread_slot, SERVER_READY, NULL);
> -        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
> -        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
> -         * from an explicit call to ap_queue_interrupt_all(). This allows
> -         * us to unblock threads stuck in ap_queue_pop() when a shutdown
> -         * is pending. */
> -        if (rv == FD_QUEUE_EINTR || !csd) {
> -            continue;
> -        }
> -        process_socket(ptrans, csd, process_slot, thread_slot);
> -        requests_this_child--; /* FIXME: should be synchronized - aaron */
> -        apr_pool_destroy(ptrans);
> +
> +        /* Make ourselves available as a connection-processing worker. */
> +        if ((rv = ap_queue_push(worker_queue, &my_state)) != APR_SUCCESS) {
> +            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
> +                         "ap_queue_push failed with error code %d", rv);
> +        }
> +
> +        /* Because of the way this is architected, we will always have
> +         * a context switch here. It would be neat if we could come up
> +         * with a good way to avoid the call to cond_wait. -aaron
> +         */
> +        while (my_state.state == WORKER_ELEM_IDLE) {
> +            apr_thread_cond_wait(my_state.cond, my_state.mutex);
> +        }
> +        /* Did someone wake us up to notice that it is time to exit? */
> +        if (my_state.state == WORKER_ELEM_QUIT) {
> +            break;
> +        }
> +        else if (my_state.sd != NULL) {
> +            process_socket(my_state.pool, my_state.sd,
> +                           process_slot, thread_slot);
> +            requests_this_child--; /* FIXME: should be synchronized -aaron */
> +        }
> +        apr_pool_clear(my_state.pool);
>      }
>  
>      ap_update_child_status(process_slot, thread_slot,
> -        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
> +        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *)NULL);
> +
> +    if ((rv = apr_thread_cond_destroy(my_state.cond)) != APR_SUCCESS) {
> +        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                     "apr_thread_cond_destroy");
> +    }
> +    if ((rv = apr_thread_mutex_destroy(my_state.mutex)) != APR_SUCCESS) {
> +        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> +                     "apr_thread_mutex_destroy");
> +    }
> +    apr_pool_destroy(my_state.pool);
> +
>      apr_thread_mutex_lock(worker_thread_count_mutex);
>      worker_thread_count--;
>      apr_thread_mutex_unlock(worker_thread_count_mutex);
> Index: server/mpm/worker/fdqueue.c
> ===================================================================
> RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
> retrieving revision 1.9
> diff -u -r1.9 fdqueue.c
> --- server/mpm/worker/fdqueue.c	2001/10/17 16:29:36	1.9
> +++ server/mpm/worker/fdqueue.c	2001/11/23 22:32:47
> @@ -82,7 +82,6 @@
>       * XXX: We should at least try to signal an error here, it is
>       * indicative of a programmer error. -aaron */
>      apr_thread_cond_destroy(queue->not_empty);
> -    apr_thread_cond_destroy(queue->not_full);
>      apr_thread_mutex_destroy(queue->one_big_mutex);
>  
>      return FD_QUEUE_SUCCESS;
> @@ -93,26 +92,21 @@
>   */
>  int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
>  {
> -    int i;
> -
>      /* FIXME: APRize these return values. */
>      if (apr_thread_mutex_create(&queue->one_big_mutex,
> -                              APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS)
> -        return FD_QUEUE_FAILURE;
> -    if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS)
> +                                APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
> -    if (apr_thread_cond_create(&queue->not_full, a) != APR_SUCCESS)
> +    }
> +    if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
> +    }
>  
>      queue->tail = 0;
> -    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
> +    queue->data = apr_palloc(a, queue_capacity * sizeof(void*));
>      queue->bounds = queue_capacity;
> -
> -    /* Set all the sockets in the queue to NULL */
> -    for (i = 0; i < queue_capacity; ++i)
> -        queue->data[i].sd = NULL;
>  
> -    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
> +    apr_pool_cleanup_register(a, queue, ap_queue_destroy,
> +                              apr_pool_cleanup_null);
>  
>      return FD_QUEUE_SUCCESS;
>  }
> @@ -122,23 +116,29 @@
>   * the push operation has completed, it signals other threads waiting
>   * in apr_queue_pop() that they may continue consuming sockets.
>   */
> -int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
> +int ap_queue_push(fd_queue_t *queue, void *e)
>  {
> -    fd_queue_elem_t *elem;
> -
>      if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
>      }
>  
> -    while (ap_queue_full(queue)) {
> -        apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
> +    /* If they push too many, they didn't allocate enough slots
> +     * in the stack, and we treat that as fatal. */
> +    if (ap_queue_full(queue)) {
> +        if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
> +            return FD_QUEUE_FAILURE;
> +        }
> +        return FD_QUEUE_OVERFLOW;
>      }
>  
> -    elem = &queue->data[queue->tail++];
> -    elem->sd = sd;
> -    elem->p = p;
> +    queue->data[queue->tail++] = e;
>  
> -    apr_thread_cond_signal(queue->not_empty);
> +    /* Only perform the overhead of signaling if we were empty before
> +     * inserting this element.
> +     */
> +    if (1 == queue->tail) {
> +        apr_thread_cond_signal(queue->not_empty);
> +    }
>  
>      if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
> @@ -153,16 +153,14 @@
>   * Once retrieved, the socket is placed into the address specified by
>   * 'sd'.
>   */
> -apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
> +apr_status_t ap_queue_pop(fd_queue_t *queue, void **e)
>  {
> -    fd_queue_elem_t *elem;
> -
>      if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
>      }
>  
>      /* Keep waiting until we wake up and find that the queue is not empty. */
> -    if (ap_queue_empty(queue)) {
> +    while (ap_queue_empty(queue)) {
>          apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
>          /* If we wake up and it's still empty, then we were interrupted */
>          if (ap_queue_empty(queue)) {
> @@ -173,16 +171,7 @@
>          }
>      } 
>      
> -    elem = &queue->data[--queue->tail];
> -    *sd = elem->sd;
> -    *p = elem->p;
> -    elem->sd = NULL;
> -    elem->p = NULL;
> -
> -    /* signal not_full if we were full before this pop */
> -    if (queue->tail == queue->bounds - 1) {
> -        apr_thread_cond_signal(queue->not_full);
> -    }
> +    *e = queue->data[--queue->tail];
>  
>      if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
> @@ -197,9 +186,6 @@
>          return FD_QUEUE_FAILURE;
>      }
>      apr_thread_cond_broadcast(queue->not_empty);
> -    /* We shouldn't have multiple threads sitting in not_full, but
> -     * broadcast just in case. */
> -    apr_thread_cond_broadcast(queue->not_full);
>      if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
>          return FD_QUEUE_FAILURE;
>      }
> Index: server/mpm/worker/fdqueue.h
> ===================================================================
> RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
> retrieving revision 1.9
> diff -u -r1.9 fdqueue.h
> --- server/mpm/worker/fdqueue.h	2001/10/17 16:29:37	1.9
> +++ server/mpm/worker/fdqueue.h	2001/11/23 22:32:47
> @@ -73,29 +73,21 @@
>  #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
>                                 of queue_pop semantics */
>  #define FD_QUEUE_EINTR APR_EINTR
> +#define FD_QUEUE_OVERFLOW -2
>  
> -struct fd_queue_elem_t {
> -    apr_socket_t      *sd;
> -    apr_pool_t        *p;
> -};
> -typedef struct fd_queue_elem_t fd_queue_elem_t;
> -
>  struct fd_queue_t {
>      int                 tail;
> -    fd_queue_elem_t    *data;
> +    void              **data;
>      int                 bounds;
> -    int                 blanks;
>      apr_thread_mutex_t *one_big_mutex;
>      apr_thread_cond_t  *not_empty;
> -    apr_thread_cond_t  *not_full;
> -    int                 cancel_state;
>  };
>  typedef struct fd_queue_t fd_queue_t;
>  
>  /* FIXME: APRize these -- return values should be apr_status_t */
>  int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
> -int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
> -apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
> +int ap_queue_push(fd_queue_t *queue, void *e);
> +apr_status_t ap_queue_pop(fd_queue_t *queue, void **e);
>  apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
>  
>  #endif /* FDQUEUE_H */
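
One more note from reading the new fdqueue.c: ap_queue_push now only
signals not_empty on the empty-to-non-empty transition. As far as I
can tell that is only safe because the listener is the sole consumer,
so at most one thread can ever be parked on not_empty and one signal
per transition is enough. If this stack ever grows a second consumer,
the push path would need the unconditional signal back, i.e. (sketch):

    queue->data[queue->tail++] = e;
    /* with multiple consumers, signal on every push so a second
     * waiter can't be stranded while elements sit in the stack */
    apr_thread_cond_signal(queue->not_empty);

Might be worth a comment in the code so nobody trips over it later.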
-- 
Ian Holsman          IanH@cnet.com
Performance Measurement & Analysis
CNET Networks   -   (415) 344-2608