Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:23 UTC

[qpid-proton] 03/03: PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 996b9b114fdb4682c8114ad700705446ff3a24fd
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:54:51 2021 -0800

    PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion
---
 c/src/proactor/epoll.c | 68 +++++++-------------------------------------------
 1 file changed, 9 insertions(+), 59 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6a5beb4..6207267 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -991,35 +991,6 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
   // Return immediately.  pc may have just been freed by another thread.
 }
 
-/* Only call when context switch is imminent.  Sched lock is highly contested. */
-// Call with both task and sched locks.
-static bool pconnection_sched_sync(pconnection_t *pc) {
-  uint32_t sync_events = 0;
-  uint32_t sync_args = pc->tick_pending << 1;
-  if (pc->psocket.sched_io_events) {
-    pc->new_events = pc->psocket.sched_io_events;
-    pc->psocket.sched_io_events = 0;
-    pc->current_arm = 0;  // or outside lock?
-    sync_events = pc->new_events;
-  }
-  if (pc->task.sched_ready) {
-    pc->task.sched_ready = false;
-    schedule_done(&pc->task);
-    sync_args |= 1;
-  }
-  pc->task.sched_pending = false;
-
-  if (sync_args || sync_events) {
-    // Only replace if poller has found new work for us.
-    pc->process_args = (1 << 2) | sync_args;
-    pc->process_events = sync_events;
-  }
-
-  // Indicate if there are free proactor threads
-  pn_proactor_t *p = pc->task.proactor;
-  return p->poller_suspended || p->suspend_list_head;
-}
-
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
   if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
@@ -1038,14 +1009,9 @@ static void pconnection_done(pconnection_t *pc) {
   bool self_sched = false;
   lock(&pc->task.mutex);
   pc->task.working = false;  // So we can schedule() ourself if necessary.  We remain the de facto
-                             // working task instance while the lock is held.  Need sched_sync too to drain
-                             // a possible stale sched_ready.
+                             // working task instance while the lock is held.
   pc->hog_count = 0;
   bool has_event = pconnection_has_event(pc);
-  // Do as little as possible while holding the sched lock
-  lock(&p->sched_mutex);
-  pconnection_sched_sync(pc);
-  unlock(&p->sched_mutex);
 
   if (has_event || pconnection_work_pending(pc)) {
     self_sched = true;
@@ -1295,9 +1261,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   // Never stop working while work remains.  hog_count exception to this rule is elsewhere.
-  lock(&pc->task.proactor->sched_mutex);
-  bool workers_free = pconnection_sched_sync(pc);
-  unlock(&pc->task.proactor->sched_mutex);
 
   if (pconnection_work_pending(pc)) {
     goto retry;  // TODO: get rid of goto without adding more locking
@@ -1314,8 +1277,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     }
   }
 
+  // check one last time for new io before context switch
+  bool workers_free;
+  pn_proactor_t *p = pc->task.proactor;
+  lock(&p->sched_mutex);
+  workers_free = (p->suspend_list_head != NULL);
+  unlock(&p->sched_mutex);
   if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
-    // check one last time for new io before context switch
     pc->io_doublecheck = true;
     pc->read_blocked = false;
     pc->write_blocked = false;
@@ -1831,25 +1799,7 @@ static void listener_done(pn_listener_t *l) {
   bool notify = false;
   l->task.working = false;
 
-  lock(&p->sched_mutex);
-  int n_events = 0;
-  for (size_t i = 0; i < l->acceptors_size; i++) {
-    psocket_t *ps = &l->acceptors[i].psocket;
-    if (ps->sched_io_events) {
-      ps->working_io_events = ps->sched_io_events;
-      ps->sched_io_events = 0;
-    }
-    if (ps->working_io_events)
-      n_events++;
-  }
-
-  if (l->task.sched_ready) {
-    l->task.sched_ready = false;
-    schedule_done(&l->task);
-  }
-  unlock(&p->sched_mutex);
-
-  if (!n_events && listener_can_free(l)) {
+  if (listener_can_free(l)) {
     unlock(&l->task.mutex);
     pn_listener_free(l);
     lock(&p->sched_mutex);
@@ -1859,7 +1809,7 @@ static void listener_done(pn_listener_t *l) {
     if (notify) notify_poller(p);
     if (resume_thread) resume(p, resume_thread);
     return;
-  } else if (n_events || listener_has_event(l))
+  } else if (listener_has_event(l))
     notify = schedule(&l->task);
   unlock(&l->task.mutex);
 

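Editor's note (not part of the commit): the patch removes pconnection_sched_sync(), the "sneak peek" that drained sched_io_events and sched_ready under the sched lock at context-switch time, and replaces it with a much smaller critical section that only checks whether any worker threads are idle before the one-time io_doublecheck. The sketch below is a minimal, self-contained reading of that new pattern. The names proactor_t, connection_t and maybe_doublecheck_io are hypothetical stand-ins invented for illustration, not proton's real types or API, and the logic is simplified to the single decision the new code makes.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical, simplified stand-ins modeled on the patch; not the real
 * proton proactor structures. */
typedef struct worker worker_t;

typedef struct {
  pthread_mutex_t sched_mutex;   /* guards the suspend (idle-worker) list */
  worker_t *suspend_list_head;   /* non-NULL when at least one worker is idle */
} proactor_t;

typedef struct {
  proactor_t *proactor;
  bool closing;
  bool io_doublecheck;           /* the one-time re-poll has already happened */
  bool read_blocked;
  bool write_blocked;
} connection_t;

/* Before a connection gives up its worker thread, take the sched lock just
 * long enough to see whether any worker threads are idle.  Only then is one
 * extra read/write retry worth the cost; otherwise fall through to the
 * normal epoll re-arm. */
static bool maybe_doublecheck_io(connection_t *c) {
  proactor_t *p = c->proactor;

  /* Do as little as possible while holding the sched lock. */
  pthread_mutex_lock(&p->sched_mutex);
  bool workers_free = (p->suspend_list_head != NULL);
  pthread_mutex_unlock(&p->sched_mutex);

  if (workers_free && !c->closing && !c->io_doublecheck) {
    c->io_doublecheck = true;    /* retry at most once per working cycle */
    c->read_blocked = false;
    c->write_blocked = false;
    return true;                 /* caller retries its read/write path */
  }
  return false;
}

int main(void) {
  proactor_t p;
  pthread_mutex_init(&p.sched_mutex, NULL);
  p.suspend_list_head = NULL;    /* no idle workers in this toy example */

  connection_t c = { &p, false, false, true, true };
  printf("doublecheck performed: %s\n", maybe_doublecheck_io(&c) ? "yes" : "no");

  pthread_mutex_destroy(&p.sched_mutex);
  return 0;
}

Compile with "cc -pthread" if trying it out. The point of the pattern is that the sched lock is now held only for a pointer comparison, and the ready-list bookkeeping that previously ran here (and could delete a task early) is left entirely to the poller.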
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org