You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2020/07/10 18:30:36 UTC

[qpid-proton] branch master updated: PROTON-2250: Rearrange epoll locks to avoid locking in one place and unlocking in another

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 181c213  PROTON-2250: Rearrange epoll locks to avoid locking in one place and unlocking in another
181c213 is described below

commit 181c213166d27f5e371400feb65da7d50ebf2b03
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Wed May 6 14:37:13 2020 -0400

    PROTON-2250: Rearrange epoll locks to avoid locking in one place and unlocking in another
---
 c/src/proactor/epoll.c | 52 ++++++++++++++++++++++++++------------------------
 1 file changed, 27 insertions(+), 25 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 0f6a606..30a07e9 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1072,9 +1072,9 @@ static void pconnection_rearm_timer(pconnection_t *pc) {
    EPOLL_CTL_DEL can prevent a parallel HUP/ERR error notification during
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
 */
-static bool pconnection_rearm_check(pconnection_t *pc) {
+static int pconnection_rearm_check(pconnection_t *pc) {
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
-    return false;
+    return 0;;
   }
   uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN : 0;
   if (!pconnection_wclosed(pc)) {
@@ -1085,16 +1085,15 @@ static bool pconnection_rearm_check(pconnection_t *pc) {
         wanted_now |= EPOLLOUT;
     }
   }
-  if (!wanted_now) return false;
-  if (wanted_now == pc->current_arm) return false;
+  if (!wanted_now) return 0;
+  if (wanted_now == pc->current_arm) return 0;
 
-  lock(&pc->rearm_mutex);      /* unlocked in pconnection_rearm... */
-  pc->current_arm = pc->psocket.epoll_io.wanted = wanted_now;
-  return true;                     /* ... so caller MUST call pconnection_rearm */
+  return wanted_now;
 }
 
-/* Call without lock */
-static inline void pconnection_rearm(pconnection_t *pc) {
+static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
+  lock(&pc->rearm_mutex);
+  pc->current_arm = pc->psocket.epoll_io.wanted = wanted_now;
   rearm(pc->psocket.proactor, &pc->psocket.epoll_io);
   unlock(&pc->rearm_mutex);
   // Return immediately.  pc may have just been freed by another thread.
@@ -1175,10 +1174,10 @@ static void pconnection_done(pconnection_t *pc) {
     notify = wake(&pc->context);
 
   pconnection_rearm_timer(pc);
-  bool rearm = pconnection_rearm_check(pc);
+  int wanted = pconnection_rearm_check(pc);
   unlock(&pc->context.mutex);
 
-  if (rearm) pconnection_rearm(pc);  // May free pc on another thread.  Return.
+  if (wanted) pconnection_rearm(pc, wanted);  // May free pc on another thread.  Return.
   lock(&p->sched_mutex);
   if (unassign_thread(ts, UNUSED))
     notify = true;
@@ -1454,10 +1453,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   pconnection_rearm_timer(pc);
-  bool rearm_pc = pconnection_rearm_check(pc);  // holds rearm_mutex until pconnection_rearm() below
+  int wanted = pconnection_rearm_check(pc);  // non-zero: events to arm; rearm_mutex is taken inside pconnection_rearm() below
 
   unlock(&pc->context.mutex);
-  if (rearm_pc) pconnection_rearm(pc);  // May free pc on another thread.  Return right away.
+  if (wanted) pconnection_rearm(pc, wanted);  // May free pc on another thread.  Return right away.
   return NULL;
 }
 
@@ -2372,7 +2371,7 @@ static tslot_t *resume_one_thread(pn_proactor_t *p) {
   return ts;
 }
 
-// Call with sched lock.
+// Called with sched lock, returns with sched lock still held.
 static pn_event_batch_t *process(pcontext_t *ctx) {
   bool ctx_wake = false;
   ctx->sched_pending = false;
@@ -2381,16 +2380,17 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
     ctx->sched_wake = false;
     ctx_wake = true;
   }
-
+  pn_proactor_t *p = ctx->proactor;
+  pn_event_batch_t* batch = NULL;
   switch (ctx->type) {
   case PROACTOR: {
-    pn_proactor_t *p = ctx->proactor;
     bool timeout = p->sched_timeout;
     if (timeout) p->sched_timeout = false;
     bool intr = p->sched_interrupt;
     if (intr) p->sched_interrupt = false;
     unlock(&p->sched_mutex);
-    return proactor_process(p, timeout, intr, ctx_wake);
+    batch = proactor_process(p, timeout, intr, ctx_wake);
+    break;
   }
   case PCONNECTION: {
     pconnection_t *pc = pcontext_pconnection(ctx);
@@ -2398,8 +2398,9 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
     if (events) pc->psocket.sched_io_events = 0;
     bool timeout = pc->sched_timeout;
     if (timeout) pc->sched_timeout = false;
-    unlock(&ctx->proactor->sched_mutex);
-    return pconnection_process(pc, events, timeout, ctx_wake, false);
+    unlock(&p->sched_mutex);
+    batch = pconnection_process(pc, events, timeout, ctx_wake, false);
+    break;
   }
   case LISTENER: {
     pn_listener_t *l = pcontext_listener(ctx);
@@ -2413,13 +2414,15 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
       if (ps->working_io_events)
         n_events++;
     }
-    unlock(&ctx->proactor->sched_mutex);
-    return listener_process(l, n_events, ctx_wake);
+    unlock(&p->sched_mutex);
+    batch = listener_process(l, n_events, ctx_wake);
+    break;
   }
   default:
     assert(NULL);
   }
-  return NULL;
+  lock(&p->sched_mutex);
+  return batch;
 }
 
 
@@ -2582,7 +2585,6 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
   tslot_t * ts = find_tslot(p);
   unlock(&p->tslot_mutex);
   ts->generation++;  // wrapping OK.  Just looking for any change
-  pn_event_batch_t *batch = NULL;
 
   lock(&p->sched_mutex);
   assert(ts->context == NULL || ts->earmarked);
@@ -2595,11 +2597,11 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
     pcontext_t *ctx = next_runnable(p, ts);
     if (ctx) {
       ts->state = BATCHING;
-      batch = process(ctx);  // unlocks sched_lock before returning
+      pn_event_batch_t *batch = process(ctx);
       if (batch) {
+        unlock(&p->sched_mutex);
         return batch;
       }
-      lock(&p->sched_mutex);
       bool notify = unassign_thread(ts, PROCESSING);
       if (notify) {
         unlock(&p->sched_mutex);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org