Posted to dev@httpd.apache.org by Aaron Bannert <aa...@clove.org> on 2001/08/24 18:04:37 UTC

[PATCH] make worker work

I've spent the last week digesting the worker MPM, and I've run across
a few problems that are fixed in this patch. The worker MPM will now:

 - play nice with shutdown
 - use thread pools correctly

The major bugs that are fixed in this patch are:

 - since the worker queue was created in the listener thread, a worker
   could try to use the queue before it was initialized. (The fix is to
   create the queue before any threads start; see the sketch after this
   list.)

 - a single transaction pool was being created and shared by all
   connections, so as soon as one worker finished its connection and
   the pool was cleared, any other worker still using that pool would
   puke all over itself at some later, random time. [* this would have
   been a good time to have asserts in the code :) *]
     Note: Right now a pool is created for each transaction and destroyed
     when the transaction completes, so we aren't yet getting the speed
     improvement from reusing pools. I will provide a patch for that in
     the near future; I just wanted to get this out the door for others
     to look at. Keep in mind that although pool reuse will improve
     efficiency, I'm already maxing out a Solaris 8 box's ephemeral
     ports within a minute or so with 'ab'.
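
A rough sketch of the corrected lifecycle, to make both fixes concrete.
This is illustrative only: listener_loop() and worker_loop() are
hypothetical stand-ins for the listener_thread()/worker_thread() code
paths in worker.c, and the polling, scoreboard, and error handling are
all elided; the real changes are in the patch below.

    /* The queue is created before any threads are spawned, so a
     * worker can no longer see an uninitialized queue. */
    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
    /* ...only now do we create the listener and worker threads... */

    /* The listener gives every accepted connection its own pool: */
    static void listener_loop(apr_socket_t *lsd, apr_pool_t *tpool)
    {
        apr_socket_t *csd;
        apr_pool_t *ptrans;

        while (!workers_may_exit) {
            apr_pool_create(&ptrans, tpool); /* one pool per transaction */
            if (apr_accept(&csd, lsd, ptrans) == APR_SUCCESS
                && csd != NULL) {
                ap_queue_push(worker_queue, csd, ptrans);
            }
        }
    }

    /* ...and the worker that consumes the connection destroys that
     * pool, so no two threads ever share a transaction pool: */
    static void worker_loop(int process_slot, int thread_slot)
    {
        apr_socket_t *csd;
        apr_pool_t *ptrans;

        while (!workers_may_exit) {
            if (ap_queue_pop(worker_queue, &csd, &ptrans) == FD_QUEUE_EINTR
                || csd == NULL) {
                continue; /* interrupted (shutdown), not a connection */
            }
            process_socket(ptrans, csd, process_slot, thread_slot);
            apr_pool_destroy(ptrans); /* pool dies with the transaction */
        }
    }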

Implementation details:

 - I removed the fdqueue blocking functions (ap_block_on_queue and
   ap_queue_signal_all_wakeup), since blocking is now handled entirely
   inside ap_queue_push and ap_queue_pop.

 - I added a new fdqueue function, ap_queue_interrupt_all, for waking up
   threads that are blocked on a condition, and gave ap_queue_pop the
   ability to return FD_QUEUE_EINTR if it is woken up while the queue is
   still empty. This gives us a cleaner way to wake up idle threads when
   we are shutting down without disturbing threads that are processing
   connections. (A condensed sketch of the new blocking logic follows.)
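
For reference, the blocking now follows the standard condition-variable
pattern. Here is a condensed sketch of the logic from the patch below
(mutex error checks and the head/tail bookkeeping are elided):

    /* producer side (ap_queue_push): block while the queue is full */
    pthread_mutex_lock(&queue->one_big_mutex);
    while (ap_queue_full(queue)) {
        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
    }
    /* ...store sd/p at queue->tail and advance the tail... */
    pthread_cond_signal(&queue->not_empty);
    pthread_mutex_unlock(&queue->one_big_mutex);

    /* consumer side (ap_queue_pop): block while the queue is empty */
    pthread_mutex_lock(&queue->one_big_mutex);
    if (ap_queue_empty(queue)) {
        pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
        if (ap_queue_empty(queue)) {
            /* woken up with nothing to consume: we were interrupted */
            pthread_mutex_unlock(&queue->one_big_mutex);
            return FD_QUEUE_EINTR;
        }
    }
    /* ...load *sd/*p from queue->head and advance the head... */
    pthread_cond_signal(&queue->not_full);
    pthread_mutex_unlock(&queue->one_big_mutex);

    /* shutdown (ap_queue_interrupt_all): wake every idle worker
     * without disturbing threads that are busy with a connection */
    pthread_mutex_lock(&queue->one_big_mutex);
    pthread_cond_broadcast(&queue->not_empty);
    pthread_mutex_unlock(&queue->one_big_mutex);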


-aaron


Index: fdqueue.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.3
diff -u -r1.3 fdqueue.c
--- fdqueue.c	2001/08/05 18:41:38	1.3
+++ fdqueue.c	2001/08/24 00:14:08
@@ -60,7 +60,11 @@
 
 /* Assumption: increment and decrement are atomic on int */
 
-int ap_increase_blanks(FDQueue *queue) 
+/**
+ * Threadsafe way to increment the number of empty slots ("blanks")
+ * in the resource queue.
+ */
+int ap_increase_blanks(fd_queue_t *queue) 
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
@@ -69,61 +73,129 @@
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
+
     return FD_QUEUE_SUCCESS;
 }
 
+/**
+ * Detects when the fd_queue_t is full. This utility function is expected
+ * to be called from within critical sections, and is not threadsafe.
+ */
+static int ap_queue_full(fd_queue_t *queue)
+{
+    return (queue->blanks <= 0);
+}
+
+/**
+ * Detects when the fd_queue_t is empty. This utility function is expected
+ * to be called from within critical sections, and is not threadsafe.
+ */
+static int ap_queue_empty(fd_queue_t *queue)
+{
+    /*return (queue->head == queue->tail);*/
+    return (queue->blanks >= queue->bounds - 1);
+}
+
+/**
+ * Callback routine that is called to destroy this
+ * fd_queue_t when its pool is destroyed.
+ */
 static apr_status_t ap_queue_destroy(void *data) 
 {
-    FDQueue *queue = data;
-    /* Ignore errors here, we can't do anything about them anyway */
-    pthread_cond_destroy(&(queue->not_empty));
-    pthread_cond_destroy(&(queue->not_full));
+    fd_queue_t *queue = data;
+
+    /* Ignore errors here, we can't do anything about them anyway.
+     * XXX: We should at least try to signal an error here, it is
+     * indicative of a programmer error. -aaron */
+    pthread_cond_destroy(&queue->not_empty);
+    pthread_cond_destroy(&queue->not_full);
     pthread_mutex_destroy(&queue->one_big_mutex);
+
     return FD_QUEUE_SUCCESS;
 }
 
-int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) 
+/**
+ * Initialize the fd_queue_t.
+ */
+int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
 {
     int i;
-    int bounds = queue_capacity + 1;
-    pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
-    pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
-    queue->not_empty = not_empty;
-    queue->not_full = not_full;
-    pthread_mutex_init(&queue->one_big_mutex, NULL);
+    int bounds;
+
+    if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0)
+        return FD_QUEUE_FAILURE;
+    if (pthread_cond_init(&queue->not_empty, NULL) != 0)
+        return FD_QUEUE_FAILURE;
+    if (pthread_cond_init(&queue->not_full, NULL) != 0)
+        return FD_QUEUE_FAILURE;
+
+    bounds = queue_capacity + 1;
     queue->head = queue->tail = 0;
-    queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
+    queue->data = apr_palloc(a, bounds * sizeof(fd_queue_elem_t));
     queue->bounds = bounds;
-    queue->blanks = 0;
-    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
-    for (i=0; i < bounds; ++i)
+    queue->blanks = queue_capacity;
+
+    /* Set all the sockets in the queue to NULL */
+    for (i = 0; i < bounds; ++i)
         queue->data[i].sd = NULL;
+
+    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
+
     return FD_QUEUE_SUCCESS;
 }
 
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) 
+/**
+ * Push a new socket onto the queue. Blocks if the queue is full. Once
+ * the push operation has completed, it signals other threads waiting
+ * in ap_queue_pop() that they may continue consuming sockets.
+ */
+int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
+
+    /* Keep waiting until we wake up and find that the queue is not full. */
+    while (ap_queue_full(queue)) {
+        pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+    }
+
     queue->data[queue->tail].sd = sd;
     queue->data[queue->tail].p = p;
     queue->tail = (queue->tail + 1) % queue->bounds;
     queue->blanks--;
+
+    pthread_cond_signal(&queue->not_empty);
+
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    pthread_cond_signal(&(queue->not_empty));
+
     return FD_QUEUE_SUCCESS;
 }
 
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p) 
+/**
+ * Retrieves the next available socket from the queue. If there are no
+ * sockets available, it will block until one becomes available.
+ * Once retrieved, the socket is placed into the address specified by
+ * 'sd'.
+ */
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    if (queue->head == queue->tail) {
-        pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
+
+    /* Wait once for the queue to become non-empty. */
+    if (ap_queue_empty(queue)) {
+        pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
+        /* If we wake up and it's still empty, then we were interrupted */
+        if (ap_queue_empty(queue)) {
+            if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+                return FD_QUEUE_FAILURE;
+            }
+            return FD_QUEUE_EINTR;
+        }
     } 
     
     *sd = queue->data[queue->head].sd;
@@ -133,40 +205,27 @@
     if (sd != NULL) {
         queue->head = (queue->head + 1) % queue->bounds;
     }
+    queue->blanks++;
+
+    /* we just consumed a slot, so we're no longer full */
+    pthread_cond_signal(&queue->not_full);
+
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
-    if (queue->blanks > 0) {
-        pthread_cond_signal(&(queue->not_full));
-    }
-    return APR_SUCCESS;
-}
-
-int ap_queue_size(FDQueue *queue) 
-{
-    return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
-}
 
-int ap_queue_full(FDQueue *queue) 
-{
-    return(queue->blanks <= 0);
+    return APR_SUCCESS;
 }
 
-int ap_block_on_queue(FDQueue *queue) 
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
 {
     if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
-    }
-    if (ap_queue_full(queue)) {
-        pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
     }
+    pthread_cond_broadcast(&queue->not_empty);
     if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
         return FD_QUEUE_FAILURE;
     }
     return FD_QUEUE_SUCCESS;
 }
 
-void ap_queue_signal_all_wakeup(FDQueue *queue)
-{
-    pthread_cond_broadcast(&(queue->not_empty));
-}
Index: fdqueue.h
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.4
diff -u -r1.4 fdqueue.h
--- fdqueue.h	2001/08/22 15:40:28	1.4
+++ fdqueue.h	2001/08/24 00:14:08
@@ -66,34 +66,35 @@
 #include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <apr_errno.h>
 
 #define FD_QUEUE_SUCCESS 0
 #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                                of queue_pop semantics */
+#define FD_QUEUE_EINTR APR_EINTR
 
-typedef struct fd_queue_elem {
-    apr_socket_t *sd;
-    apr_pool_t *p;
-} FDQueueElement;
+struct fd_queue_elem_t {
+    apr_socket_t      *sd;
+    apr_pool_t        *p;
+};
+typedef struct fd_queue_elem_t fd_queue_elem_t;
 
-typedef struct fd_queue {
-    int head;
-    int tail;
-    FDQueueElement *data;
-    int bounds;
-    int blanks;
-    pthread_mutex_t one_big_mutex;
-    pthread_cond_t not_empty;
-    pthread_cond_t not_full;
-} FDQueue;
+struct fd_queue_t {
+    int                head;
+    int                tail;
+    fd_queue_elem_t   *data;
+    int                bounds;
+    int                blanks;
+    pthread_mutex_t    one_big_mutex;
+    pthread_cond_t     not_empty;
+    pthread_cond_t     not_full;
+    int                cancel_state;
+};
+typedef struct fd_queue_t fd_queue_t;
 
-int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
-int ap_queue_size(FDQueue *queue);
-int ap_queue_full(FDQueue *queue);
-int ap_block_on_queue(FDQueue *queue);
-void ap_queue_signal_all_wakeup(FDQueue *queue);
-int ap_increase_blanks(FDQueue *queue);
+int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
+int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
+apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
+apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
 
 #endif /* FDQUEUE_H */
Index: worker.c
===================================================================
RCS file: /home/cvspublic/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.14
diff -u -r1.14 worker.c
--- worker.c	2001/08/16 13:59:14	1.14
+++ worker.c	2001/08/24 00:14:08
@@ -124,14 +124,13 @@
 static int requests_this_child;
 static int num_listensocks = 0;
 static apr_socket_t **listensocks;
-static FDQueue *worker_queue;
+static fd_queue_t *worker_queue;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
     int pid;
     int tid;
     int sd;
-    apr_pool_t *tpool; /* "pthread" would be confusing */
 } proc_info;
 
 /* Structure used to pass information to the thread responsible for 
@@ -201,7 +200,9 @@
 static void signal_workers(void)
 {
     workers_may_exit = 1;
-    ap_queue_signal_all_wakeup(worker_queue);
+    /* XXX: This will happen naturally on a graceful restart, and we don't care otherwise.
+    ap_queue_signal_all_wakeup(worker_queue); */
+    ap_queue_interrupt_all(worker_queue);
 }
 
 AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
@@ -553,7 +554,7 @@
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_pool_t *tpool = ti->tpool;
+    apr_pool_t *tpool = apr_thread_pool_get(thd);
     apr_socket_t *csd = NULL;
     apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
     apr_socket_t *sd = NULL;
@@ -564,8 +565,6 @@
 
     free(ti);
 
-    apr_pool_create(&ptrans, tpool);
-
     apr_lock_acquire(worker_thread_count_mutex);
     worker_thread_count++;
     apr_lock_release(worker_thread_count_mutex);
@@ -574,12 +573,10 @@
     for(n=0 ; n <= num_listensocks ; ++n)
 	apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);
 
-    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
-    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
-
     /* TODO: Switch to a system where threads reuse the results from earlier
        poll calls - manoj */
     while (1) {
+        /* TODO: requests_this_child should be synchronized - aaron */
         if (requests_this_child <= 0) {
             check_infinite_requests();
         }
@@ -644,6 +641,9 @@
         }
     got_fd:
         if (!workers_may_exit) {
+            /* create a new transaction pool for each accepted socket */
+            apr_pool_create(&ptrans, tpool);
+
             if ((rv = apr_accept(&csd, sd, ptrans)) != APR_SUCCESS) {
                 csd = NULL;
                 ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, 
@@ -658,7 +658,6 @@
             }
             if (csd != NULL) {
                 ap_queue_push(worker_queue, csd, ptrans);
-                ap_block_on_queue(worker_queue);
             }
         }
         else {
@@ -673,13 +672,15 @@
         }
     }
 
-    apr_pool_destroy(tpool);
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     dying = 1;
     ap_scoreboard_image->parent[process_slot].quiescing = 1;
     kill(ap_my_pid, SIGTERM);
 
+/* this is uncommented when we make a pool-pool
+    apr_thread_exit(thd, APR_SUCCESS);
+*/
     return NULL;
 }
 
@@ -688,30 +689,35 @@
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_pool_t *tpool = ti->tpool;
     apr_socket_t *csd = NULL;
     apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
+    apr_status_t rv;
 
     free(ti);
 
+    /* apr_pool_create(&ptrans, tpool); */
+
     while (!workers_may_exit) {
-        ap_queue_pop(worker_queue, &csd, &ptrans);
-        if (!csd) {
+        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
+        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
+         * from an explicit call to ap_queue_interrupt_all(). This allows
+         * us to unblock threads stuck in ap_queue_pop() when a shutdown
+         * is pending. */
+        if (rv == FD_QUEUE_EINTR || !csd) {
             continue;
         }
-        ap_increase_blanks(worker_queue);
         process_socket(ptrans, csd, process_slot, thread_slot);
-        requests_this_child--;
-        apr_pool_clear(ptrans);
+        requests_this_child--; /* FIXME: should be synchronized - aaron */
+        apr_pool_destroy(ptrans);
     }
 
-    apr_pool_destroy(tpool);
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     apr_lock_acquire(worker_thread_count_mutex);
     worker_thread_count--;
     apr_lock_release(worker_thread_count_mutex);
 
+    apr_thread_exit(thd, APR_SUCCESS);
     return NULL;
 }
 
@@ -738,14 +744,20 @@
     int threads_created = 0;
     apr_thread_t *listener;
 
+    /* We must create the fd queue before we start up the listener
+     * and worker threads. */
+    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
+    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+
     my_info = (proc_info *)malloc(sizeof(proc_info));
     my_info->pid = my_child_num;
     my_info->tid = i;
     my_info->sd = 0;
-    apr_pool_create(&my_info->tpool, pchild);
     apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
     while (1) {
-        for (i=1; i < ap_threads_per_child; i++) {
+        /* Does ap_threads_per_child include the listener thread?
+         * Why does this for loop start at 1? -aaron */
+        for (i = 1; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
 
             if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
@@ -761,7 +773,6 @@
 	    my_info->pid = my_child_num;
             my_info->tid = i;
 	    my_info->sd = 0;
-	    apr_pool_create(&my_info->tpool, pchild);
 	
   	    /* We are creating threads right now */
 	    (void) ap_update_child_status(my_child_num, i, SERVER_STARTING, 
@@ -794,6 +805,7 @@
      *  "life_status" is almost right, but it's in the worker's structure, and 
      *  the name could be clearer.   gla
      */
+    apr_thread_exit(thd, APR_SUCCESS);
     return NULL;
 }
 
@@ -870,7 +882,7 @@
     apr_lock_create(&pipe_of_death_mutex, APR_MUTEX, APR_INTRAPROCESS, 
                     NULL, pchild);
 
-    ts = apr_palloc(pchild, sizeof(*ts));
+    ts = (thread_starter *)apr_palloc(pchild, sizeof(*ts));
 
     apr_threadattr_create(&thread_attr, pchild);
     apr_threadattr_detach_set(thread_attr, 0);    /* 0 means PTHREAD_CREATE_JOINABLE */