You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@httpd.apache.org by Aaron Bannert <aa...@clove.org> on 2001/10/11 02:27:20 UTC

[PATCH] "time-space tradeoff" one cond-var per worker patch

This is the patch I promised a few weeks ago for the worker MPM. I've
done enough testing locally to convince me that it is fairly stable
and comparable to what we have in CVS, so now I need people with
multiprocessor machines to give it a go.

This patch is for review/test only, please do not commit (yet).

On my uniprocessor machine it performs slightly slower (req/sec) than
current CVS, but it does seem to require fewer context switches and to
produce a lower load. It is hard to tell whether the reduced context
switching and load indicate better scalability or merely higher contention.
At this point the best way to test this will be to throw some "real-world"
load at it, and compare against prefork and the current CVS worker.

Patch Details:

- removes accept-queue, replaces it with a ready-workers-stack
- each worker gets a condition variable, sleeps until it is given a socket
- listener will no longer accept more connections than we are prepared
  to process (which fixes the bug where we do a graceful restart while
  the accept queue contains not-yet-processed connections, since those
  connections will never be processed)
- this architecture truly reuses the most recently used worker thread,
  which may vastly improve cache hits over current CVS, which only
  partially implements the stack behaviour. If you have chips with
  large caches, please compare this patch against CVS worker and
  prefork :) Also note that this dramatically conserves memory on
  lower-load servers, since we will only populate as many pools as we
  need; I have observed lower memory usage.
- reuses transaction pools

Tested on Solaris 8/ia32 and Linux 2.4.3/i686.

-aaron


Index: server/mpm/worker/fdqueue.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.8
diff -u -r1.8 fdqueue.c
--- server/mpm/worker/fdqueue.c	2001/10/01 19:37:20	1.8
+++ server/mpm/worker/fdqueue.c	2001/10/10 23:42:56
@@ -59,148 +59,132 @@
 #include "fdqueue.h"
 
 /**
- * Detects when the fd_queue_t is full. This utility function is expected
+ * Detects when the fd_stack_t is full. This utility function is expected
  * to be called from within critical sections, and is not threadsafe.
  */
-#define ap_queue_full(queue) ((queue)->tail == (queue)->bounds)
+#define ap_stack_full(stack) ((stack)->top == (stack)->bounds)
 
 /**
- * Detects when the fd_queue_t is empty. This utility function is expected
+ * Detects when the fd_stack_t is empty. This utility function is expected
  * to be called from within critical sections, and is not threadsafe.
  */
-#define ap_queue_empty(queue) ((queue)->tail == 0)
+#define ap_stack_empty(stack) ((stack)->top == 0)
 
 /**
  * Callback routine that is called to destroy this
- * fd_queue_t when it's pool is destroyed.
+ * fd_stack_t when its pool is destroyed.
  */
-static apr_status_t ap_queue_destroy(void *data) 
+static apr_status_t ap_stack_destroy(void *data) 
 {
-    fd_queue_t *queue = data;
+    fd_stack_t *stack = data;
 
     /* Ignore errors here, we can't do anything about them anyway.
      * XXX: We should at least try to signal an error here, it is
      * indicative of a programmer error. -aaron */
-    pthread_cond_destroy(&queue->not_empty);
-    pthread_cond_destroy(&queue->not_full);
-    pthread_mutex_destroy(&queue->one_big_mutex);
+    pthread_cond_destroy(&stack->not_empty);
+    pthread_mutex_destroy(&stack->one_big_mutex);
 
-    return FD_QUEUE_SUCCESS;
+    return FD_STACK_SUCCESS;
 }
 
 /**
- * Initialize the fd_queue_t.
+ * Initialize the fd_stack_t.
  */
