You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by yl...@apache.org on 2021/02/06 12:17:40 UTC
svn commit: r1886255 - in /httpd/httpd/trunk/modules/http2: h2_workers.c
h2_workers.h
Author: ylavic
Date: Sat Feb 6 12:17:40 2021
New Revision: 1886255
URL: http://svn.apache.org/viewvc?rev=1886255&view=rev
Log:
mod_http2: Fix workers synchronization on pchild cleanup.
When the MPM child exits and pre-workers_pool_cleanup() is called, all the
workers are are necessarily in their idle critical section, thus aborting slots
in the ->idle list only may leave worker threads alive, later blocked in the
idle critical section with no one to wake them.
Instead of the per-slot ->aborted flag, workers_pool_cleanup() will now set
workers->aborted "globally" such that slot_run() does not wait to be woken up
from idle in this case, and all workers really exit.
Also, for workers_pool_cleanup() to wait for all the workers to reach the
->zombies list before returning, a new ->all_done condition variable is armed
when the last thread exits. Since this depends on the atomic ->worker_count to
reach zero, for accuracy the increment in activate_slot() is moved before the
thread startup.
* modules/http2/h2_workers.h (struct h2_workers): volatilize ->aborted and
add the ->all_done condition variable.
* modules/http2/h2_workers.c (push_slot, pop_slot): volatilize the h2_slot*
being cas-ed.
* modules/http2/h2_workers.c (cleanup_zombies): rename to join_zombies(), and
move ->worker_count atomic inc to slot_done().
* modules/http2/h2_workers.c (get_next): when workers->aborted, leave and don't
wait for ->not_idle. Return an int/bool since it's gotten / not gotten.
* modules/http2/h2_workers.c (slot_done): signal ->all_done when the last
worker and the MPM child are exiting.
* modules/http2/h2_workers.c (slot_run): rework the loops now that get_next()
is the stop signal.
* modules/http2/h2_workers.c (workers_pool_cleanup): wait for ->all_done when
needed, and remove the !workers->aborted condition since the cleanup will
only be called once.
* modules/http2/h2_workers.c (activate_slot): move ->worker_count atomic inc
before the thread creation and handle failure rollback.
github: closes #169
Modified:
httpd/httpd/trunk/modules/http2/h2_workers.c
httpd/httpd/trunk/modules/http2/h2_workers.h
Modified: httpd/httpd/trunk/modules/http2/h2_workers.c
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.c?rev=1886255&r1=1886254&r2=1886255&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_workers.c (original)
+++ httpd/httpd/trunk/modules/http2/h2_workers.c Sat Feb 6 12:17:40 2021
@@ -34,17 +34,16 @@
typedef struct h2_slot h2_slot;
struct h2_slot {
int id;
+ int sticks;
h2_slot *next;
h2_workers *workers;
- int aborted;
- int sticks;
h2_task *task;
apr_thread_t *thread;
apr_thread_mutex_t *lock;
apr_thread_cond_t *not_idle;
};
-static h2_slot *pop_slot(h2_slot **phead)
+static h2_slot *pop_slot(h2_slot *volatile *phead)
{
/* Atomically pop a slot from the list */
for (;;) {
@@ -59,7 +58,7 @@ static h2_slot *pop_slot(h2_slot **phead
}
}
-static void push_slot(h2_slot **phead, h2_slot *slot)
+static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
{
/* Atomically push a slot to the list */
ap_assert(!slot->next);
@@ -78,7 +77,6 @@ static apr_status_t activate_slot(h2_wor
apr_status_t status;
slot->workers = workers;
- slot->aborted = 0;
slot->task = NULL;
if (!slot->lock) {
@@ -101,16 +99,18 @@ static apr_status_t activate_slot(h2_wor
ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
"h2_workers: new thread for slot %d", slot->id);
+
/* thread will either immediately start work or add itself
* to the idle queue */
- apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
- workers->pool);
- if (!slot->thread) {
+ apr_atomic_inc32(&workers->worker_count);
+ status = apr_thread_create(&slot->thread, workers->thread_attr,
+ slot_run, slot, workers->pool);
+ if (status != APR_SUCCESS) {
+ apr_atomic_dec32(&workers->worker_count);
push_slot(&workers->free, slot);
- return APR_ENOMEM;
+ return status;
}
- apr_atomic_inc32(&workers->worker_count);
return APR_SUCCESS;
}
@@ -136,17 +136,15 @@ static void wake_idle_worker(h2_workers
}
}
-static void cleanup_zombies(h2_workers *workers)
+static void join_zombies(h2_workers *workers)
{
h2_slot *slot;
while ((slot = pop_slot(&workers->zombies))) {
- if (slot->thread) {
- apr_status_t status;
- apr_thread_join(&status, slot->thread);
- slot->thread = NULL;
- }
- apr_atomic_dec32(&workers->worker_count);
- slot->next = NULL;
+ apr_status_t status;
+ ap_assert(slot->thread != NULL);
+ apr_thread_join(&status, slot->thread);
+ slot->thread = NULL;
+
push_slot(&workers->free, slot);
}
}
@@ -184,37 +182,49 @@ static h2_fifo_op_t mplx_peek(void *head
* Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist.
*/
-static apr_status_t get_next(h2_slot *slot)
+static int get_next(h2_slot *slot)
{
h2_workers *workers = slot->workers;
- apr_status_t status;
-
- slot->task = NULL;
- while (!slot->aborted) {
- if (!slot->task) {
- status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
- if (status == APR_EOF) {
- return status;
- }
+
+ while (!workers->aborted) {
+ ap_assert(slot->task == NULL);
+ if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
+ /* The queue is terminated with the MPM child being cleaned up,
+ * just leave.
+ */
+ break;
}
-
if (slot->task) {
- return APR_SUCCESS;
+ return 1;
}
- cleanup_zombies(workers);
+ join_zombies(workers);
apr_thread_mutex_lock(slot->lock);
- push_slot(&workers->idle, slot);
- apr_thread_cond_wait(slot->not_idle, slot->lock);
+ if (!workers->aborted) {
+ push_slot(&workers->idle, slot);
+ apr_thread_cond_wait(slot->not_idle, slot->lock);
+ }
apr_thread_mutex_unlock(slot->lock);
}
- return APR_EOF;
+
+ return 0;
}
static void slot_done(h2_slot *slot)
{
- push_slot(&(slot->workers->zombies), slot);
+ h2_workers *workers = slot->workers;
+
+ push_slot(&workers->zombies, slot);
+
+ /* If this worker is the last one exiting and the MPM child is stopping,
+ * unblock workers_pool_cleanup().
+ */
+ if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
+ apr_thread_mutex_lock(workers->lock);
+ apr_thread_cond_signal(workers->all_done);
+ apr_thread_mutex_unlock(workers->lock);
+ }
}
@@ -222,25 +232,23 @@ static void* APR_THREAD_FUNC slot_run(ap
{
h2_slot *slot = wctx;
- while (!slot->aborted) {
-
- /* Get a h2_task from the mplxs queue. */
- get_next(slot);
- while (slot->task) {
-
+ /* Get the h2_task(s) from the ->mplxs queue. */
+ while (get_next(slot)) {
+ ap_assert(slot->task != NULL);
+ do {
h2_task_do(slot->task, thread, slot->id);
/* Report the task as done. If stickyness is left, offer the
* mplx the opportunity to give us back a new task right away.
*/
- if (!slot->aborted && (--slot->sticks > 0)) {
+ if (!slot->workers->aborted && --slot->sticks > 0) {
h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task);
}
else {
h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL);
slot->task = NULL;
}
- }
+ } while (slot->task);
}
slot_done(slot);
@@ -254,26 +262,24 @@ static apr_status_t workers_pool_cleanup
h2_workers *workers = data;
h2_slot *slot;
- if (!workers->aborted) {
- workers->aborted = 1;
- /* abort all idle slots */
- for (;;) {
- slot = pop_slot(&workers->idle);
- if (slot) {
- apr_thread_mutex_lock(slot->lock);
- slot->aborted = 1;
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- }
- else {
- break;
- }
- }
+ workers->aborted = 1;
+ h2_fifo_term(workers->mplxs);
- h2_fifo_term(workers->mplxs);
+ /* abort all idle slots */
+ while ((slot = pop_slot(&workers->idle))) {
+ apr_thread_mutex_lock(slot->lock);
+ apr_thread_cond_signal(slot->not_idle);
+ apr_thread_mutex_unlock(slot->lock);
+ }
- cleanup_zombies(workers);
+ /* wait for all the workers to become zombies and join them */
+ apr_thread_mutex_lock(workers->lock);
+ if (apr_atomic_read32(&workers->worker_count)) {
+ apr_thread_cond_wait(workers->all_done, workers->lock);
}
+ apr_thread_mutex_unlock(workers->lock);
+ join_zombies(workers);
+
return APR_SUCCESS;
}
@@ -340,6 +346,9 @@ h2_workers *h2_workers_create(server_rec
APR_THREAD_MUTEX_DEFAULT,
workers->pool);
if (status == APR_SUCCESS) {
+ status = apr_thread_cond_create(&workers->all_done, workers->pool);
+ }
+ if (status == APR_SUCCESS) {
n = workers->nslots = workers->max_workers;
workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
if (workers->slots == NULL) {
Modified: httpd/httpd/trunk/modules/http2/h2_workers.h
URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.h?rev=1886255&r1=1886254&r2=1886255&view=diff
==============================================================================
--- httpd/httpd/trunk/modules/http2/h2_workers.h (original)
+++ httpd/httpd/trunk/modules/http2/h2_workers.h Sat Feb 6 12:17:40 2021
@@ -42,7 +42,7 @@ struct h2_workers {
apr_uint32_t max_workers;
int max_idle_secs;
- int aborted;
+ volatile int aborted;
int dynamic;
apr_threadattr_t *thread_attr;
@@ -58,6 +58,7 @@ struct h2_workers {
struct h2_fifo *mplxs;
struct apr_thread_mutex_t *lock;
+ struct apr_thread_cond_t *all_done;
};
Re: svn commit: r1886255 - in /httpd/httpd/trunk/modules/http2:
h2_workers.c h2_workers.h
Posted by st...@eissing.org.
Nice work, Yann!
> Am 06.02.2021 um 13:17 schrieb ylavic@apache.org:
>
> Author: ylavic
> Date: Sat Feb 6 12:17:40 2021
> New Revision: 1886255
>
> URL: http://svn.apache.org/viewvc?rev=1886255&view=rev
> Log:
> mod_http2: Fix workers synchronization on pchild cleanup.
>
> When the MPM child exits and pre-workers_pool_cleanup() is called, all the
> workers are are necessarily in their idle critical section, thus aborting slots
> in the ->idle list only may leave worker threads alive, later blocked in the
> idle critical section with no one to wake them.
>
> Instead of the per-slot ->aborted flag, workers_pool_cleanup() will now set
> workers->aborted "globally" such that slot_run() does not wait to be woken up
> from idle in this case, and all workers really exit.
>
> Also, for workers_pool_cleanup() to wait for all the workers to reach the
> ->zombies list before returning, a new ->all_done condition variable is armed
> when the last thread exits. Since this depends on the atomic ->worker_count to
> reach zero, for accuracy the increment in activate_slot() is moved before the
> thread startup.
>
>
> * modules/http2/h2_workers.h (struct h2_workers): volatilize ->aborted and
> add the ->all_done condition variable.
>
> * modules/http2/h2_workers.c (push_slot, pop_slot): volatilize the h2_slot*
> being cas-ed.
>
> * modules/http2/h2_workers.c (cleanup_zombies): rename to join_zombies(), and
> move ->worker_count atomic inc to slot_done().
>
> * modules/http2/h2_workers.c (get_next): when workers->aborted, leave and don't
> wait for ->not_idle. Return an int/bool since it's gotten / not gotten.
>
> * modules/http2/h2_workers.c (slot_done): signal ->all_done when the last
> worker and the MPM child are exiting.
>
> * modules/http2/h2_workers.c (slot_run): rework the loops now that get_next()
> is the stop signal.
>
> * modules/http2/h2_workers.c (workers_pool_cleanup): wait for ->all_done when
> needed, and remove the !workers->aborted condition since the cleanup will
> only be called once.
>
> * modules/http2/h2_workers.c (activate_slot): move ->worker_count atomic inc
> before the thread creation and handle failure rollback.
>
> github: closes #169
>
> Modified:
> httpd/httpd/trunk/modules/http2/h2_workers.c
> httpd/httpd/trunk/modules/http2/h2_workers.h
>
> Modified: httpd/httpd/trunk/modules/http2/h2_workers.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.c?rev=1886255&r1=1886254&r2=1886255&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/http2/h2_workers.c (original)
> +++ httpd/httpd/trunk/modules/http2/h2_workers.c Sat Feb 6 12:17:40 2021
> @@ -34,17 +34,16 @@
> typedef struct h2_slot h2_slot;
> struct h2_slot {
> int id;
> + int sticks;
> h2_slot *next;
> h2_workers *workers;
> - int aborted;
> - int sticks;
> h2_task *task;
> apr_thread_t *thread;
> apr_thread_mutex_t *lock;
> apr_thread_cond_t *not_idle;
> };
>
> -static h2_slot *pop_slot(h2_slot **phead)
> +static h2_slot *pop_slot(h2_slot *volatile *phead)
> {
> /* Atomically pop a slot from the list */
> for (;;) {
> @@ -59,7 +58,7 @@ static h2_slot *pop_slot(h2_slot **phead
> }
> }
>
> -static void push_slot(h2_slot **phead, h2_slot *slot)
> +static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
> {
> /* Atomically push a slot to the list */
> ap_assert(!slot->next);
> @@ -78,7 +77,6 @@ static apr_status_t activate_slot(h2_wor
> apr_status_t status;
>
> slot->workers = workers;
> - slot->aborted = 0;
> slot->task = NULL;
>
> if (!slot->lock) {
> @@ -101,16 +99,18 @@ static apr_status_t activate_slot(h2_wor
>
> ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
> "h2_workers: new thread for slot %d", slot->id);
> +
> /* thread will either immediately start work or add itself
> * to the idle queue */
> - apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
> - workers->pool);
> - if (!slot->thread) {
> + apr_atomic_inc32(&workers->worker_count);
> + status = apr_thread_create(&slot->thread, workers->thread_attr,
> + slot_run, slot, workers->pool);
> + if (status != APR_SUCCESS) {
> + apr_atomic_dec32(&workers->worker_count);
> push_slot(&workers->free, slot);
> - return APR_ENOMEM;
> + return status;
> }
>
> - apr_atomic_inc32(&workers->worker_count);
> return APR_SUCCESS;
> }
>
> @@ -136,17 +136,15 @@ static void wake_idle_worker(h2_workers
> }
> }
>
> -static void cleanup_zombies(h2_workers *workers)
> +static void join_zombies(h2_workers *workers)
> {
> h2_slot *slot;
> while ((slot = pop_slot(&workers->zombies))) {
> - if (slot->thread) {
> - apr_status_t status;
> - apr_thread_join(&status, slot->thread);
> - slot->thread = NULL;
> - }
> - apr_atomic_dec32(&workers->worker_count);
> - slot->next = NULL;
> + apr_status_t status;
> + ap_assert(slot->thread != NULL);
> + apr_thread_join(&status, slot->thread);
> + slot->thread = NULL;
> +
> push_slot(&workers->free, slot);
> }
> }
> @@ -184,37 +182,49 @@ static h2_fifo_op_t mplx_peek(void *head
> * Get the next task for the given worker. Will block until a task arrives
> * or the max_wait timer expires and more than min workers exist.
> */
> -static apr_status_t get_next(h2_slot *slot)
> +static int get_next(h2_slot *slot)
> {
> h2_workers *workers = slot->workers;
> - apr_status_t status;
> -
> - slot->task = NULL;
> - while (!slot->aborted) {
> - if (!slot->task) {
> - status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
> - if (status == APR_EOF) {
> - return status;
> - }
> +
> + while (!workers->aborted) {
> + ap_assert(slot->task == NULL);
> + if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
> + /* The queue is terminated with the MPM child being cleaned up,
> + * just leave.
> + */
> + break;
> }
> -
> if (slot->task) {
> - return APR_SUCCESS;
> + return 1;
> }
>
> - cleanup_zombies(workers);
> + join_zombies(workers);
>
> apr_thread_mutex_lock(slot->lock);
> - push_slot(&workers->idle, slot);
> - apr_thread_cond_wait(slot->not_idle, slot->lock);
> + if (!workers->aborted) {
> + push_slot(&workers->idle, slot);
> + apr_thread_cond_wait(slot->not_idle, slot->lock);
> + }
> apr_thread_mutex_unlock(slot->lock);
> }
> - return APR_EOF;
> +
> + return 0;
> }
>
> static void slot_done(h2_slot *slot)
> {
> - push_slot(&(slot->workers->zombies), slot);
> + h2_workers *workers = slot->workers;
> +
> + push_slot(&workers->zombies, slot);
> +
> + /* If this worker is the last one exiting and the MPM child is stopping,
> + * unblock workers_pool_cleanup().
> + */
> + if (!apr_atomic_dec32(&workers->worker_count) && workers->aborted) {
> + apr_thread_mutex_lock(workers->lock);
> + apr_thread_cond_signal(workers->all_done);
> + apr_thread_mutex_unlock(workers->lock);
> + }
> }
>
>
> @@ -222,25 +232,23 @@ static void* APR_THREAD_FUNC slot_run(ap
> {
> h2_slot *slot = wctx;
>
> - while (!slot->aborted) {
> -
> - /* Get a h2_task from the mplxs queue. */
> - get_next(slot);
> - while (slot->task) {
> -
> + /* Get the h2_task(s) from the ->mplxs queue. */
> + while (get_next(slot)) {
> + ap_assert(slot->task != NULL);
> + do {
> h2_task_do(slot->task, thread, slot->id);
>
> /* Report the task as done. If stickyness is left, offer the
> * mplx the opportunity to give us back a new task right away.
> */
> - if (!slot->aborted && (--slot->sticks > 0)) {
> + if (!slot->workers->aborted && --slot->sticks > 0) {
> h2_mplx_s_task_done(slot->task->mplx, slot->task, &slot->task);
> }
> else {
> h2_mplx_s_task_done(slot->task->mplx, slot->task, NULL);
> slot->task = NULL;
> }
> - }
> + } while (slot->task);
> }
>
> slot_done(slot);
> @@ -254,26 +262,24 @@ static apr_status_t workers_pool_cleanup
> h2_workers *workers = data;
> h2_slot *slot;
>
> - if (!workers->aborted) {
> - workers->aborted = 1;
> - /* abort all idle slots */
> - for (;;) {
> - slot = pop_slot(&workers->idle);
> - if (slot) {
> - apr_thread_mutex_lock(slot->lock);
> - slot->aborted = 1;
> - apr_thread_cond_signal(slot->not_idle);
> - apr_thread_mutex_unlock(slot->lock);
> - }
> - else {
> - break;
> - }
> - }
> + workers->aborted = 1;
> + h2_fifo_term(workers->mplxs);
>
> - h2_fifo_term(workers->mplxs);
> + /* abort all idle slots */
> + while ((slot = pop_slot(&workers->idle))) {
> + apr_thread_mutex_lock(slot->lock);
> + apr_thread_cond_signal(slot->not_idle);
> + apr_thread_mutex_unlock(slot->lock);
> + }
>
> - cleanup_zombies(workers);
> + /* wait for all the workers to become zombies and join them */
> + apr_thread_mutex_lock(workers->lock);
> + if (apr_atomic_read32(&workers->worker_count)) {
> + apr_thread_cond_wait(workers->all_done, workers->lock);
> }
> + apr_thread_mutex_unlock(workers->lock);
> + join_zombies(workers);
> +
> return APR_SUCCESS;
> }
>
> @@ -340,6 +346,9 @@ h2_workers *h2_workers_create(server_rec
> APR_THREAD_MUTEX_DEFAULT,
> workers->pool);
> if (status == APR_SUCCESS) {
> + status = apr_thread_cond_create(&workers->all_done, workers->pool);
> + }
> + if (status == APR_SUCCESS) {
> n = workers->nslots = workers->max_workers;
> workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
> if (workers->slots == NULL) {
>
> Modified: httpd/httpd/trunk/modules/http2/h2_workers.h
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/http2/h2_workers.h?rev=1886255&r1=1886254&r2=1886255&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/http2/h2_workers.h (original)
> +++ httpd/httpd/trunk/modules/http2/h2_workers.h Sat Feb 6 12:17:40 2021
> @@ -42,7 +42,7 @@ struct h2_workers {
> apr_uint32_t max_workers;
> int max_idle_secs;
>
> - int aborted;
> + volatile int aborted;
> int dynamic;
>
> apr_threadattr_t *thread_attr;
> @@ -58,6 +58,7 @@ struct h2_workers {
> struct h2_fifo *mplxs;
>
> struct apr_thread_mutex_t *lock;
> + struct apr_thread_cond_t *all_done;
> };
>
>
>
>