You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@httpd.apache.org by Aaron Bannert <aa...@clove.org> on 2001/10/11 02:27:20 UTC
[PATCH] "time-space tradeoff" one cond-var per worker patch
This is the patch I promised a few weeks ago for the worker MPM. I've
done enough testing locally to convince me that it is fairly stable
and comparable to what we have in CVS, so now I need people with
multiprocessor machines to give it a go.
This patch is for review/test only, please do not commit (yet).
On my uniprocessor it is performing slightly slower (req/sec) than
current CVS, but it does seem to require fewer context switches as well
as reduced load. It is hard to tell if the fewer context switches/load
is an indication of better scalability or merely higher contention.
At this point the best way to test this will be to throw some "real-world"
load at it, and compare against prefork and the current CVS worker.
Patch Details:
- removes accept-queue, replaces it with a ready-workers-stack
- each worker gets a condition variable, sleeps until it is given a socket
- listener will no longer accept more connections than we are prepared
to process. This fixes the bug where a graceful restart while the
accept queue still holds not-yet-processed connections leaves those
connections unprocessed forever.
- this architecture truly reuses the most recently used worker thread,
which may vastly improve cache hits over current CVS, which only
partially implements the stack behaviour. (If you have chips with
large caches, please compare this patch against CVS worker and prefork. :)
This also dramatically conserves memory on lower-load servers, since
we only populate as many pools as we need; I have observed lower
memory usage.
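- reuses transaction pools (each worker clears its own pool after every
connection instead of creating and destroying a new one per connection)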
Tested on Solaris 8/ia32 and Linux 2.4.3/i686.
-aaron
Index: server/mpm/worker/fdqueue.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.8
diff -u -r1.8 fdqueue.c
--- server/mpm/worker/fdqueue.c 2001/10/01 19:37:20 1.8
+++ server/mpm/worker/fdqueue.c 2001/10/10 23:42:56
@@ -59,148 +59,132 @@
#include "fdqueue.h"
/**
- * Detects when the fd_queue_t is full. This utility function is expected
+ * Detects when the fd_stack_t is full. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
*/
-#define ap_queue_full(queue) ((queue)->tail == (queue)->bounds)
+#define ap_stack_full(stack) ((stack)->top == (stack)->bounds)
/**
- * Detects when the fd_queue_t is empty. This utility function is expected
+ * Detects when the fd_stack_t is empty. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
*/
-#define ap_queue_empty(queue) ((queue)->tail == 0)
+#define ap_stack_empty(stack) ((stack)->top == 0)
/**
* Callback routine that is called to destroy this
- * fd_queue_t when it's pool is destroyed.
+ * fd_stack_t when its pool is destroyed.
*/
-static apr_status_t ap_queue_destroy(void *data)
+static apr_status_t ap_stack_destroy(void *data)
{
- fd_queue_t *queue = data;
+ fd_stack_t *stack = data;
/* Ignore errors here, we can't do anything about them anyway.
* XXX: We should at least try to signal an error here, it is
* indicative of a programmer error. -aaron */
- pthread_cond_destroy(&queue->not_empty);
- pthread_cond_destroy(&queue->not_full);
- pthread_mutex_destroy(&queue->one_big_mutex);
+ pthread_cond_destroy(&stack->not_empty);
+ pthread_mutex_destroy(&stack->one_big_mutex);
- return FD_QUEUE_SUCCESS;
+ return FD_STACK_SUCCESS;
}
/**
- * Initialize the fd_queue_t.
+ * Initialize the fd_stack_t with room for stack_capacity elements.
*/
-int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
+int ap_stack_init(fd_stack_t *stack, int stack_capacity, apr_pool_t *a)
{
- int i;
+ if (pthread_mutex_init(&stack->one_big_mutex, NULL) != 0)
+ return FD_STACK_FAILURE;
+ if (pthread_cond_init(&stack->not_empty, NULL) != 0)
+ return FD_STACK_FAILURE;
+
+ stack->top = 0;
+ stack->data = apr_palloc(a, stack_capacity * sizeof(void*));
+ stack->bounds = stack_capacity;
- if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
- return FD_QUEUE_FAILURE;
- if (pthread_cond_init(&queue->not_empty, NULL) != 0)
- return FD_QUEUE_FAILURE;
- if (pthread_cond_init(&queue->not_full, NULL) != 0)
- return FD_QUEUE_FAILURE;
-
- queue->tail = 0;
- queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
- queue->bounds = queue_capacity;
-
- /* Set all the sockets in the queue to NULL */
- for (i = 0; i < queue_capacity; ++i)
- queue->data[i].sd = NULL;
+ apr_pool_cleanup_register(a, stack, ap_stack_destroy, apr_pool_cleanup_null);
- apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
-
- return FD_QUEUE_SUCCESS;
+ return FD_STACK_SUCCESS;
}
/**
- * Push a new socket onto the queue. Blocks if the queue is full. Once
+ * Push an element onto the stack. Returns FD_STACK_OVERFLOW if it is full. Once
* the push operation has completed, it signals other threads waiting
- * in apr_queue_pop() that they may continue consuming sockets.
+ * in ap_stack_pop() (only if the stack was empty before this push).
*/
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
+int ap_stack_push(fd_stack_t *stack, void *e)
{
- fd_queue_elem_t *elem;
-
- if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ if (pthread_mutex_lock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
- while (ap_queue_full(queue)) {
- pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+ /* If they push too many, they didn't allocate enough slots
+ * in the stack, so fatal error.
+ */
+ if (ap_stack_full(stack)) {
+ if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
+ }
+ return FD_STACK_OVERFLOW;
}
- elem = &queue->data[queue->tail++];
- elem->sd = sd;
- elem->p = p;
+ stack->data[stack->top++] = e;
- pthread_cond_signal(&queue->not_empty);
+ /* only perform the overhead of signaling if we were empty before
+ * inserting this element.
+ */
+ if (1 == stack->top) {
+ pthread_cond_signal(&stack->not_empty);
+ }
- if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
- return FD_QUEUE_SUCCESS;
+ return FD_STACK_SUCCESS;
}
/**
- * Retrieves the next available socket from the queue. If there are no
+ * Retrieves the most recently pushed element from the stack. If there are no
* sockets available, it will block until one becomes available.
* Once retrieved, the socket is placed into the address specified by
* 'sd'.
*/
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
+int ap_stack_pop(fd_stack_t *stack, void **e)
{
- fd_queue_elem_t *elem;
-
- if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ if (pthread_mutex_lock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
- /* Keep waiting until we wake up and find that the queue is not empty. */
- if (ap_queue_empty(queue)) {
- pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
+ /* Block until non-empty; a wakeup that leaves it empty returns EINTR. */
+ while (ap_stack_empty(stack)) {
+ pthread_cond_wait(&stack->not_empty, &stack->one_big_mutex);
/* If we wake up and it's still empty, then we were interrupted */
- if (ap_queue_empty(queue)) {
- if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ if (ap_stack_empty(stack)) {
+ if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
- return FD_QUEUE_EINTR;
+ return FD_STACK_EINTR;
}
}
- elem = &queue->data[--queue->tail];
- *sd = elem->sd;
- *p = elem->p;
- elem->sd = NULL;
- elem->p = NULL;
-
- /* signal not_full if we were full before this pop */
- if (queue->tail == queue->bounds - 1) {
- pthread_cond_signal(&queue->not_full);
- }
+ *e = stack->data[--stack->top];
- if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
return APR_SUCCESS;
}
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
+int ap_stack_interrupt_all(fd_stack_t *stack)
{
- if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ if (pthread_mutex_lock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
- pthread_cond_broadcast(&queue->not_empty);
- /* We shouldn't have multiple threads sitting in not_full, but
- * broadcast just in case. */
- pthread_cond_broadcast(&queue->not_full);
- if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
- return FD_QUEUE_FAILURE;
+ pthread_cond_broadcast(&stack->not_empty);
+ if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+ return FD_STACK_FAILURE;
}
- return FD_QUEUE_SUCCESS;
+ return FD_STACK_SUCCESS;
}
Index: server/mpm/worker/fdqueue.h
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.8
diff -u -r1.8 fdqueue.h
--- server/mpm/worker/fdqueue.h 2001/10/01 19:37:20 1.8
+++ server/mpm/worker/fdqueue.h 2001/10/10 23:42:56
@@ -68,32 +68,24 @@
#include <sys/socket.h>
#include <apr_errno.h>
-#define FD_QUEUE_SUCCESS 0
-#define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
- of queue_pop semantics */
-#define FD_QUEUE_EINTR APR_EINTR
+#define FD_STACK_SUCCESS 0
+#define FD_STACK_FAILURE -1 /* Returned when an underlying pthread call
+ fails */
+#define FD_STACK_EINTR APR_EINTR /* pop woke up but the stack was still empty */
+#define FD_STACK_OVERFLOW -2 /* push attempted on a full stack */
-struct fd_queue_elem_t {
- apr_socket_t *sd;
- apr_pool_t *p;
-};
-typedef struct fd_queue_elem_t fd_queue_elem_t;
-
-struct fd_queue_t {
- int tail;
- fd_queue_elem_t *data;
+struct fd_stack_t {
+ int top; /* number of elements currently on the stack */
+ void **data; /* array of 'bounds' element slots */
int bounds;
- int blanks;
pthread_mutex_t one_big_mutex;
pthread_cond_t not_empty;
- pthread_cond_t not_full;
- int cancel_state;
};
-typedef struct fd_queue_t fd_queue_t;
+typedef struct fd_stack_t fd_stack_t;
-int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
+int ap_stack_init(fd_stack_t *stack, int stack_capacity, apr_pool_t *a);
+int ap_stack_push(fd_stack_t *stack, void *e);
+int ap_stack_pop(fd_stack_t *stack, void **e);
+int ap_stack_interrupt_all(fd_stack_t *stack);
#endif /* FDQUEUE_H */
Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.27
diff -u -r1.27 worker.c
--- server/mpm/worker/worker.c 2001/09/24 06:42:12 1.27
+++ server/mpm/worker/worker.c 2001/10/10 23:43:02
@@ -68,6 +68,8 @@
#include "apr_strings.h"
#include "apr_file_io.h"
#include "apr_thread_proc.h"
+#include "apr_thread_mutex.h"
+#include "apr_thread_cond.h"
#include "apr_signal.h"
#define APR_WANT_STRFUNC
#include "apr_want.h"
@@ -122,7 +124,7 @@
static int requests_this_child;
static int num_listensocks = 0;
static apr_socket_t **listensocks;
-static fd_queue_t *worker_queue;
+static fd_stack_t *worker_stack;
/* The structure used to pass unique initialization info to each thread */
typedef struct {
@@ -140,6 +142,25 @@
apr_threadattr_t *threadattr;
} thread_starter;
+/* State of a particular worker.
+ */
+typedef enum {
+ WORKER_ELEM_IDLE, /* 0 - idle (ready for another connection) */
+ WORKER_ELEM_BUSY, /* 1 - busy (currently processing a connection) */
+ WORKER_ELEM_QUIT /* 2 - time to quit */
+} worker_elem_state_e;
+
+/* Structure used to keep track of the current state of a particular
+ * worker thread.
+ */
+typedef struct {
+ apr_pool_t *pool; /* pool to use when calling accept() */
+ apr_socket_t *sd; /* socket returned from accept() */
+ worker_elem_state_e state;
+ apr_thread_mutex_t *mutex;
+ apr_thread_cond_t *cond;
+} worker_elem_t;
+
/*
* The max child slot ever assigned, preserved across restarts. Necessary
* to deal with MaxClients changes across AP_SIG_GRACEFUL restarts. We
@@ -201,9 +222,7 @@
static void signal_workers(void)
{
workers_may_exit = 1;
- /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
- ap_queue_signal_all_wakeup(worker_queue); */
- ap_queue_interrupt_all(worker_queue);
+ ap_stack_interrupt_all(worker_stack);
}
AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -557,10 +576,9 @@
int process_slot = ti->pid;
int thread_slot = ti->tid;
apr_pool_t *tpool = apr_thread_pool_get(thd);
- apr_socket_t *csd = NULL;
- apr_pool_t *ptrans; /* Pool for per-transaction stuff */
apr_socket_t *sd = NULL;
- int n;
+ worker_elem_t *a_worker;
+ int n, numalive;
int curr_pollfd, last_pollfd = 0;
apr_pollfd_t *pollset;
apr_status_t rv;
@@ -643,11 +661,20 @@
}
got_fd:
if (!workers_may_exit) {
- /* create a new transaction pool for each accepted socket */
- apr_pool_create(&ptrans, tpool);
-
- if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
- csd = NULL;
+ if ((rv = ap_stack_pop(worker_stack, (void **)&a_worker))
+ != APR_SUCCESS) {
+ signal_workers();
+ } /* XXX(review): on failure a_worker is not updated, yet it is used below */
+ if ((rv = apr_thread_mutex_lock(a_worker->mutex))
+ != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_lock failed. Attempting "
+ "to shutdown process gracefully.");
+ signal_workers();
+ } /* XXX(review): execution continues even if the lock failed */
+ if ((rv = apr_accept(&a_worker->sd, sd, a_worker->pool))
+ != APR_SUCCESS) {
+ a_worker->sd = NULL;
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
"apr_accept");
}
@@ -658,17 +685,16 @@
"process gracefully.");
signal_workers();
}
- if (csd != NULL) {
- rv = ap_queue_push(worker_queue, csd, ptrans);
- if (rv) {
- /* trash the connection; we couldn't queue the connected
- * socket to a worker
- */
- apr_socket_close(csd);
- ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
- "ap_queue_push failed with error code %d",
- rv);
- }
+
+ /* signal worker that it's ok to go */
+ a_worker->state = WORKER_ELEM_BUSY;
+ apr_thread_cond_signal(a_worker->cond);
+ if ((rv = apr_thread_mutex_unlock(a_worker->mutex))
+ != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_unlock failed. Attempting "
+ "to shutdown process gracefully.");
+ signal_workers();
}
}
else {
@@ -683,6 +709,24 @@
}
}
+ /* Tell every worker to quit: pop each one as it becomes idle and signal it */
+ numalive = ap_threads_per_child;
+ while (numalive > 0) {
+ if ((rv = ap_stack_pop(worker_stack, (void *)&a_worker))
+ != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+ "ap_queue_pop failed during shutdown with error "
+ "code %d", rv);
+ } /* NOTE(review): message names ap_queue_pop and logs status 0, not rv */
+ else {
+ apr_thread_mutex_lock(a_worker->mutex);
+ a_worker->state = WORKER_ELEM_QUIT;
+ apr_thread_cond_signal(a_worker->cond);
+ apr_thread_mutex_unlock(a_worker->mutex);
+ }
+ --numalive; /* XXX(review): decremented even when the pop above failed */
+ }
+
ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
(request_rec *) NULL);
dying = 1;
@@ -695,35 +739,85 @@
return NULL;
}
-static void *worker_thread(apr_thread_t *thd, void * dummy)
+static void *worker_thread(apr_thread_t *thd, void *dummy)
{
- proc_info * ti = dummy;
+ proc_info *ti = dummy;
int process_slot = ti->pid;
int thread_slot = ti->tid;
- apr_socket_t *csd = NULL;
- apr_pool_t *ptrans; /* Pool for per-transaction stuff */
+ apr_pool_t *tpool = apr_thread_pool_get(thd);
+ worker_elem_t my_state;
apr_status_t rv;
+ ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
+
free(ti);
- ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
- while (!workers_may_exit) {
+ apr_pool_create(&my_state.pool, tpool);
+ if ((rv = apr_thread_mutex_create(&my_state.mutex, tpool))
+ != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_create");
+ }
+ if ((rv = apr_thread_cond_create(&my_state.cond, tpool))
+ != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_cond_create");
+ } /* XXX(review): create failures are logged but the thread continues */
+
+ if ((rv = apr_thread_mutex_lock(my_state.mutex)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_lock");
+ }
+
+ while (1) {
+ my_state.sd = NULL;
+ my_state.state = WORKER_ELEM_IDLE;
+
ap_update_child_status(process_slot, thread_slot, SERVER_READY, NULL);
- rv = ap_queue_pop(worker_queue, &csd, &ptrans);
- /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
- * from an explicit call to ap_queue_interrupt_all(). This allows
- * us to unblock threads stuck in ap_queue_pop() when a shutdown
- * is pending. */
- if (rv == FD_QUEUE_EINTR || !csd) {
- continue;
- }
- process_socket(ptrans, csd, process_slot, thread_slot);
- requests_this_child--; /* FIXME: should be synchronized - aaron */
- apr_pool_destroy(ptrans);
+
+ /* Make ourselves available as a connection processing worker. */
+ if ((rv = ap_stack_push(worker_stack, &my_state)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+ "ap_stack_push failed with error code %d", rv);
+ }
+
+ /* Because of the way this is architected, we will always have
+ * a context switch here. It would be neat if we came up with
+ * a good way to avoid the call to cond_wait, though.
+ */
+
+ while (my_state.state == WORKER_ELEM_IDLE) {
+ apr_thread_cond_wait(my_state.cond, my_state.mutex);
+ }
+ /* did someone wake us up to notice that it's time to exit? */
+ if (my_state.state == WORKER_ELEM_QUIT) {
+ break;
+ }
+ else if (my_state.sd != NULL) {
+ process_socket(my_state.pool, my_state.sd,
+ process_slot, thread_slot);
+ requests_this_child--; /* FIXME: should be synchronized - aaron */
+ }
+ apr_pool_clear(my_state.pool);
+ }
+
+ if ((rv = apr_thread_mutex_unlock(my_state.mutex)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_unlock");
}
ap_update_child_status(process_slot, thread_slot,
- (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
+ (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *)NULL);
+ if ((rv = apr_thread_cond_destroy(my_state.cond)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_cond_destroy");
+ }
+ if ((rv = apr_thread_mutex_destroy(my_state.mutex)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_destroy");
+ }
+ apr_pool_destroy(my_state.pool);
+
apr_lock_acquire(worker_thread_count_mutex);
worker_thread_count--;
apr_lock_release(worker_thread_count_mutex);
@@ -757,8 +851,8 @@
/* We must create the fd queues before we start up the listener
* and worker threads. */
- worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
- ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+ worker_stack = apr_pcalloc(pchild, sizeof(*worker_stack));
+ ap_stack_init(worker_stack, ap_threads_per_child, pchild);
my_info = (proc_info *)malloc(sizeof(proc_info));
my_info->pid = my_child_num;