-int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
+int ap_stack_init(fd_stack_t *stack, int stack_capacity, apr_pool_t *a) 
 {
-    int i;
+    if (pthread_mutex_init(&stack->one_big_mutex, NULL) != 0)
+        return FD_STACK_FAILURE;
+    if (pthread_cond_init(&stack->not_empty, NULL) != 0)
+        return FD_STACK_FAILURE;
+
+    stack->top = 0;
+    stack->data = apr_palloc(a, stack_capacity * sizeof(void*));
+    stack->bounds = stack_capacity;
 
-    if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
-        return FD_QUEUE_FAILURE;
-    if (pthread_cond_init(&queue->not_empty, NULL) != 0)
-        return FD_QUEUE_FAILURE;
-    if (pthread_cond_init(&queue->not_full, NULL) != 0)
-        return FD_QUEUE_FAILURE;
-
-    queue->tail = 0;
-    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
-    queue->bounds = queue_capacity;
-
-    /* Set all the sockets in the queue to NULL */
-    for (i = 0; i < queue_capacity; ++i)
-        queue->data[i].sd = NULL;
+    apr_pool_cleanup_register(a, stack, ap_stack_destroy, apr_pool_cleanup_null);
 
-    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
-
-    return FD_QUEUE_SUCCESS;
+    return FD_STACK_SUCCESS;
 }
 
 /**
- * Push a new socket onto the queue. Blocks if the queue is full. Once
+ * Push a new element onto the stack. Fails if the stack is full. Once
  * the push operation has completed, it signals other threads waiting
- * in apr_queue_pop() that they may continue consuming sockets.
+ * in ap_stack_pop() that they may continue consuming sockets.
  */
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
+int ap_stack_push(fd_stack_t *stack, void *e)
 {
-    fd_queue_elem_t *elem;
-
-    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
-        return FD_QUEUE_FAILURE;
+    if (pthread_mutex_lock(&stack->one_big_mutex) != 0) {
+        return FD_STACK_FAILURE;
     }
 
-    while (ap_queue_full(queue)) {
-        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+    /* If they push too many, they didn't allocate enough slots
+     * in the stack, so fatal error.
+     */
+    if (ap_stack_full(stack)) {
+        if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+            return FD_STACK_FAILURE;
+        }
+        return FD_STACK_OVERFLOW;
     }
 
-    elem = &queue->data[queue->tail++];
-    elem->sd = sd;
-    elem->p = p;
+    stack->data[stack->top++] = e;
 
-    pthread_cond_signal(&queue->not_empty);
+    /* only perform the overhead of signaling if we were empty before
+     * inserting this element.
+     */
+    if (1 == stack->top) {
+        pthread_cond_signal(&stack->not_empty);
+    }
 
-    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
-        return FD_QUEUE_FAILURE;
+    if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+        return FD_STACK_FAILURE;
     }
 
