You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2020/10/20 07:04:49 UTC
[qpid-proton] branch master updated: NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 34ca28f NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
34ca28f is described below
commit 34ca28fae4a880151440c33312ca7f723441c2b2
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Tue Oct 20 00:04:13 2020 -0700
NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
---
c/src/proactor/epoll.c | 292 +++++++++++++++++++++++++------------------------
1 file changed, 151 insertions(+), 141 deletions(-)
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 01d9db8..4c00d47 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -101,7 +101,7 @@
// Maybe futex is even better?
// See other "TODO" in code.
//
-// Consider case of large number of wakes: proactor_do_epoll() could start by
+// Consider case of large number of wakes: next_event_batch() could start by
// looking for pending wakes before a kernel call to epoll_wait(), or there
// could be several eventfds with random assignment of wakeables.
@@ -691,6 +691,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
static void write_flush(pconnection_t *pc);
static void listener_begin_close(pn_listener_t* l);
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block);
static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -2580,7 +2581,7 @@ static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
return NULL;
}
-static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
+static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
lock(&p->tslot_mutex);
tslot_t * ts = find_tslot(p);
unlock(&p->tslot_mutex);
@@ -2591,9 +2592,9 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
assert(ts->state == UNUSED || ts->state == NEW);
ts->state = PROCESSING;
+ // Process outstanding epoll events until we get a batch or need to block.
while (true) {
- // Process outstanding epoll events until we get a batch or need to block.
-
+ // First see if there are any contexts waiting to run and perhaps generate new Proton events,
pcontext_t *ctx = next_runnable(p, ts);
if (ctx) {
ts->state = BATCHING;
@@ -2611,155 +2612,164 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
continue; // Long time may have passed. Back to beginning.
}
- // poll or wait for a runnable context
+ // Poll or wait for a runnable context
if (p->poller == NULL) {
+ bool return_immediately;
p->poller = ts;
- // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls.
- assert(p->n_runnables == 0);
- if (p->thread_count > p->thread_capacity)
- grow_poller_bufs(p);
- p->next_runnable = 0;
- p->n_warm_runnables = 0;
- p->last_earmark = NULL;
-
- bool unfinished_earmarks = p->earmark_count > 0;
- bool new_wakes = false;
- bool epoll_immediate = unfinished_earmarks || !can_block;
- assert(!p->sched_wake_first);
- if (!epoll_immediate) {
- lock(&p->eventfd_mutex);
- if (p->wake_list_first) {
- epoll_immediate = true;
- new_wakes = true;
- } else {
- p->wakes_in_progress = false;
- }
- unlock(&p->eventfd_mutex);
+ // Get new epoll events (if any) and mark the relevant contexts as runnable
+ return_immediately = poller_do_epoll(p, ts, can_block);
+ p->poller = NULL;
+ if (return_immediately) {
+ // Check if another thread is available to continue epoll-ing.
+ tslot_t *res_ts = resume_one_thread(p);
+ ts->state = UNUSED;
+ unlock(&p->sched_mutex);
+ if (res_ts) resume(p, res_ts);
+ return NULL;
}
- int timeout = (epoll_immediate) ? 0 : -1;
- p->poller_suspended = (timeout == -1);
+ poller_done(p, ts); // put suspended threads to work.
+ } else if (!can_block) {
+ ts->state = UNUSED;
unlock(&p->sched_mutex);
+ return NULL;
+ } else {
+ // TODO: loop while !poller_suspended, since new work coming
+ suspend(p, ts);
+ }
+ } // while
+}
- int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
-
- lock(&p->sched_mutex);
- p->poller_suspended = false;
+// Call with sched lock. Return true if !can_block and no new events to process.
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
+ // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls.
+ int n_events;
+ pcontext_t *ctx;
- bool unpolled_work = false;
- if (p->earmark_count > 0) {
- p->earmark_drain = true;
- unpolled_work = true;
- }
- if (new_wakes) {
- lock(&p->eventfd_mutex);
- schedule_wake_list(p);
- unlock(&p->eventfd_mutex);
- unpolled_work = true;
+ while (true) {
+ assert(p->n_runnables == 0);
+ if (p->thread_count > p->thread_capacity)
+ grow_poller_bufs(p);
+ p->next_runnable = 0;
+ p->n_warm_runnables = 0;
+ p->last_earmark = NULL;
+
+ bool unfinished_earmarks = p->earmark_count > 0;
+ bool new_wakes = false;
+ bool epoll_immediate = unfinished_earmarks || !can_block;
+ assert(!p->sched_wake_first);
+ if (!epoll_immediate) {
+ lock(&p->eventfd_mutex);
+ if (p->wake_list_first) {
+ epoll_immediate = true;
+ new_wakes = true;
+ } else {
+ p->wakes_in_progress = false;
}
+ unlock(&p->eventfd_mutex);
+ }
+ int timeout = (epoll_immediate) ? 0 : -1;
+ p->poller_suspended = (timeout == -1);
+ unlock(&p->sched_mutex);
- if (n < 0) {
- if (errno != EINTR)
- perror("epoll_wait"); // TODO: proper log
- if (!can_block && !unpolled_work) {
- p->poller = NULL;
- tslot_t *res_ts = resume_one_thread(p);
- ts->state = UNUSED;
- unlock(&p->sched_mutex);
- if (res_ts) resume(p, res_ts);
- return NULL;
- }
- else {
- p->poller = NULL;
- continue;
- }
- } else if (n == 0) {
- if (!can_block && !unpolled_work) {
- p->poller = NULL;
- tslot_t *res_ts = resume_one_thread(p);
- ts->state = UNUSED;
- unlock(&p->sched_mutex);
- if (res_ts) resume(p, res_ts);
- return NULL;
- }
- else {
- if (!epoll_immediate)
- perror("epoll_wait unexpected timeout"); // TODO: proper log
- if (!unpolled_work) {
- p->poller = NULL;
- continue;
- }
- }
- }
+ n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
- for (int i = 0; i < n; i++) {
- ctx = post_event(p, &p->kevents[i]);
- if (ctx)
- make_runnable(ctx);
- }
- if (n > 0)
- memset(p->kevents, 0, sizeof(struct epoll_event) * n);
-
- // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
- pcontext_t *wctx = p->sched_wake_current;
- int max_runnables = p->runnables_capacity;
- while (wctx && p->n_runnables < max_runnables) {
- if (wctx->runner == REWAKE_PLACEHOLDER)
- wctx->runner = NULL; // Allow context to run again.
- ctx = post_wake(p, wctx);
- if (ctx)
- make_runnable(ctx);
- pop_wake(wctx);
- wctx = wctx->wake_next;
- }
- p->sched_wake_current = wctx;
- // More wakes than places on the runnables list
- while (wctx) {
- if (wctx->runner == REWAKE_PLACEHOLDER)
- wctx->runner = NULL; // Allow context to run again.
- wctx->sched_wake = true;
- wctx->sched_pending = true;
- if (wctx->runnable || wctx->runner)
- pop_wake(wctx);
- wctx = wctx->wake_next;
- }
+ lock(&p->sched_mutex);
+ p->poller_suspended = false;
- if (pni_immediate && !ts->context) {
- // Poller gets to run if possible
- pcontext_t *pctx;
- if (p->n_runnables) {
- assert(p->next_runnable == 0);
- pctx = p->runnables[0];
- if (++p->next_runnable == p->n_runnables)
- p->n_runnables = 0;
- } else if (p->n_warm_runnables) {
- pctx = p->warm_runnables[--p->n_warm_runnables];
- tslot_t *ts2 = pctx->runner;
- ts2->prev_context = ts2->context = NULL;
- pctx->runner = NULL;
- } else if (p->last_earmark) {
- pctx = p->last_earmark->context;
- remove_earmark(p->last_earmark);
- if (p->earmark_count == 0)
- p->earmark_drain = false;
- } else {
- pctx = NULL;
- }
- if (pctx) {
- assign_thread(ts, pctx);
- }
+ bool unpolled_work = false;
+ if (p->earmark_count > 0) {
+ p->earmark_drain = true;
+ unpolled_work = true;
+ }
+ if (new_wakes) {
+ lock(&p->eventfd_mutex);
+ schedule_wake_list(p);
+ unlock(&p->eventfd_mutex);
+ unpolled_work = true;
+ }
+
+ if (n_events < 0) {
+ if (errno != EINTR)
+ perror("epoll_wait"); // TODO: proper log
+ if (!can_block && !unpolled_work)
+ return true;
+ else
+ continue;
+ } else if (n_events == 0) {
+ if (!can_block && !unpolled_work)
+ return true;
+ else {
+ if (!epoll_immediate)
+ perror("epoll_wait unexpected timeout"); // TODO: proper log
+ if (!unpolled_work)
+ continue;
}
+ }
- poller_done(p, ts); // put suspended threads to work.
- // p->poller has been released, so a new poller may already be running.
- } else if (!can_block) {
- ts->state = UNUSED;
- unlock(&p->sched_mutex);
- return NULL;
+ break;
+ }
+
+ // We have unpolled work or at least one new epoll event
+
+
+ for (int i = 0; i < n_events; i++) {
+ ctx = post_event(p, &p->kevents[i]);
+ if (ctx)
+ make_runnable(ctx);
+ }
+ if (n_events > 0)
+ memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
+
+ // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
+ pcontext_t *wctx = p->sched_wake_current;
+ int max_runnables = p->runnables_capacity;
+ while (wctx && p->n_runnables < max_runnables) {
+ if (wctx->runner == REWAKE_PLACEHOLDER)
+ wctx->runner = NULL; // Allow context to run again.
+ ctx = post_wake(p, wctx);
+ if (ctx)
+ make_runnable(ctx);
+ pop_wake(wctx);
+ wctx = wctx->wake_next;
+ }
+ p->sched_wake_current = wctx;
+ // More wakes than places on the runnables list
+ while (wctx) {
+ if (wctx->runner == REWAKE_PLACEHOLDER)
+ wctx->runner = NULL; // Allow context to run again.
+ wctx->sched_wake = true;
+ wctx->sched_pending = true;
+ if (wctx->runnable || wctx->runner)
+ pop_wake(wctx);
+ wctx = wctx->wake_next;
+ }
+
+ if (pni_immediate && !ts->context) {
+ // Poller gets to run if possible
+ pcontext_t *pctx;
+ if (p->n_runnables) {
+ assert(p->next_runnable == 0);
+ pctx = p->runnables[0];
+ if (++p->next_runnable == p->n_runnables)
+ p->n_runnables = 0;
+ } else if (p->n_warm_runnables) {
+ pctx = p->warm_runnables[--p->n_warm_runnables];
+ tslot_t *ts2 = pctx->runner;
+ ts2->prev_context = ts2->context = NULL;
+ pctx->runner = NULL;
+ } else if (p->last_earmark) {
+ pctx = p->last_earmark->context;
+ remove_earmark(p->last_earmark);
+ if (p->earmark_count == 0)
+ p->earmark_drain = false;
} else {
- // TODO: loop while !poller_suspended, since new work coming
- suspend(p, ts);
+ pctx = NULL;
}
- } // while
+ if (pctx) {
+ assign_thread(ts, pctx);
+ }
+ }
+ return false;
}
// Call with sched lock, but only from poller context.
@@ -2819,11 +2829,11 @@ static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
}
pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
- return proactor_do_epoll(p, true);
+ return next_event_batch(p, true);
}
pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
- return proactor_do_epoll(p, false);
+ return next_event_batch(p, false);
}
// Call with no locks
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org