You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:23 UTC
[qpid-proton] 03/03: PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 996b9b114fdb4682c8114ad700705446ff3a24fd
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:54:51 2021 -0800
PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion
---
c/src/proactor/epoll.c | 68 +++++++-------------------------------------------
1 file changed, 9 insertions(+), 59 deletions(-)
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6a5beb4..6207267 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -991,35 +991,6 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
// Return immediately. pc may have just been freed by another thread.
}
-/* Only call when context switch is imminent. Sched lock is highly contested. */
-// Call with both task and sched locks.
-static bool pconnection_sched_sync(pconnection_t *pc) {
- uint32_t sync_events = 0;
- uint32_t sync_args = pc->tick_pending << 1;
- if (pc->psocket.sched_io_events) {
- pc->new_events = pc->psocket.sched_io_events;
- pc->psocket.sched_io_events = 0;
- pc->current_arm = 0; // or outside lock?
- sync_events = pc->new_events;
- }
- if (pc->task.sched_ready) {
- pc->task.sched_ready = false;
- schedule_done(&pc->task);
- sync_args |= 1;
- }
- pc->task.sched_pending = false;
-
- if (sync_args || sync_events) {
- // Only replace if poller has found new work for us.
- pc->process_args = (1 << 2) | sync_args;
- pc->process_events = sync_events;
- }
-
- // Indicate if there are free proactor threads
- pn_proactor_t *p = pc->task.proactor;
- return p->poller_suspended || p->suspend_list_head;
-}
-
/* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
static inline bool pconnection_work_pending(pconnection_t *pc) {
if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
@@ -1038,14 +1009,9 @@ static void pconnection_done(pconnection_t *pc) {
bool self_sched = false;
lock(&pc->task.mutex);
pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto
- // working task instance while the lock is held. Need sched_sync too to drain
- // a possible stale sched_ready.
+ // working task instance while the lock is held.
pc->hog_count = 0;
bool has_event = pconnection_has_event(pc);
- // Do as little as possible while holding the sched lock
- lock(&p->sched_mutex);
- pconnection_sched_sync(pc);
- unlock(&p->sched_mutex);
if (has_event || pconnection_work_pending(pc)) {
self_sched = true;
@@ -1295,9 +1261,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
// Never stop working while work remains. hog_count exception to this rule is elsewhere.
- lock(&pc->task.proactor->sched_mutex);
- bool workers_free = pconnection_sched_sync(pc);
- unlock(&pc->task.proactor->sched_mutex);
if (pconnection_work_pending(pc)) {
goto retry; // TODO: get rid of goto without adding more locking
@@ -1314,8 +1277,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
}
+ // check one last time for new io before context switch
+ bool workers_free;
+ pn_proactor_t *p = pc->task.proactor;
+ lock(&p->sched_mutex);
+ workers_free = (p->suspend_list_head != NULL);
+ unlock(&p->sched_mutex);
if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
- // check one last time for new io before context switch
pc->io_doublecheck = true;
pc->read_blocked = false;
pc->write_blocked = false;
@@ -1831,25 +1799,7 @@ static void listener_done(pn_listener_t *l) {
bool notify = false;
l->task.working = false;
- lock(&p->sched_mutex);
- int n_events = 0;
- for (size_t i = 0; i < l->acceptors_size; i++) {
- psocket_t *ps = &l->acceptors[i].psocket;
- if (ps->sched_io_events) {
- ps->working_io_events = ps->sched_io_events;
- ps->sched_io_events = 0;
- }
- if (ps->working_io_events)
- n_events++;
- }
-
- if (l->task.sched_ready) {
- l->task.sched_ready = false;
- schedule_done(&l->task);
- }
- unlock(&p->sched_mutex);
-
- if (!n_events && listener_can_free(l)) {
+ if (listener_can_free(l)) {
unlock(&l->task.mutex);
pn_listener_free(l);
lock(&p->sched_mutex);
@@ -1859,7 +1809,7 @@ static void listener_done(pn_listener_t *l) {
if (notify) notify_poller(p);
if (resume_thread) resume(p, resume_thread);
return;
- } else if (n_events || listener_has_event(l))
+ } else if (listener_has_event(l))
notify = schedule(&l->task);
unlock(&l->task.mutex);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org