-    return FD_QUEUE_SUCCESS;
+    return FD_STACK_SUCCESS;
 }
 
 /**
- * Retrieves the next available socket from the queue. If there are no
+ * Retrieves the next available socket from the stack. If there are no
  * sockets available, it will block until one becomes available.
  * Once retrieved, the socket is placed into the address specified by
  * 'sd'.
  */
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
+int ap_stack_pop(fd_stack_t *stack, void **e)
 {
-    fd_queue_elem_t *elem;
-
-    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
-        return FD_QUEUE_FAILURE;
+    if (pthread_mutex_lock(&stack->one_big_mutex) != 0) {
+        return FD_STACK_FAILURE;
     }
 
-    /* Keep waiting until we wake up and find that the queue is not empty. */
-    if (ap_queue_empty(queue)) {
-        pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
+    /* Keep waiting until we wake up and find that the stack is not empty. */
+    while (ap_stack_empty(stack)) {
+        pthread_cond_wait(&stack->not_empty, &stack->one_big_mutex);
         /* If we wake up and it's still empty, then we were interrupted */
-        if (ap_queue_empty(queue)) {
-            if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
-                return FD_QUEUE_FAILURE;
+        if (ap_stack_empty(stack)) {
+            if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+                return FD_STACK_FAILURE;
             }
-            return FD_QUEUE_EINTR;
+            return FD_STACK_EINTR;
         }
     } 
     
-    elem = &queue->data[--queue->tail];
-    *sd = elem->sd;
-    *p = elem->p;
-    elem->sd = NULL;
-    elem->p = NULL;
-
-    /* signal not_full if we were full before this pop */
-    if (queue->tail == queue->bounds - 1) {
-        pthread_cond_signal(&queue->not_full);
-    }
+    *e = stack->data[--stack->top];
 
-    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
-        return FD_QUEUE_FAILURE;
+    if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+        return FD_STACK_FAILURE;
     }
 
     return APR_SUCCESS;
 }
 
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
+int ap_stack_interrupt_all(fd_stack_t *stack)
 {
-    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
-        return FD_QUEUE_FAILURE;
+    if (pthread_mutex_lock(&stack->one_big_mutex) != 0) {
+        return FD_STACK_FAILURE;
     }
-    pthread_cond_broadcast(&queue->not_empty);
-    /* We shouldn't have multiple threads sitting in not_full, but
-     * broadcast just in case. */
-    pthread_cond_broadcast(&queue->not_full);
-    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
-        return FD_QUEUE_FAILURE;
+    pthread_cond_broadcast(&stack->not_empty);
+    if (pthread_mutex_unlock(&stack->one_big_mutex) != 0) {
+        return FD_STACK_FAILURE;
     }
-    return FD_QUEUE_SUCCESS;
+    return FD_STACK_SUCCESS;
 }
 
Index: server/mpm/worker/fdqueue.h
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.8
diff -u -r1.8 fdqueue.h
--- server/mpm/worker/fdqueue.h	2001/10/01 19:37:20	1.8
+++ server/mpm/worker/fdqueue.h	2001/10/10 23:42:56
@@ -68,32 +68,24 @@
 #include <sys/socket.h>
 #include <apr_errno.h>
 
-#define FD_QUEUE_SUCCESS 0
-#define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
-                               of queue_pop semantics */
-#define FD_QUEUE_EINTR APR_EINTR
+#define FD_STACK_SUCCESS 0
+#define FD_STACK_FAILURE -1 /* Needs to be an invalid file descriptor because
+                               of stack_pop semantics */
+#define FD_STACK_EINTR APR_EINTR
+#define FD_STACK_OVERFLOW -2
 
-struct fd_queue_elem_t {
-    apr_socket_t      *sd;
-    apr_pool_t        *p;
-};
-typedef struct fd_queue_elem_t fd_queue_elem_t;
-
-struct fd_queue_t {
-    int                tail;
-    fd_queue_elem_t   *data;
+struct fd_stack_t {
+    int                top;
+    void             **data;
     int                bounds;
-    int                blanks;
     pthread_mutex_t    one_big_mutex;
     pthread_cond_t     not_empty;
-    pthread_cond_t     not_full;
-    int                cancel_state;
 };
-typedef struct fd_queue_t fd_queue_t;
+typedef struct fd_stack_t fd_stack_t;
 
-int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
+int ap_stack_init(fd_stack_t *stack, int stack_capacity, apr_pool_t *a);
+int ap_stack_push(fd_stack_t *stack, void *e);
+int ap_stack_pop(fd_stack_t *stack, void **e);
+int ap_stack_interrupt_all(fd_stack_t *stack);
 
 #endif /* FDQUEUE_H */
Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.27
diff -u -r1.27 worker.c
--- server/mpm/worker/worker.c	2001/09/24 06:42:12	1.27
+++ server/mpm/worker/worker.c	2001/10/10 23:43:02
@@ -68,6 +68,8 @@
 #include "apr_strings.h"
 #include "apr_file_io.h"
 #include "apr_thread_proc.h"
+#include "apr_thread_mutex.h"
+#include "apr_thread_cond.h"
 #include "apr_signal.h"
 #define APR_WANT_STRFUNC
 #include "apr_want.h"
@@ -122,7 +124,7 @@
 static int requests_this_child;
 static int num_listensocks = 0;
 static apr_socket_t **listensocks;
-static fd_queue_t *worker_queue;
+static fd_stack_t *worker_stack;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -140,6 +142,25 @@
     apr_threadattr_t *threadattr;
 } thread_starter;
 
