You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@httpd.apache.org by Ian Holsman <ia...@cnet.com> on 2001/11/25 01:35:56 UTC
Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker
thread)
On Fri, 2001-11-23 at 15:01, Aaron Bannert wrote:
> On Fri, Nov 23, 2001 at 11:46:25AM -0800, Brian Pane wrote:
> > Sounds good. I think the "apr_pool_create_for_thread()" function that
> > I proposed earlier this morning will work well in combination with the
> > "time-space-tradeoff" worker design, so I'll continue with the prototyping
> > on the former.
>
> Here's an updated version of my worker redesign. The "queue" is really a
> stack, but I didn't change the name for the sake of having a readable
> patch -- if we end up going with this patch I'll rename everything to
> "stack".
>
Hi Aaron.
I've applied your patch.
and am benchamrking it now.
One thing I noticed is that on startup I get
[error] (4)Interrupted system call: apr_accept
but the server runs fine regardless.
..Ian
> Some preliminary results: uniprocessor sol8/intel hitting /index.html.en
> (ab -c 10 -n 10000):
>
> new code 633.6 r/s (this patch)
> mpstat 5:
> CPU minf mjf xcal intr ithr csw icsw migr smtx srw syscl usr sys wt idl
> 0 6 0 0 4032 3932 6401 2856 0 45 0 21235 31 61 0 8
> 0 7 0 0 3878 3778 6296 2763 0 41 0 20404 32 57 0 10
>
> old code 629.6 r/s (cvs HEAD)
> mpstat 5:
> CPU minf mjf xcal intr ithr csw icsw migr smtx srw syscl usr sys wt idl
> 0 4 0 0 4073 3973 6302 2526 0 57 0 25240 33 59 0 8
> 0 7 0 0 3704 3604 5757 2421 0 43 2 22985 28 55 0 17
>
> I don't expect this small sampling to predict future performance, but at
> least it proves that I didn't make any mistakes in bringing this patch
> back up to date. It also shows that at least under these conditions this
> design performs on par with the current worker code.
>
> -aaron
>
>
> Index: server/mpm/worker/worker.c
> ===================================================================
> RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
> retrieving revision 1.43
> diff -u -r1.43 worker.c
> --- server/mpm/worker/worker.c 2001/11/22 05:13:29 1.43
> +++ server/mpm/worker/worker.c 2001/11/23 22:32:47
> @@ -68,9 +68,10 @@
> #include "apr_strings.h"
> #include "apr_file_io.h"
> #include "apr_thread_proc.h"
> -#include "apr_signal.h"
> #include "apr_thread_mutex.h"
> +#include "apr_thread_cond.h"
> #include "apr_proc_mutex.h"
> +#include "apr_signal.h"
> #define APR_WANT_STRFUNC
> #include "apr_want.h"
>
> @@ -141,6 +142,25 @@
> apr_threadattr_t *threadattr;
> } thread_starter;
>
> +/* State of a particular worker.
> + */
> +typedef enum {
> + WORKER_ELEM_IDLE, /* 0 - idle (ready for another connection) */
> + WORKER_ELEM_BUSY, /* 1 - busy (currently processing a connection) */
> + WORKER_ELEM_QUIT /* 2 - time to quit */
> +} worker_elem_state_e;
> +
> +/* Structure used to keep track of the current state of a particular
> + * worker thread.
> + */
> +typedef struct {
> + apr_pool_t *pool; /* pool to use when calling accept() */
> + apr_socket_t *sd; /* socket returned from accept() */
> + worker_elem_state_e state;
> + apr_thread_mutex_t *mutex;
> + apr_thread_cond_t *cond;
> +} worker_elem_t;
> +
> /*
> * The max child slot ever assigned, preserved across restarts. Necessary
> * to deal with MaxClients changes across AP_SIG_GRACEFUL restarts. We
> @@ -202,8 +222,6 @@
> static void signal_workers(void)
> {
> workers_may_exit = 1;
> - /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
> - ap_queue_signal_all_wakeup(worker_queue); */
> ap_queue_interrupt_all(worker_queue);
> }
>
> @@ -562,9 +580,8 @@
> int process_slot = ti->pid;
> int thread_slot = ti->tid;
> apr_pool_t *tpool = apr_thread_pool_get(thd);
> - void *csd = NULL;
> - apr_pool_t *ptrans; /* Pool for per-transaction stuff */
> - int n;
> + int n, numalive;
> + worker_elem_t *a_worker;
> apr_pollfd_t *pollset;
> apr_status_t rv;
> ap_listen_rec *lr, *last_lr = ap_listeners;
> @@ -639,10 +656,23 @@
> }
> got_fd:
> if (!workers_may_exit) {
> - /* create a new transaction pool for each accepted socket */
> - apr_pool_create(&ptrans, tpool);
> -
> - rv = lr->accept_func(&csd, lr, ptrans);
> + if ((rv = ap_queue_pop(worker_queue, (void **)&a_worker))
> + != APR_SUCCESS) {
> + signal_workers();
> + }
> + if ((rv = apr_thread_mutex_lock(a_worker->mutex))
> + != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_thread_mutex_lock failed. Attempting "
> + "to shutdown process gracefully.");
> + signal_workers();
> + }
> + if ((rv = lr->accept_func(&a_worker->sd, lr, a_worker->pool))
> + != APR_SUCCESS) {
> + a_worker->sd = NULL;
> + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
> + "apr_accept");
> + }
>
> if (rv == APR_EGENERAL) {
> signal_workers();
> @@ -654,17 +684,17 @@
> "shutdown process gracefully.");
> signal_workers();
> }
> - if (csd != NULL) {
> - rv = ap_queue_push(worker_queue, csd, ptrans);
> - if (rv) {
> - /* trash the connection; we couldn't queue the connected
> - * socket to a worker
> - */
> - apr_socket_close(csd);
> - ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
> - "ap_queue_push failed with error code %d",
> - rv);
> - }
> +
> + /* Signal worker that it's time to go. */
> + a_worker->state = WORKER_ELEM_BUSY;
> + apr_thread_cond_signal(a_worker->cond);
> +
> + if ((rv = apr_thread_mutex_unlock(a_worker->mutex))
> + != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_proc_mutex_unlock failed. Attempting to "
> + "shutdown process gracefully.");
> + signal_workers();
> }
> }
> else {
> @@ -679,6 +709,24 @@
> }
> }
>
> + /* Kill off the workers in a nice way. */
> + numalive = ap_threads_per_child;
> + while (numalive > 0) {
> + if ((rv = ap_queue_pop(worker_queue, (void *)&a_worker))
> + != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
> + "ap_queue_pop failed during shutdown with error "
> + "code %d", rv);
> + }
> + else {
> + apr_thread_mutex_lock(a_worker->mutex);
> + a_worker->state = WORKER_ELEM_QUIT;
> + apr_thread_cond_signal(a_worker->cond);
> + apr_thread_mutex_unlock(a_worker->mutex);
> + }
> + --numalive;
> + }
> +
> ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
> (request_rec *) NULL);
> dying = 1;
> @@ -691,35 +739,81 @@
> return NULL;
> }
>
> -static void *worker_thread(apr_thread_t *thd, void * dummy)
> +static void *worker_thread(apr_thread_t *thd, void *dummy)
> {
> proc_info * ti = dummy;
> int process_slot = ti->pid;
> int thread_slot = ti->tid;
> - apr_socket_t *csd = NULL;
> - apr_pool_t *ptrans; /* Pool for per-transaction stuff */
> + apr_pool_t *tpool = apr_thread_pool_get(thd);
> + worker_elem_t my_state;
> apr_status_t rv;
>
> + ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
> +
> free(ti);
>
> - ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
> - while (!workers_may_exit) {
> + apr_pool_create(&my_state.pool, tpool);
> + if ((rv = apr_thread_mutex_create(&my_state.mutex,
> + APR_THREAD_MUTEX_DEFAULT, tpool))
> + != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_thread_mutex_create");
> + }
> + if ((rv = apr_thread_cond_create(&my_state.cond, tpool))
> + != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_thread_cond_create");
> + }
> +
> + if ((rv = apr_thread_mutex_lock(my_state.mutex)) != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_thread_mutex_lock");
> + }
> +
> + while (1) {
> + my_state.sd = NULL;
> + my_state.state = WORKER_ELEM_IDLE;
> +
> ap_update_child_status(process_slot, thread_slot, SERVER_READY, NULL);
> - rv = ap_queue_pop(worker_queue, &csd, &ptrans);
> - /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
> - * from an explicit call to ap_queue_interrupt_all(). This allows
> - * us to unblock threads stuck in ap_queue_pop() when a shutdown
> - * is pending. */
> - if (rv == FD_QUEUE_EINTR || !csd) {
> - continue;
> - }
> - process_socket(ptrans, csd, process_slot, thread_slot);
> - requests_this_child--; /* FIXME: should be synchronized - aaron */
> - apr_pool_destroy(ptrans);
> +
> + /* Make ourselves available as a connection-processing worker. */
> + if ((rv = ap_queue_push(worker_queue, &my_state)) != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
> + "ap_stack_push failed with error code %d", rv);
> + }
> +
> + /* Because of the way this is architected, we will always have
> + * a context switch here. It would be neat if we could come up
> + * with a good way to avoid the call to cond_wait. -aaron
> + */
> + while (my_state.state == WORKER_ELEM_IDLE) {
> + apr_thread_cond_wait(my_state.cond, my_state.mutex);
> + }
> + /* Did someone wake us up to notice that it is time to exit? */
> + if (my_state.state == WORKER_ELEM_QUIT) {
> + break;
> + }
> + else if (my_state.sd != NULL) {
> + process_socket(my_state.pool, my_state.sd,
> + process_slot, thread_slot);
> + requests_this_child--; /* FIXME: should be synchronized -aaron */
> + }
> + apr_pool_clear(my_state.pool);
> }
>
> ap_update_child_status(process_slot, thread_slot,
> - (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
> + (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *)NULL);
> +
> + if ((rv = apr_thread_cond_destroy(my_state.cond)) != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_thread_cond_destroy");
> + }
> + if ((rv = apr_thread_mutex_destroy(my_state.mutex)) != APR_SUCCESS) {
> + ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
> + "apr_thread_mutex_destroy");
> + }
> + apr_pool_destroy(my_state.pool);
> +
> apr_thread_mutex_lock(worker_thread_count_mutex);
> worker_thread_count--;
> apr_thread_mutex_unlock(worker_thread_count_mutex);
> Index: server/mpm/worker/fdqueue.c
> ===================================================================
> RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
> retrieving revision 1.9
> diff -u -r1.9 fdqueue.c
> --- server/mpm/worker/fdqueue.c 2001/10/17 16:29:36 1.9
> +++ server/mpm/worker/fdqueue.c 2001/11/23 22:32:47
> @@ -82,7 +82,6 @@
> * XXX: We should at least try to signal an error here, it is
> * indicative of a programmer error. -aaron */
> apr_thread_cond_destroy(queue->not_empty);
> - apr_thread_cond_destroy(queue->not_full);
> apr_thread_mutex_destroy(queue->one_big_mutex);
>
> return FD_QUEUE_SUCCESS;
> @@ -93,26 +92,21 @@
> */
> int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
> {
> - int i;
> -
> /* FIXME: APRize these return values. */
> if (apr_thread_mutex_create(&queue->one_big_mutex,
> - APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS)
> - return FD_QUEUE_FAILURE;
> - if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS)
> + APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> - if (apr_thread_cond_create(&queue->not_full, a) != APR_SUCCESS)
> + }
> + if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> + }
>
> queue->tail = 0;
> - queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
> + queue->data = apr_palloc(a, queue_capacity * sizeof(void*));
> queue->bounds = queue_capacity;
> -
> - /* Set all the sockets in the queue to NULL */
> - for (i = 0; i < queue_capacity; ++i)
> - queue->data[i].sd = NULL;
>
> - apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
> + apr_pool_cleanup_register(a, queue, ap_queue_destroy,
> + apr_pool_cleanup_null);
>
> return FD_QUEUE_SUCCESS;
> }
> @@ -122,23 +116,29 @@
> * the push operation has completed, it signals other threads waiting
> * in apr_queue_pop() that they may continue consuming sockets.
> */
> -int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
> +int ap_queue_push(fd_queue_t *queue, void *e)
> {
> - fd_queue_elem_t *elem;
> -
> if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> }
>
> - while (ap_queue_full(queue)) {
> - apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
> + /* If they push too many, they didn't allocate enough slots
> + * in the stack, and we treat that as fatal. */
> + if (ap_queue_full(queue)) {
> + if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
> + return FD_QUEUE_FAILURE;
> + }
> + return FD_QUEUE_OVERFLOW;
> }
>
> - elem = &queue->data[queue->tail++];
> - elem->sd = sd;
> - elem->p = p;
> + queue->data[queue->tail++] = e;
>
> - apr_thread_cond_signal(queue->not_empty);
> + /* Only perform the overhead of signaling if we were empty before
> + * inserting this element.
> + */
> + if (1 == queue->tail) {
> + apr_thread_cond_signal(queue->not_empty);
> + }
>
> if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> @@ -153,16 +153,14 @@
> * Once retrieved, the socket is placed into the address specified by
> * 'sd'.
> */
> -apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
> +apr_status_t ap_queue_pop(fd_queue_t *queue, void **e)
> {
> - fd_queue_elem_t *elem;
> -
> if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> }
>
> /* Keep waiting until we wake up and find that the queue is not empty. */
> - if (ap_queue_empty(queue)) {
> + while (ap_queue_empty(queue)) {
> apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
> /* If we wake up and it's still empty, then we were interrupted */
> if (ap_queue_empty(queue)) {
> @@ -173,16 +171,7 @@
> }
> }
>
> - elem = &queue->data[--queue->tail];
> - *sd = elem->sd;
> - *p = elem->p;
> - elem->sd = NULL;
> - elem->p = NULL;
> -
> - /* signal not_full if we were full before this pop */
> - if (queue->tail == queue->bounds - 1) {
> - apr_thread_cond_signal(queue->not_full);
> - }
> + *e = queue->data[--queue->tail];
>
> if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> @@ -197,9 +186,6 @@
> return FD_QUEUE_FAILURE;
> }
> apr_thread_cond_broadcast(queue->not_empty);
> - /* We shouldn't have multiple threads sitting in not_full, but
> - * broadcast just in case. */
> - apr_thread_cond_broadcast(queue->not_full);
> if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
> return FD_QUEUE_FAILURE;
> }
> Index: server/mpm/worker/fdqueue.h
> ===================================================================
> RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
> retrieving revision 1.9
> diff -u -r1.9 fdqueue.h
> --- server/mpm/worker/fdqueue.h 2001/10/17 16:29:37 1.9
> +++ server/mpm/worker/fdqueue.h 2001/11/23 22:32:47
> @@ -73,29 +73,21 @@
> #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
> of queue_pop semantics */
> #define FD_QUEUE_EINTR APR_EINTR
> +#define FD_QUEUE_OVERFLOW -2
>
> -struct fd_queue_elem_t {
> - apr_socket_t *sd;
> - apr_pool_t *p;
> -};
> -typedef struct fd_queue_elem_t fd_queue_elem_t;
> -
> struct fd_queue_t {
> int tail;
> - fd_queue_elem_t *data;
> + void **data;
> int bounds;
> - int blanks;
> apr_thread_mutex_t *one_big_mutex;
> apr_thread_cond_t *not_empty;
> - apr_thread_cond_t *not_full;
> - int cancel_state;
> };
> typedef struct fd_queue_t fd_queue_t;
>
> /* FIXME: APRize these -- return values should be apr_status_t */
> int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
> -int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
> -apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
> -apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
> +int ap_queue_push(fd_queue_t *queue, void *e);
> +int ap_queue_pop(fd_queue_t *queue, void **e);
> +int ap_queue_interrupt_all(fd_queue_t *queue);
>
> #endif /* FDQUEUE_H */
--
Ian Holsman IanH@cnet.com
Performance Measurement & Analysis
CNET Networks - (415) 344-2608