Posted to dev@httpd.apache.org by Aaron Bannert <aa...@clove.org> on 2001/11/24 00:01:38 UTC

[PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

On Fri, Nov 23, 2001 at 11:46:25AM -0800, Brian Pane wrote:
> Sounds good.  I think the "apr_pool_create_for_thread()" function that
> I proposed earlier this morning will work well in combination with the
> "time-space-tradeoff" worker design, so I'll continue with the prototyping
> on the former.

Here's an updated version of my worker redesign. The "queue" is really a
stack, but I didn't change the name for the sake of having a readable
patch -- if we end up going with this patch I'll rename everything to
"stack".

Some preliminary results: uniprocessor sol8/intel hitting /index.html.en
(ab -c 10 -n 10000):

new code 633.6 r/s (this patch)
mpstat 5:
CPU minf mjf xcal  intr ithr  csw icsw migr smtx  srw syscl  usr sys  wt idl
  0    6   0    0  4032 3932 6401 2856    0   45    0 21235   31  61   0   8
  0    7   0    0  3878 3778 6296 2763    0   41    0 20404   32  57   0  10

old code 629.6 r/s (cvs HEAD)
mpstat 5:
CPU minf mjf xcal  intr ithr  csw icsw migr smtx  srw syscl  usr sys  wt idl
  0    4   0    0  4073 3973 6302 2526    0   57    0 25240   33  59   0   8
  0    7   0    0  3704 3604 5757 2421    0   43    2 22985   28  55   0  17

I don't expect this small sampling to predict future performance, but at
least it proves that I didn't make any mistakes in bringing this patch
back up to date. It also shows that at least under these conditions this
design performs on par with the current worker code.

-aaron


Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.43
diff -u -r1.43 worker.c
--- server/mpm/worker/worker.c	2001/11/22 05:13:29	1.43
+++ server/mpm/worker/worker.c	2001/11/23 22:32:47
@@ -68,9 +68,10 @@
 #include "apr_strings.h"
 #include "apr_file_io.h"
 #include "apr_thread_proc.h"
-#include "apr_signal.h"
 #include "apr_thread_mutex.h"
+#include "apr_thread_cond.h"
 #include "apr_proc_mutex.h"
+#include "apr_signal.h"
 #define APR_WANT_STRFUNC
 #include "apr_want.h"
 
@@ -141,6 +142,25 @@
     apr_threadattr_t *threadattr;
 } thread_starter;
 
+/* State of a particular worker.
+ */
+typedef enum {
+    WORKER_ELEM_IDLE,    /* 0 - idle (ready for another connection) */
+    WORKER_ELEM_BUSY,    /* 1 - busy (currently processing a connection) */
+    WORKER_ELEM_QUIT     /* 2 - time to quit */
+} worker_elem_state_e;
+
+/* Structure used to keep track of the current state of a particular
+ * worker thread.
+ */
+typedef struct {
+    apr_pool_t          *pool;  /* pool to use when calling accept() */
+    apr_socket_t        *sd;    /* socket returned from accept() */
+    worker_elem_state_e  state;
+    apr_thread_mutex_t  *mutex;
+    apr_thread_cond_t   *cond;
+} worker_elem_t;
+
 /*
  * The max child slot ever assigned, preserved across restarts.  Necessary
  * to deal with MaxClients changes across AP_SIG_GRACEFUL restarts.  We 
@@ -202,8 +222,6 @@
 static void signal_workers(void)
 {
     workers_may_exit = 1;
-    /* XXX: This will happen naturally on a graceful, and we don't care otherwise.
-    ap_queue_signal_all_wakeup(worker_queue); */
     ap_queue_interrupt_all(worker_queue);
 }
 
@@ -562,9 +580,8 @@
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
     apr_pool_t *tpool = apr_thread_pool_get(thd);
-    void *csd = NULL;
-    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
-    int n;
+    int n, numalive;
+    worker_elem_t *a_worker;
     apr_pollfd_t *pollset;
     apr_status_t rv;
     ap_listen_rec *lr, *last_lr = ap_listeners;
@@ -639,10 +656,23 @@
         }
     got_fd:
         if (!workers_may_exit) {
-            /* create a new transaction pool for each accepted socket */
-            apr_pool_create(&ptrans, tpool);
-
-            rv = lr->accept_func(&csd, lr, ptrans);
+            if ((rv = ap_queue_pop(worker_queue, (void **)&a_worker))
+                != APR_SUCCESS) {
+                signal_workers();
+            }
+            if ((rv = apr_thread_mutex_lock(a_worker->mutex))
+                != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "apr_thread_mutex_lock failed. Attempting "
+                             "to shutdown process gracefully.");
+                signal_workers();
+            }
+            if ((rv = lr->accept_func(&a_worker->sd, lr, a_worker->pool))
+                != APR_SUCCESS) {
+                a_worker->sd = NULL;
+                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf,
+                             "apr_accept");
+            }
 
             if (rv == APR_EGENERAL) {
                 signal_workers();
@@ -654,17 +684,17 @@
                              "shutdown process gracefully.");
                 signal_workers();
             }
-            if (csd != NULL) {
-                rv = ap_queue_push(worker_queue, csd, ptrans);
-                if (rv) {
-                    /* trash the connection; we couldn't queue the connected
-                     * socket to a worker 
-                     */
-                    apr_socket_close(csd);
-                    ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
-                                 "ap_queue_push failed with error code %d",
-                                 rv);
-                }
+
+            /* Signal worker that it's time to go. */
+            a_worker->state = WORKER_ELEM_BUSY;
+            apr_thread_cond_signal(a_worker->cond);
+
+            if ((rv = apr_thread_mutex_unlock(a_worker->mutex))
+                != APR_SUCCESS) {
+                ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                             "apr_proc_mutex_unlock failed. Attempting to "
+                             "shutdown process gracefully.");
+                signal_workers();
             }
         }
         else {
@@ -679,6 +709,24 @@
         }
     }
 
+    /* Kill off the workers in a nice way. */
+    numalive = ap_threads_per_child;
+    while (numalive > 0) {
+        if ((rv = ap_queue_pop(worker_queue, (void *)&a_worker))
+            != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+                         "ap_queue_pop failed during shutdown with error "
+                         "code %d", rv);
+        }
+        else {
+            apr_thread_mutex_lock(a_worker->mutex);
+            a_worker->state = WORKER_ELEM_QUIT;
+            apr_thread_cond_signal(a_worker->cond);
+            apr_thread_mutex_unlock(a_worker->mutex);
+        }
+        --numalive;
+    }
+
     ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : SERVER_GRACEFUL,
         (request_rec *) NULL);
     dying = 1;
@@ -691,35 +739,81 @@
     return NULL;
 }
 
-static void *worker_thread(apr_thread_t *thd, void * dummy)
+static void *worker_thread(apr_thread_t *thd, void *dummy)
 {
     proc_info * ti = dummy;
     int process_slot = ti->pid;
     int thread_slot = ti->tid;
-    apr_socket_t *csd = NULL;
-    apr_pool_t *ptrans;		/* Pool for per-transaction stuff */
+    apr_pool_t *tpool = apr_thread_pool_get(thd);
+    worker_elem_t my_state;
     apr_status_t rv;
 
+    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
+
     free(ti);
 
-    ap_update_child_status(process_slot, thread_slot, SERVER_STARTING, NULL);
-    while (!workers_may_exit) {
+    apr_pool_create(&my_state.pool, tpool);
+    if ((rv = apr_thread_mutex_create(&my_state.mutex,
+                                      APR_THREAD_MUTEX_DEFAULT, tpool))
+        != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_create");
+    }
+    if ((rv = apr_thread_cond_create(&my_state.cond, tpool))
+        != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_create");
+    }
+
+    if ((rv = apr_thread_mutex_lock(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_lock");
+    }
+
+    while (1) {
+        my_state.sd = NULL;
+        my_state.state = WORKER_ELEM_IDLE;
+
         ap_update_child_status(process_slot, thread_slot, SERVER_READY, NULL);
-        rv = ap_queue_pop(worker_queue, &csd, &ptrans);
-        /* We get FD_QUEUE_EINTR whenever ap_queue_pop() has been interrupted
-         * from an explicit call to ap_queue_interrupt_all(). This allows
-         * us to unblock threads stuck in ap_queue_pop() when a shutdown
-         * is pending. */
-        if (rv == FD_QUEUE_EINTR || !csd) {
-            continue;
-        }
-        process_socket(ptrans, csd, process_slot, thread_slot);
-        requests_this_child--; /* FIXME: should be synchronized - aaron */
-        apr_pool_destroy(ptrans);
+
+        /* Make ourselves available as a connection-processing worker. */
+        if ((rv = ap_queue_push(worker_queue, &my_state)) != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, ap_server_conf,
+                         "ap_stack_push failed with error code %d", rv);
+        }
+
+        /* Because of the way this is architected, we will always have
+         * a context switch here. It would be neat if we could come up
+         * with a good way to avoid the call to cond_wait. -aaron
+         */
+        while (my_state.state == WORKER_ELEM_IDLE) {
+            apr_thread_cond_wait(my_state.cond, my_state.mutex);
+        }
+        /* Did someone wake us up to notice that it is time to exit? */
+        if (my_state.state == WORKER_ELEM_QUIT) {
+            break;
+        }
+        else if (my_state.sd != NULL) {
+            process_socket(my_state.pool, my_state.sd,
+                           process_slot, thread_slot);
+            requests_this_child--; /* FIXME: should be synchronized -aaron */
+        }
+        apr_pool_clear(my_state.pool);
     }
 
     ap_update_child_status(process_slot, thread_slot,
-        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
+        (dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *)NULL);
+
+    if ((rv = apr_thread_cond_destroy(my_state.cond)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_cond_destroy");
+    }
+    if ((rv = apr_thread_mutex_destroy(my_state.mutex)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                     "apr_thread_mutex_destroy");
+    }
+    apr_pool_destroy(my_state.pool);
+
     apr_thread_mutex_lock(worker_thread_count_mutex);
     worker_thread_count--;
     apr_thread_mutex_unlock(worker_thread_count_mutex);
Index: server/mpm/worker/fdqueue.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.c,v
retrieving revision 1.9
diff -u -r1.9 fdqueue.c
--- server/mpm/worker/fdqueue.c	2001/10/17 16:29:36	1.9
+++ server/mpm/worker/fdqueue.c	2001/11/23 22:32:47
@@ -82,7 +82,6 @@
      * XXX: We should at least try to signal an error here, it is
      * indicative of a programmer error. -aaron */
     apr_thread_cond_destroy(queue->not_empty);
-    apr_thread_cond_destroy(queue->not_full);
     apr_thread_mutex_destroy(queue->one_big_mutex);
 
     return FD_QUEUE_SUCCESS;
@@ -93,26 +92,21 @@
  */
 int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 
 {
-    int i;
-
     /* FIXME: APRize these return values. */
     if (apr_thread_mutex_create(&queue->one_big_mutex,
-                              APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS)
-        return FD_QUEUE_FAILURE;
-    if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS)
+                                APR_THREAD_MUTEX_DEFAULT, a) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
-    if (apr_thread_cond_create(&queue->not_full, a) != APR_SUCCESS)
+    }
+    if (apr_thread_cond_create(&queue->not_empty, a) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
+    }
 
     queue->tail = 0;
-    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
+    queue->data = apr_palloc(a, queue_capacity * sizeof(void*));
     queue->bounds = queue_capacity;
-
-    /* Set all the sockets in the queue to NULL */
-    for (i = 0; i < queue_capacity; ++i)
-        queue->data[i].sd = NULL;
 
-    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
+    apr_pool_cleanup_register(a, queue, ap_queue_destroy,
+                              apr_pool_cleanup_null);
 
     return FD_QUEUE_SUCCESS;
 }
@@ -122,23 +116,29 @@
  * the push operation has completed, it signals other threads waiting
  * in apr_queue_pop() that they may continue consuming sockets.
  */
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 
+int ap_queue_push(fd_queue_t *queue, void *e)
 {
-    fd_queue_elem_t *elem;
-
     if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
     }
 
-    while (ap_queue_full(queue)) {
-        apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
+    /* If they push too many, they didn't allocate enough slots
+     * in the stack, and we treat that as fatal. */
+    if (ap_queue_full(queue)) {
+        if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
+            return FD_QUEUE_FAILURE;
+        }
+        return FD_QUEUE_OVERFLOW;
     }
 
-    elem = &queue->data[queue->tail++];
-    elem->sd = sd;
-    elem->p = p;
+    queue->data[queue->tail++] = e;
 
-    apr_thread_cond_signal(queue->not_empty);
+    /* Only perform the overhead of signaling if we were empty before
+     * inserting this element.
+     */
+    if (1 == queue->tail) {
+        apr_thread_cond_signal(queue->not_empty);
+    }
 
     if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
@@ -153,16 +153,14 @@
  * Once retrieved, the socket is placed into the address specified by
  * 'sd'.
  */
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 
+apr_status_t ap_queue_pop(fd_queue_t *queue, void **e)
 {
-    fd_queue_elem_t *elem;
-
     if (apr_thread_mutex_lock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
     }
 
     /* Keep waiting until we wake up and find that the queue is not empty. */
-    if (ap_queue_empty(queue)) {
+    while (ap_queue_empty(queue)) {
         apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
         /* If we wake up and it's still empty, then we were interrupted */
         if (ap_queue_empty(queue)) {
@@ -173,16 +171,7 @@
         }
     } 
     
-    elem = &queue->data[--queue->tail];
-    *sd = elem->sd;
-    *p = elem->p;
-    elem->sd = NULL;
-    elem->p = NULL;
-
-    /* signal not_full if we were full before this pop */
-    if (queue->tail == queue->bounds - 1) {
-        apr_thread_cond_signal(queue->not_full);
-    }
+    *e = queue->data[--queue->tail];
 
     if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
@@ -197,9 +186,6 @@
         return FD_QUEUE_FAILURE;
     }
     apr_thread_cond_broadcast(queue->not_empty);
-    /* We shouldn't have multiple threads sitting in not_full, but
-     * broadcast just in case. */
-    apr_thread_cond_broadcast(queue->not_full);
     if (apr_thread_mutex_unlock(queue->one_big_mutex) != APR_SUCCESS) {
         return FD_QUEUE_FAILURE;
     }
Index: server/mpm/worker/fdqueue.h
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/fdqueue.h,v
retrieving revision 1.9
diff -u -r1.9 fdqueue.h
--- server/mpm/worker/fdqueue.h	2001/10/17 16:29:37	1.9
+++ server/mpm/worker/fdqueue.h	2001/11/23 22:32:47
@@ -73,29 +73,21 @@
 #define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                                of queue_pop semantics */
 #define FD_QUEUE_EINTR APR_EINTR
+#define FD_QUEUE_OVERFLOW -2
 
-struct fd_queue_elem_t {
-    apr_socket_t      *sd;
-    apr_pool_t        *p;
-};
-typedef struct fd_queue_elem_t fd_queue_elem_t;
-
 struct fd_queue_t {
     int                 tail;
-    fd_queue_elem_t    *data;
+    void              **data;
     int                 bounds;
-    int                 blanks;
     apr_thread_mutex_t *one_big_mutex;
     apr_thread_cond_t  *not_empty;
-    apr_thread_cond_t  *not_full;
-    int                 cancel_state;
 };
 typedef struct fd_queue_t fd_queue_t;
 
 /* FIXME: APRize these -- return values should be apr_status_t */
 int ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a);
-int ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p);
-apr_status_t ap_queue_interrupt_all(fd_queue_t *queue);
+int ap_queue_push(fd_queue_t *queue, void *e);
+int ap_queue_pop(fd_queue_t *queue, void **e);
+int ap_queue_interrupt_all(fd_queue_t *queue);
 
 #endif /* FDQUEUE_H */

Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

Posted by Brian Pane <bp...@pacbell.net>.
I wrote:
[...]

> I just finished profiling the combination of the time-space-tradeoff
> worker with thread-private pools, and it does seem to achieve the
> intended effect: most of the mutex locking is eliminated.  I don't
> have before&after throughput data, but I do have profiler data that
> shows that the combination of the two patches reduces the time spent
> in pool by almost half. 

     ^^^^ I meant to say, "the time spent in pool cleanups."  Reducing
          the total time spent in all pool operations by 50% would be
          much more difficult. :-)

--Brian



Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

Posted by Brian Pane <bp...@pacbell.net>.
Brian Pane wrote:

> Aaron Bannert wrote: 

[...]

>> Here's an updated version of my worker redesign. The "queue" is really a
>> stack, but I didn't change the name for the sake of having a readable
>> patch -- if we end up going with this patch I'll rename everything to
>> "stack".
>>
>
> Thanks.  Here's my patch to optimize away the mutex operations in
> pools that have been designated thread-private.  With the current
> worker code, it can eliminate the mutex ops for subrequest pool
> creation/destruction.  By combining it with your worker redesign,
> I think we may be able to eliminate the mutexes for the ptrans
> pool. 


I just finished profiling the combination of the time-space-tradeoff
worker with thread-private pools, and it does seem to achieve the
intended effect: most of the mutex locking is eliminated.  I don't
have before&after throughput data, but I do have profiler data that
shows that the combination of the two patches reduces the time spent
in pool by almost half.

--Brian




Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

Posted by Brian Pane <bp...@pacbell.net>.
Justin Erenkrantz wrote:

>[ Moving this part of the discussion to dev@apr. ]
>
>On Fri, Nov 23, 2001 at 03:16:35PM -0800, Brian Pane wrote:
>
>>Thanks.  Here's my patch to optimize away the mutex operations in
>>pools that have been designated thread-private.  With the current
>>worker code, it can eliminate the mutex ops for subrequest pool
>>creation/destruction.  By combining it with your worker redesign,
>>I think we may be able to eliminate the mutexes for the ptrans
>>pool.
>>
>
>Nice idea.  However, I think it might be best to resurrect
>Sander's patch which did a rewrite of the pool code (keeping
>virtually the same API) but made the internal code much cleaner.  
>And, it had thread-private to boot.  (The current pool code has
>threads grafted on to it, so it isn't as clean.)  And, we proved
>it was a performance win, too, I think.
>

Agreed--now might be a good time to revive Sander's pool patch.

The benchmark results for my pool patch show that:

  * Modifying the pool code to reduce mutex contention is
    a good idea.  With the patch applied, we saw a large decrease
    in run queue length and a modest increase in throughput on
    Solaris.

  * My patch isn't ready for production use. There's an apparent
    race condition in my patch that results in rare heap corruption,
    and so far I've been unable to isolate the bug.

Thus I think Sander's patch may be a better choice than mine.

--Brian



Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

Posted by Justin Erenkrantz <je...@ebuilt.com>.
[ Moving this part of the discussion to dev@apr. ]

On Fri, Nov 23, 2001 at 03:16:35PM -0800, Brian Pane wrote:
> Thanks.  Here's my patch to optimize away the mutex operations in
> pools that have been designated thread-private.  With the current
> worker code, it can eliminate the mutex ops for subrequest pool
> creation/destruction.  By combining it with your worker redesign,
> I think we may be able to eliminate the mutexes for the ptrans
> pool.

Nice idea.  However, I think it might be best to resurrect
Sander's patch which did a rewrite of the pool code (keeping
virtually the same API) but made the internal code much cleaner.  
And, it had thread-private to boot.  (The current pool code has
threads grafted on to it, so it isn't as clean.)  And, we proved
it was a performance win, too, I think.

I remember having this conversation a few months ago, but now, I think,
might be the time to land this puppy in the tree.  =)
Sander?

My $.02.  -- justin


Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

Posted by Brian Pane <bp...@pacbell.net>.
Aaron Bannert wrote:

>On Fri, Nov 23, 2001 at 11:46:25AM -0800, Brian Pane wrote:
>
>>Sounds good.  I think the "apr_pool_create_for_thread()" function that
>>I proposed earlier this morning will work well in combination with the
>>"time-space-tradeoff" worker design, so I'll continue with the prototyping
>>on the former.
>>
>
>Here's an updated version of my worker redesign. The "queue" is really a
>stack, but I didn't change the name for the sake of having a readable
>patch -- if we end up going with this patch I'll rename everything to
>"stack".
>

Thanks.  Here's my patch to optimize away the mutex operations in
pools that have been designated thread-private.  With the current
worker code, it can eliminate the mutex ops for subrequest pool
creation/destruction.  By combining it with your worker redesign,
I think we may be able to eliminate the mutexes for the ptrans
pool.
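
A rough sketch of the idea follows. It is illustrative only: the names are
hypothetical and this is not the actual patch or the real APR pool code. The
point is simply that a pool created for a single thread can record that fact
and skip its allocator mutex, since only the owning thread will ever allocate
from or clear it.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct toy_pool {
    pthread_mutex_t lock;
    int    thread_private;    /* nonzero => skip locking on alloc/clear */
    char   buf[4096];         /* trivial bump allocator for the example */
    size_t used;
} toy_pool;

static toy_pool *toy_pool_create(int thread_private)
{
    toy_pool *p = calloc(1, sizeof(*p));
    pthread_mutex_init(&p->lock, NULL);
    p->thread_private = thread_private;
    return p;
}

static void *toy_palloc(toy_pool *p, size_t n)
{
    void *mem;
    if (!p->thread_private)
        pthread_mutex_lock(&p->lock);   /* shared pool: pay for the mutex */
    mem = p->buf + p->used;
    p->used += (n + 7) & ~(size_t)7;    /* no overflow check in this toy */
    if (!p->thread_private)
        pthread_mutex_unlock(&p->lock);
    return mem;
}

int main(void)
{
    toy_pool *ptrans = toy_pool_create(1);   /* per-thread, lock-free path */
    char *s = toy_palloc(ptrans, 6);
    strcpy(s, "hello");
    printf("%s\n", s);
    free(ptrans);
    return 0;
}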

--Brian



Re: [PATCH] time-space tradeoff (reuse tpool, one CV per worker thread)

Posted by Ian Holsman <ia...@cnet.com>.
On Fri, 2001-11-23 at 15:01, Aaron Bannert wrote:
> On Fri, Nov 23, 2001 at 11:46:25AM -0800, Brian Pane wrote:
> > Sounds good.  I think the "apr_pool_create_for_thread()" function that
> > I proposed earlier this morning will work well in combination with the
> > "time-space-tradeoff" worker design, so I'll continue with the prototyping
> > on the former.
> 
> Here's an updated version of my worker redesign. The "queue" is really a
> stack, but I didn't change the name for the sake of having a readable
> patch -- if we end up going with this patch I'll rename everything to
> "stack".
> 
Hi Aaron.
I've applied your patch and am benchmarking it now.
One thing I noticed is that on startup I get
[error] (4)Interrupted system call: apr_accept

but the server runs fine regardless.

..Ian
[...]
-- 
Ian Holsman          IanH@cnet.com
Performance Measurement & Analysis
CNET Networks   -   (415) 344-2608