+/* State of a particular worker.
+ */
+typedef enum {
+    WORKER_ELEM_IDLE,   /* 0 - idle (ready for another connection) */
+    WORKER_ELEM_BUSY,   /* 1 - busy (currently processing a connection) */
+    WORKER_ELEM_QUIT    /* 2 - time to quit */
+} worker_elem_state_e;
+
+/* Structure used to keep track of the current state of a particular
+ * worker thread.
+ */
+typedef struct {
+    apr_pool_t          *pool;  /* pool to use when calling accept() */
+    apr_socket_t        *sd;    /* socket returned from accept() */
+    worker_elem_state_e  state;
+    apr_thread_mutex_t  *mutex;
+    apr_thread_cond_t   *cond;
+} worker_elem_t;
+
 /*
  * The max child slot ever assigned, preserved across restarts.  Necessary
  * to deal with MaxClients changes across AP_SIG_GRACEFUL restarts.  We 
@@ -201,9 +222,7 @@
 static void signal_workers(void)
 {
     workers_may_exit = 1;
-    /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
-    ap_queue_signal_all_wakeup(worker_queue); */
-    ap_queue_interrupt_all(worker_queue);
+    ap_stack_interrupt_all(worker_stack);
 }
 
 AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -557,10 +576,9 @@
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
     apr_pool_t *tpool = apr_thread_pool_get(thd);
-    apr_socket_t *csd = NULL;
-    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
     apr_socket_t *sd = NULL;
-    int n;
+    worker_elem_t *a_worker;
+    int n, numalive;
     int curr_pollfd, last_pollfd = 0;
     apr_pollfd_t *pollset;
     apr_status_t rv;
@@ -643,11 +661,20 @@
         }
     got_fd:
         if (!workers_may_exit) {
-            /* create a new transaction pool for each accepted socket */
-            apr_pool_create(&ptrans, tpool);
-
-            if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
-                csd = NULL;
+            if ((rv = ap_stack_pop(worker_stack, (void **)&a_worker))
+                != APR_SUCCESS) {
+                signal_workers();
+            }
+            if ((rv = apr_thread_mutex_lock(a_worker->mutex))
+                != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "apr_thread_mutex_lock failed. Attempting "
+                             "to shutdown process gracefully.");
+                signal_workers();
+            }
+            if ((rv = apr_accept(&a_worker->sd, sd, a_worker->pool))
+                != APR_SUCCESS) {
+                a_worker->sd = NULL;
                 ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, 
                              "apr_accept");
             }
@@ -658,17 +685,16 @@
                              "process gracefully.");
                 signal_workers();
             }
-            if (csd != NULL) {
-                rv = ap_queue_push(worker_queue, csd, ptrans);
-                if (rv) {
-                    /* trash the connection; we couldn't queue the connected
-                     * socket to a worker 
-                     */
-                    apr_socket_close(csd);
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
-                                 "ap_queue_push failed with error code %d",
-                                 rv);
-                }
+
+            /* signal worker that it's ok to go */
+            a_worker->state = WORKER_ELEM_BUSY;
+            apr_thread_cond_signal(a_worker->cond);
+            if ((rv = apr_thread_mutex_unlock(a_worker->mutex))
+                != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "apr_thread_mutex_unlock failed. Attempting "
+                             "to shutdown process gracefully.");
+                signal_workers();
             }
         }
         else {
@@ -683,6 +709,24 @@
         }
     }
 
