Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:21 UTC
[qpid-proton] 01/03: PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 4f8ac48c48e66d1650308bc1be694dae3215fc9a
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:43:30 2021 -0800
PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
---
 c/src/proactor/epoll-internal.h |  4 ++--
 c/src/proactor/epoll.c          | 46 +++++++++++++++++++++++++++--------------
2 files changed, 32 insertions(+), 18 deletions(-)
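[Editorial note on the change, before the diff: a task claimed early by warm pairing was left in place in the proactor's runnables array, so the later sequential scan could hand the same task to a second thread; since a task can delete itself, running it twice is unsafe. The fix replaces the boolean runnable flag with a 1-based index (runnables_idx, 0 means unset) so the warm-pairing path can null the task's slot in the array. Below is a minimal standalone sketch of that encoding; push_runnable and claim_runnable are illustrative stand-ins for make_runnable() and the warm-pairing branch of next_runnable(), not the proactor's own functions.]

    /* Minimal sketch of the runnables_idx encoding used by this commit.
       Illustrative names only, not the proactor code itself. */
    #include <assert.h>
    #include <stddef.h>

    typedef struct task_t {
      int runnables_idx;   /* 0 means unset, idx-1 is the array position */
    } task_t;

    #define MAX_RUNNABLES 8
    static task_t *runnables[MAX_RUNNABLES];
    static size_t n_runnables;

    /* Append a task, recording its position as index+1 ("off by one
       accounting") so that 0 can mean "not in the array". */
    static void push_runnable(task_t *tsk) {
      assert(!tsk->runnables_idx && n_runnables < MAX_RUNNABLES);
      runnables[n_runnables] = tsk;
      tsk->runnables_idx = (int)n_runnables + 1;
      n_runnables++;
    }

    /* Claim a task out of scan order (the warm-pairing case): null its
       slot so the sequential scan cannot hand it to a second thread. */
    static void claim_runnable(task_t *tsk) {
      assert(tsk->runnables_idx);
      task_t **slot = &runnables[tsk->runnables_idx - 1];
      assert(*slot == tsk);
      *slot = NULL;
      tsk->runnables_idx = 0;
    }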
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 8e9e1b2..155277b 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -102,7 +102,7 @@ typedef struct task_t {
tslot_t *prev_runner;
bool sched_ready;
bool sched_pending; /* If true, one or more unseen epoll or other events to process() */
- bool runnable ; /* on one of the runnable lists */
+ int runnables_idx; /* 0 means unset, idx-1 is array position */
} task_t;
typedef enum {
@@ -198,7 +198,7 @@ struct pn_proactor_t {
task_t *resched_cutoff; // last resched task of current poller work snapshot. TODO: superseded by polled_resched_count?
task_t *resched_next;
unsigned int resched_count;
- unsigned int polled_resched_count;
+ unsigned int polled_resched_count;
pmutex tslot_mutex;
int earmark_count;
bool earmark_drain;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index ea2e25a..2363f48 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -47,6 +47,8 @@
non-proactor-task -> proactor-task
tslot -> sched
+ TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready
+ list (ordered), resched list (ordered).
TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
transitions from "private to the scheduler" to "visible to the task".
TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
@@ -442,7 +444,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
assert(!tsk->runner);
tsk->runner = ts;
tsk->prev_runner = NULL;
- tsk->runnable = false;
+ tsk->runnables_idx = 0;
ts->task = tsk;
ts->prev_task = NULL;
}
@@ -539,10 +541,9 @@ static void remove_earmark(tslot_t *ts) {
static void make_runnable(task_t *tsk) {
pn_proactor_t *p = tsk->proactor;
assert(p->n_runnables <= p->runnables_capacity);
- assert(!tsk->runnable);
+ assert(!tsk->runnables_idx);
if (tsk->runner) return;
- tsk->runnable = true;
// Track it as normal or warm or earmarked
if (pni_warm_sched) {
tslot_t *ts = tsk->prev_runner;
@@ -552,8 +553,11 @@ static void make_runnable(task_t *tsk) {
p->warm_runnables[p->n_warm_runnables++] = tsk;
assign_thread(ts, tsk);
}
- else
- p->runnables[p->n_runnables++] = tsk;
+ else {
+ p->runnables[p->n_runnables] = tsk;
+ tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
+ p->n_runnables++;
+ }
return;
}
if (ts->state == UNUSED && !p->earmark_drain) {
@@ -563,7 +567,9 @@ static void make_runnable(task_t *tsk) {
}
}
}
- p->runnables[p->n_runnables++] = tsk;
+ p->runnables[p->n_runnables] = tsk;
+ tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
+ p->n_runnables++;
}
@@ -2339,7 +2345,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
break;
}
}
- if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
+ if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p))
return tsk;
return NULL;
}
@@ -2348,7 +2354,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
tsk->sched_ready = true;
tsk->sched_pending = true;
- if (!tsk->runnable && !tsk->runner)
+ if (!tsk->runnables_idx && !tsk->runner)
return tsk;
return NULL;
}
@@ -2386,33 +2392,38 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
return ts->task;
}
- // warm pairing ?
+ // Take any runnables task if it results in a warm pairing.
task_t *tsk = ts->prev_task;
- if (tsk && (tsk->runnable)) {
+ if (tsk && (tsk->runnables_idx)) {
+ // A task can self delete, so don't allow it to run twice.
+ task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1];
+ assert(*runnables_slot == tsk);
+ *runnables_slot = NULL;
assign_thread(ts, tsk);
return tsk;
}
- // check for an unassigned runnable task or ready list task
+ // check for remaining runnable tasks
if (p->n_runnables) {
// Any unclaimed runnable?
while (p->n_runnables) {
tsk = p->runnables[p->next_runnable++];
if (p->n_runnables == p->next_runnable)
p->n_runnables = 0;
- if (tsk->runnable) {
+ if (tsk) {
assign_thread(ts, tsk);
return tsk;
}
}
}
+ // rest of sched_ready list
while (p->sched_ready_count) {
tsk = p->sched_ready_current;
assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list
if (post_ready(p, tsk)) {
pop_ready_task(tsk); // updates sched_ready_current
- assert(!tsk->runnable && !tsk->runner);
+ assert(!tsk->runnables_idx && !tsk->runner);
assign_thread(ts, tsk);
return tsk;
} else {
@@ -2420,10 +2431,11 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
}
}
+ // the resched list
if (p->polled_resched_count) {
// Unprocessed resched tasks remain.
tsk = resched_pop_front(p);
- assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER);
+ assert(tsk->sched_pending && !tsk->runnables_idx && tsk->runner == RESCHEDULE_PLACEHOLDER);
tsk->runner = NULL;
assign_thread(ts, tsk);
return tsk;
@@ -2582,9 +2594,11 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
break;
}
- // We have unpolled work or at least one new epoll event
+ // We have unpolled work or at least one new epoll event.
+ // Remember tasks that together constitute new work. See note at beginning about duplicates.
lock(&p->eventfd_mutex);
+
// Longest hold of eventfd_mutex. The following must be quick with no external calls:
// post_event(), make_runnable(), assign_thread(), earmark_thread().
for (int i = 0; i < n_events; i++) {
@@ -2620,7 +2634,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
ctsk = resched_pop_front(p);
- assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
+ assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
ctsk->runner = NULL; // Allow task to run again.
warm_tries--;
make_runnable(ctsk);
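[Editorial note, continuing the illustrative sketch above (same hypothetical declarations): the consumer side of the pattern is the sequential scan, which now skips slots nulled by an earlier warm-pairing claim; this is the `if (tsk)` test that replaced `if (tsk->runnable)` in next_runnable(). The small main() demonstrates the behavior the commit enforces: a task claimed warm is never popped a second time.]

    /* Continues the sketch above. pop_runnable() mirrors the scan loop in
       next_runnable(): a NULL slot means the task was already claimed. */
    #include <stdio.h>

    static size_t next_runnable_slot;

    static task_t *pop_runnable(void) {
      while (n_runnables) {
        task_t *tsk = runnables[next_runnable_slot++];
        if (n_runnables == next_runnable_slot) {
          n_runnables = 0;            /* array drained; reset the scan */
          next_runnable_slot = 0;
        }
        if (tsk) {                    /* NULL slot: claimed by warm pairing */
          tsk->runnables_idx = 0;
          return tsk;
        }
      }
      return NULL;
    }

    int main(void) {
      task_t a = {0}, b = {0};
      push_runnable(&a);
      push_runnable(&b);
      claim_runnable(&a);             /* warm pairing takes `a` early */
      task_t *t;
      while ((t = pop_runnable()) != NULL)
        printf("popped %s\n", t == &a ? "a" : "b");  /* prints only "b" */
      return 0;
    }

With the old boolean flag, the scan's `if (tsk->runnable)` test would have returned `a` here as well, running it on a second thread after warm pairing had already assigned it.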