You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by yl...@apache.org on 2021/02/06 12:17:40 UTC

svn commit: r1886255 - in /httpd/httpd/trunk/modules/http2: h2_workers.c h2_workers.h

Author: ylavic
Date: Sat Feb  6 12:17:40 2021
New Revision: 1886255

URL: http://svn.apache.org/viewvc?rev=1886255&view=rev
Log:
mod_http2: Fix workers synchronization on pchild cleanup.

When the MPM child exits and pre-workers_pool_cleanup() is called, all the
workers are not necessarily in their idle critical section, thus aborting slots
in the ->idle list only may leave worker threads alive, later blocked in the
idle critical section with no one to wake them.

Instead of the per-slot ->aborted flag, workers_pool_cleanup() will now set
workers->aborted "globally" such that slot_run() does not wait to be woken up
from idle in this case, and all workers really exit.

Also, for workers_pool_cleanup() to wait for all the workers to reach the
->zombies list before returning, a new ->all_done condition variable is armed
when the last thread exits. Since this depends on the atomic ->worker_count to
reach zero, for accuracy the increment in activate_slot() is moved before the
thread startup.


* modules/http2/h2_workers.h (struct h2_workers): volatilize ->aborted and
  add the ->all_done condition variable.

* modules/http2/h2_workers.c (push_slot, pop_slot): volatilize the h2_slot*
  being cas-ed.

* modules/http2/h2_workers.c (cleanup_zombies): rename to join_zombies(), and
  move ->worker_count atomic dec to slot_done().

* modules/http2/h2_workers.c (get_next): when workers->aborted, leave and don't
  wait for ->not_idle. Return an int/bool since it's gotten / not gotten.

* modules/http2/h2_workers.c (slot_done): signal ->all_done when the last
  worker and the MPM child are exiting.

* modules/http2/h2_workers.c (slot_run): rework the loops now that get_next()
  is the stop signal.

* modules/http2/h2_workers.c (workers_pool_cleanup): wait for ->all_done when
  needed, and remove the !workers->aborted condition since the cleanup will
  only be called once.

* modules/http2/h2_workers.c (activate_slot): move ->worker_count atomic inc
  before the thread creation and handle failure rollback.

github: closes #169

Modified:
    httpd/httpd/trunk/modules/http2/h2_workers.c
    httpd/httpd/trunk/modules/http2/h2_workers.h

Modified: httpd/httpd/trunk/modules/http2/h2_workers.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.c?rev=1886255&r1=1886254&r2=1886255&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_workers.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_workers.c Sat Feb  6 12:17:40 2021
@@ -34,17 +34,16 @@
 typedef struct h2_slot h2_slot;
 struct h2_slot {
     int id;
+    int sticks;
     h2_slot *next;
     h2_workers *workers;
-    int aborted;
-    int sticks;
     h2_task *task;
     apr_thread_t *thread;
     apr_thread_mutex_t *lock;
     apr_thread_cond_t *not_idle;
 };
 