+    /* Kill off the workers in a nice way */
+    numalive = ap_threads_per_child;
+    while (numalive > 0) {
+        if ((rv = ap_stack_pop(worker_stack, (void *)&a_worker))
+            != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+                         "ap_queue_pop failed during shutdown with error "
+                         "code %d", rv);
+        }
+        else {
+            apr_thread_mutex_lock(a_worker->mutex);
+            a_worker->state = WORKER_ELEM_QUIT;
+            apr_thread_cond_signal(a_worker->cond);
+            apr_thread_mutex_unlock(a_worker->mutex);
+        }
+        --numalive;
+    }
+
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     dying = 1;
@@ -695,35 +739,85 @@
     return NULL;
 }
 
-static void *worker_thread(apr_thread_t *thd, void * dummy)
+static void *worker_thread(apr_thread_t *thd, void *dummy)
 {
-    proc_info * ti = dummy;
+    proc_info *ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_socket_t *csd = NULL;
-    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
+    apr_pool_t *tpool = apr_thread_pool_get(thd);
+    worker_elem_t my_state;
     apr_status_t rv;
 
+    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
+
     free(ti);
 
-    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
-    while (!workers_may_exit) {
+    apr_pool_create(&my_state.pool, tpool);
+    if ((rv = apr_thread_mutex_create(&my_state.mutex, tpool))
+        != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_create");
+    }
+    if ((rv = apr_thread_cond_create(&my_state.cond, tpool))
+        != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_create");
+    }
+
+    if ((rv = apr_thread_mutex_lock(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_lock");
+    }
+
+    while (1) {
+        my_state.sd = NULL;
+        my_state.state = WORKER_ELEM_IDLE;
+
         ap_update_child_status(process_slot, thread_slot, SERVER_READY, NULL);
-        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
-        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
-         * from an explicit call to ap_queue_interrupt_all(). This allows
-         * us to unblock threads stuck in ap_queue_pop() when a shutdown
-         * is pending. */
-        if (rv == FD_QUEUE_EINTR || !csd) {
-            continue;
-        }
-        process_socket(ptrans, csd, process_slot, thread_slot);
-        requests_this_child--; /* FIXME: should be synchronized - aaron */
-        apr_pool_destroy(ptrans);
+
+        /* Make ourselves available as a connection processing worker. */
+        if ((rv = ap_stack_push(worker_stack, &my_state)) != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+                         "ap_stack_push failed with error code %d", rv);
+        }
+
+        /* Because of the way this is architected, we will always have
+         * a context switch here. It would be neat if we came up with
+         * a good way to avoid the call to cond_wait, though.
+         */
+
+        while (my_state.state == WORKER_ELEM_IDLE) {
+            apr_thread_cond_wait(my_state.cond, my_state.mutex);
+        }
+        /* did someone wake us up to notice that it's time to exit? */
+        if (my_state.state == WORKER_ELEM_QUIT) { 
+            break;
+        }
+        else if (my_state.sd != NULL) {
+            process_socket(my_state.pool, my_state.sd,
+                           process_slot, thread_slot);
+            requests_this_child--; /* FIXME: should be synchronized - aaron */
+        }
+        apr_pool_clear(my_state.pool);
+    }
+
+    if ((rv = apr_thread_mutex_unlock(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_unlock");
     }
 
     ap_update_child_status(process_slot, thread_slot,
-        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
+        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *)NULL);
+    if ((rv = apr_thread_cond_destroy(my_state.cond)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_destroy");
+    }
+    if ((rv = apr_thread_mutex_destroy(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_destroy");
+    }
+    apr_pool_destroy(my_state.pool);
+
     apr_lock_acquire(worker_thread_count_mutex);
     worker_thread_count--;
     apr_lock_release(worker_thread_count_mutex);
@@ -757,8 +851,8 @@
 
     /* We must create the fd queues before we start up the listener
      * and worker threads. */
-    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
-    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+    worker_stack = apr_pcalloc(pchild, sizeof(*worker_stack));
+    ap_stack_init(worker_stack, ap_threads_per_child, pchild);
 
     my_info = (proc_info *)malloc(sizeof(proc_info));
     my_info->pid = my_child_num;