-static h2_slot *pop_slot(h2_slot **phead) 
+static h2_slot *pop_slot(h2_slot *volatile *phead) 
 {
     /* Atomically pop a slot from the list */
     for (;;) {
@@ -59,7 +58,7 @@ static h2_slot *pop_slot(h2_slot **phead
     }
 }
 
-static void push_slot(h2_slot **phead, h2_slot *slot)
+static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
 {
     /* Atomically push a slot to the list */
     ap_assert(!slot->next);
@@ -78,7 +77,6 @@ static apr_status_t activate_slot(h2_wor
     apr_status_t status;
     
     slot->workers = workers;
-    slot->aborted = 0;
     slot->task = NULL;
 
     if (!slot->lock) {
@@ -101,16 +99,18 @@ static apr_status_t activate_slot(h2_wor
     
     ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
                  "h2_workers: new thread for slot %d", slot->id); 
+
     /* thread will either immediately start work or add itself
      * to the idle queue */
-    apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, 
-                      workers->pool);
-    if (!slot->thread) {
+    apr_atomic_inc32(&workers->worker_count);
+    status = apr_thread_create(&slot->thread, workers->thread_attr,
+                               slot_run, slot, workers->pool);
+    if (status != APR_SUCCESS) {
+        apr_atomic_dec32(&workers->worker_count);
         push_slot(&workers->free, slot);
-        return APR_ENOMEM;
+        return status;
     }
     
-    apr_atomic_inc32(&workers->worker_count);
     return APR_SUCCESS;
 }
 
@@ -136,17 +136,15 @@ static void wake_idle_worker(h2_workers
     }
 }
 
-static void cleanup_zombies(h2_workers *workers)
+static void join_zombies(h2_workers *workers)
 {
     h2_slot *slot;
     while ((slot = pop_slot(&workers->zombies))) {
-        if (slot->thread) {
-            apr_status_t status;
-            apr_thread_join(&status, slot->thread);
-            slot->thread = NULL;
-        }
-        apr_atomic_dec32(&workers->worker_count);
-        slot->next = NULL;
+        apr_status_t status;
+        ap_assert(slot->thread != NULL);
+        apr_thread_join(&status, slot->thread);
+        slot->thread = NULL;
+
         push_slot(&workers->free, slot);
     }
 }
@@ -184,37 +182,49 @@ static h2_fifo_op_t mplx_peek(void *head
  * Get the next task for the given worker. Will block until a task arrives
  * or the max_wait timer expires and more than min workers exist.
  */
-static apr_status_t get_next(h2_slot *slot)
+static int get_next(h2_slot *slot)
 {
     h2_workers *workers = slot->workers;
-    apr_status_t status;
-    
-    slot->task = NULL;
-    while (!slot->aborted) {
-        if (!slot->task) {
-            status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
-            if (status == APR_EOF) {
-                return status;
-            }
+
+    while (!workers->aborted) {
+        ap_assert(slot->task == NULL);
+        if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
+            /* The queue is terminated with the MPM child being cleaned up,
+             * just leave.
+             */
+            break;
         }
-        
         if (slot->task) {
-            return APR_SUCCESS;
+            return 1;
         }
         
-        cleanup_zombies(workers);
+        join_zombies(workers);
 
         apr_thread_mutex_lock(slot->lock);
-        push_slot(&workers->idle, slot);
-        apr_thread_cond_wait(slot->not_idle, slot->lock);
+        if (!workers->aborted) {
+            push_slot(&workers->idle, slot);
+            apr_thread_cond_wait(slot->not_idle, slot->lock);
+        }
         apr_thread_mutex_unlock(slot->lock);
     }
-    return APR_EOF;
+
+    return 0;
 }
 
 static void slot_done(h2_slot *slot)
 {
-    push_slot(&(slot->workers->zombies), slot);
+    h2_workers *workers = slot->workers;
+
+    push_slot(&workers->zombies, slot);
+
+    /* If this worker is the last one exiting and the MPM child is stopping,
+     * unblock workers_pool_cleanup().
+     */
+    if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
+        apr_thread_mutex_lock(workers->lock);
+        apr_thread_cond_signal(workers->all_done);
+        apr_thread_mutex_unlock(workers->lock);
+    }
 }
 
 
@@ -222,25 +232,23 @@ static void* APR_THREAD_FUNC slot_run(ap
 {
     h2_slot *slot = wctx;
     
-    while (!slot->aborted) {
-
-        /* Get a h2_task from the mplxs queue. */
-        get_next(slot);
-        while (slot->task) {
-        
+    /* Get the h2_task(s) from the ->mplxs queue. */
+    while (get_next(slot)) {
+        ap_assert(slot->task != NULL);
+        do {
             h2_task_do(slot->task, thread, slot->id);
             
             /* Report the task as done. If stickyness is left, offer the
              * mplx the opportunity to give us back a new task right away.
              */
-            if (!slot->aborted && (--slot->sticks > 0)) {
+            if (!slot->workers->aborted && --slot->sticks > 0) {
                 h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task);
             }
             else {
                 h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL);
                 slot->task = NULL;
             }
-        }
+        } while (slot->task);
     }
 
     slot_done(slot);
@@ -254,26 +262,24 @@ static apr_status_t workers_pool_cleanup
     h2_workers *workers = data;
     h2_slot *slot;
     
-    if (!workers->aborted) {
-        workers->aborted = 1;
-        /* abort all idle slots */
-        for (;;) {
-            slot = pop_slot(&workers->idle);
-            if (slot) {
-                apr_thread_mutex_lock(slot->lock);
-                slot->aborted = 1;
-                apr_thread_cond_signal(slot->not_idle);
-                apr_thread_mutex_unlock(slot->lock);
-            }
-            else {
-                break;
-            }
-        }
+    workers->aborted = 1;
+    h2_fifo_term(workers->mplxs);
 
-        h2_fifo_term(workers->mplxs);
+    /* abort all idle slots */
+    while ((slot = pop_slot(&workers->idle))) {
+        apr_thread_mutex_lock(slot->lock);
+        apr_thread_cond_signal(slot->not_idle);
+        apr_thread_mutex_unlock(slot->lock);
+    }
 
-        cleanup_zombies(workers);
+    /* wait for all the workers to become zombies and join them */
+    apr_thread_mutex_lock(workers->lock);
+    if (apr_atomic_read32(&workers->worker_count)) {
+        apr_thread_cond_wait(workers->all_done, workers->lock);
     }
+    apr_thread_mutex_unlock(workers->lock);
+    join_zombies(workers);
+
     return APR_SUCCESS;
 }
 
@@ -340,6 +346,9 @@ h2_workers *h2_workers_create(server_rec
                                      APR_THREAD_MUTEX_DEFAULT,
                                      workers->pool);
     if (status == APR_SUCCESS) {        
+        status = apr_thread_cond_create(&workers->all_done, workers->pool);
+    }
+    if (status == APR_SUCCESS) {        
         n = workers->nslots = workers->max_workers;
         workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
         if (workers->slots == NULL) {

Modified: httpd/httpd/trunk/modules/http2/h2_workers.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.h?rev=1886255&r1=1886254&r2=1886255&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_workers.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_workers.h Sat Feb  6 12:17:40 2021
@@ -42,7 +42,7 @@ struct h2_workers {
     apr_uint32_t max_workers;
     int max_idle_secs;
     
-    int aborted;
+    volatile int aborted;
     int dynamic;
 
     apr_threadattr_t *thread_attr;
@@ -58,6 +58,7 @@ struct h2_workers {
     struct h2_fifo *mplxs;
     
     struct apr_thread_mutex_t *lock;
+    struct apr_thread_cond_t *all_done;
 };
 
 



Re: svn commit: r1886255 - in /httpd/httpd/trunk/modules/http2: h2_workers.c h2_workers.h

Posted by st...@eissing.org.
Nice work, Yann!

> Am 06.02.2021 um 13:17 schrieb ylavic@apache.org:
> 
> Author: ylavic
> Date: Sat Feb  6 12:17:40 2021
> New Revision: 1886255
> 
> URL: http://svn.apache.org/viewvc?rev=1886255&view=rev
> Log:
> mod_http2: Fix workers synchronization on pchild cleanup.
> 
> When the MPM child exits and pre-workers_pool_cleanup() is called, all the
> workers are not necessarily in their idle critical section, thus aborting slots
> in the ->idle list only may leave worker threads alive, later blocked in the
> idle critical section with no one to wake them.
> 
> Instead of the per-slot ->aborted flag, workers_pool_cleanup() will now set
> workers->aborted "globally" such that slot_run() does not wait to be woken up
> from idle in this case, and all workers really exit.
> 
> Also, for workers_pool_cleanup() to wait for all the workers to reach the
> ->zombies list before returning, a new ->all_done condition variable is armed
> when the last thread exits. Since this depends on the atomic ->worker_count to
> reach zero, for accuracy the increment in activate_slot() is moved before the
> thread startup.
> 
> 
> * modules/http2/h2_workers.h (struct h2_workers): volatilize ->aborted and
>  add the ->all_done condition variable.
> 
> * modules/http2/h2_workers.c (push_slot, pop_slot): volatilize the h2_slot*
>  being cas-ed.
> 
> * modules/http2/h2_workers.c (cleanup_zombies): rename to join_zombies(), and
>  move ->worker_count atomic dec to slot_done().
> 
> * modules/http2/h2_workers.c (get_next): when workers->aborted, leave and don't
>  wait for ->not_idle. Return an int/bool since it's gotten / not gotten.
> 
> * modules/http2/h2_workers.c (slot_done): signal ->all_done when the last
>  worker and the MPM child are exiting.
> 
> * modules/http2/h2_workers.c (slot_run): rework the loops now that get_next()
>  is the stop signal.
> 
> * modules/http2/h2_workers.c (workers_pool_cleanup): wait for ->all_done when
>  needed, and remove the !workers->aborted condition since the cleanup will
>  only be called once.
> 
> * modules/http2/h2_workers.c (activate_slot): move ->worker_count atomic inc
>  before the thread creation and handle failure rollback.
> 
> github: closes #169
> 
> Modified:
>    httpd/httpd/trunk/modules/http2/h2_workers.c
>    httpd/httpd/trunk/modules/http2/h2_workers.h
> 
> Modified: httpd/httpd/trunk/modules/http2/h2_workers.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.c?rev=1886255&r1=1886254&r2=1886255&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/http2/h2_workers.c (original)
> +++ httpd/httpd/trunk/modules/http2/h2_workers.c Sat Feb  6 12:17:40 2021
> @@ -34,17 +34,16 @@
> typedef struct h2_slot h2_slot;
> struct h2_slot {
>     int id;
> +    int sticks;
>     h2_slot *next;
>     h2_workers *workers;
> -    int aborted;
> -    int sticks;
>     h2_task *task;
>     apr_thread_t *thread;
>     apr_thread_mutex_t *lock;
>     apr_thread_cond_t *not_idle;
> };
> 
> -static h2_slot *pop_slot(h2_slot **phead) 
> +static h2_slot *pop_slot(h2_slot *volatile *phead) 
> {
>     /* Atomically pop a slot from the list */
>     for (;;) {
> @@ -59,7 +58,7 @@ static h2_slot *pop_slot(h2_slot **phead
>     }
> }
> 
> -static void push_slot(h2_slot **phead, h2_slot *slot)
> +static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
> {
>     /* Atomically push a slot to the list */
>     ap_assert(!slot->next);
> @@ -78,7 +77,6 @@ static apr_status_t activate_slot(h2_wor
>     apr_status_t status;
> 
>     slot->workers = workers;
> -    slot->aborted = 0;
>     slot->task = NULL;
> 
>     if (!slot->lock) {
> @@ -101,16 +99,18 @@ static apr_status_t activate_slot(h2_wor
> 
>     ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
>                  "h2_workers: new thread for slot %d", slot->id); 
> +
>     /* thread will either immediately start work or add itself
>      * to the idle queue */
> -    apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot, 
> -                      workers->pool);
> -    if (!slot->thread) {
> +    apr_atomic_inc32(&workers->worker_count);
> +    status = apr_thread_create(&slot->thread, workers->thread_attr,
> +                               slot_run, slot, workers->pool);
> +    if (status != APR_SUCCESS) {
> +        apr_atomic_dec32(&workers->worker_count);
>         push_slot(&workers->free, slot);
> -        return APR_ENOMEM;
> +        return status;
>     }
> 
> -    apr_atomic_inc32(&workers->worker_count);
>     return APR_SUCCESS;
> }
> 
> @@ -136,17 +136,15 @@ static void wake_idle_worker(h2_workers
>     }
> }
> 
> -static void cleanup_zombies(h2_workers *workers)
> +static void join_zombies(h2_workers *workers)
> {
>     h2_slot *slot;
>     while ((slot = pop_slot(&workers->zombies))) {
> -        if (slot->thread) {
> -            apr_status_t status;
> -            apr_thread_join(&status, slot->thread);
> -            slot->thread = NULL;
> -        }
> -        apr_atomic_dec32(&workers->worker_count);
> -        slot->next = NULL;
> +        apr_status_t status;
> +        ap_assert(slot->thread != NULL);
> +        apr_thread_join(&status, slot->thread);
> +        slot->thread = NULL;
> +
>         push_slot(&workers->free, slot);
>     }
> }
> @@ -184,37 +182,49 @@ static h2_fifo_op_t mplx_peek(void *head
>  * Get the next task for the given worker. Will block until a task arrives
>  * or the max_wait timer expires and more than min workers exist.
>  */
> -static apr_status_t get_next(h2_slot *slot)
> +static int get_next(h2_slot *slot)
> {
>     h2_workers *workers = slot->workers;
> -    apr_status_t status;
> -    
> -    slot->task = NULL;
> -    while (!slot->aborted) {
> -        if (!slot->task) {
> -            status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
> -            if (status == APR_EOF) {
> -                return status;
> -            }
> +
> +    while (!workers->aborted) {
> +        ap_assert(slot->task == NULL);
> +        if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
> +            /* The queue is terminated with the MPM child being cleaned up,
> +             * just leave.
> +             */
> +            break;
>         }
> -        
>         if (slot->task) {
> -            return APR_SUCCESS;
> +            return 1;
>         }
> 
> -        cleanup_zombies(workers);
> +        join_zombies(workers);
> 
>         apr_thread_mutex_lock(slot->lock);
> -        push_slot(&workers->idle, slot);
> -        apr_thread_cond_wait(slot->not_idle, slot->lock);
> +        if (!workers->aborted) {
> +            push_slot(&workers->idle, slot);
> +            apr_thread_cond_wait(slot->not_idle, slot->lock);
> +        }
>         apr_thread_mutex_unlock(slot->lock);
>     }
> -    return APR_EOF;
> +
> +    return 0;
> }
> 
> static void slot_done(h2_slot *slot)
> {
> -    push_slot(&(slot->workers->zombies), slot);
> +    h2_workers *workers = slot->workers;
> +
> +    push_slot(&workers->zombies, slot);
> +
> +    /* If this worker is the last one exiting and the MPM child is stopping,
> +     * unblock workers_pool_cleanup().
> +     */
> +    if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
> +        apr_thread_mutex_lock(workers->lock);
> +        apr_thread_cond_signal(workers->all_done);
> +        apr_thread_mutex_unlock(workers->lock);
> +    }
> }
> 
> 
> @@ -222,25 +232,23 @@ static void* APR_THREAD_FUNC slot_run(ap
> {
>     h2_slot *slot = wctx;
> 
> -    while (!slot->aborted) {
> -
> -        /* Get a h2_task from the mplxs queue. */
> -        get_next(slot);
> -        while (slot->task) {
> -        
> +    /* Get the h2_task(s) from the ->mplxs queue. */
> +    while (get_next(slot)) {
> +        ap_assert(slot->task != NULL);
> +        do {
>             h2_task_do(slot->task, thread, slot->id);
> 
>             /* Report the task as done. If stickyness is left, offer the
>              * mplx the opportunity to give us back a new task right away.
>              */
> -            if (!slot->aborted && (--slot->sticks > 0)) {
> +            if (!slot->workers->aborted && --slot->sticks > 0) {
>                 h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task);
>             }
>             else {
>                 h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL);
>                 slot->task = NULL;
>             }
> -        }
> +        } while (slot->task);
>     }
> 
>     slot_done(slot);
> @@ -254,26 +262,24 @@ static apr_status_t workers_pool_cleanup
>     h2_workers *workers = data;
>     h2_slot *slot;
> 
> -    if (!workers->aborted) {
> -        workers->aborted = 1;
> -        /* abort all idle slots */
> -        for (;;) {
> -            slot = pop_slot(&workers->idle);
> -            if (slot) {
> -                apr_thread_mutex_lock(slot->lock);
> -                slot->aborted = 1;
> -                apr_thread_cond_signal(slot->not_idle);
> -                apr_thread_mutex_unlock(slot->lock);
> -            }
> -            else {
> -                break;
> -            }
> -        }
> +    workers->aborted = 1;
> +    h2_fifo_term(workers->mplxs);
> 
> -        h2_fifo_term(workers->mplxs);
> +    /* abort all idle slots */
> +    while ((slot = pop_slot(&workers->idle))) {
> +        apr_thread_mutex_lock(slot->lock);
> +        apr_thread_cond_signal(slot->not_idle);
> +        apr_thread_mutex_unlock(slot->lock);
> +    }
> 
> -        cleanup_zombies(workers);
> +    /* wait for all the workers to become zombies and join them */
> +    apr_thread_mutex_lock(workers->lock);
> +    if (apr_atomic_read32(&workers->worker_count)) {
> +        apr_thread_cond_wait(workers->all_done, workers->lock);
>     }
> +    apr_thread_mutex_unlock(workers->lock);
> +    join_zombies(workers);
> +
>     return APR_SUCCESS;
> }
> 
> @@ -340,6 +346,9 @@ h2_workers *h2_workers_create(server_rec
>                                      APR_THREAD_MUTEX_DEFAULT,
>                                      workers->pool);
>     if (status == APR_SUCCESS) {        
> +        status = apr_thread_cond_create(&workers->all_done, workers->pool);
> +    }
> +    if (status == APR_SUCCESS) {        
>         n = workers->nslots = workers->max_workers;
>         workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
>         if (workers->slots == NULL) {
> 
> Modified: httpd/httpd/trunk/modules/http2/h2_workers.h
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.h?rev=1886255&r1=1886254&r2=1886255&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/http2/h2_workers.h (original)
> +++ httpd/httpd/trunk/modules/http2/h2_workers.h Sat Feb  6 12:17:40 2021
> @@ -42,7 +42,7 @@ struct h2_workers {
>     apr_uint32_t max_workers;
>     int max_idle_secs;
> 
> -    int aborted;
> +    volatile int aborted;
>     int dynamic;
> 
>     apr_threadattr_t *thread_attr;
> @@ -58,6 +58,7 @@ struct h2_workers {
>     struct h2_fifo *mplxs;
> 
>     struct apr_thread_mutex_t *lock;
> +    struct apr_thread_cond_t *all_done;
> };
> 
> 
> 
>