You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:20 UTC

[qpid-proton] branch main updated (89082f1 -> 996b9b1)

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.


    from 89082f1  PROTON-2476: Restore truncation behaviour for transfer frame traces
     new 4f8ac48  PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
     new cfd300e  PROTON-2362: epoll proactor: fix locking in listener overflow processing
     new 996b9b1  PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 c/src/proactor/epoll-internal.h |   5 +-
 c/src/proactor/epoll.c          | 131 ++++++++++++++++------------------------
 2 files changed, 54 insertions(+), 82 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 01/03: PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4f8ac48c48e66d1650308bc1be694dae3215fc9a
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:43:30 2021 -0800

    PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
---
 c/src/proactor/epoll-internal.h |  4 ++--
 c/src/proactor/epoll.c          | 46 +++++++++++++++++++++++++++--------------
 2 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 8e9e1b2..155277b 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -102,7 +102,7 @@ typedef struct task_t {
   tslot_t *prev_runner;
   bool sched_ready;
   bool sched_pending;           /* If true, one or more unseen epoll or other events to process() */
-  bool runnable ;               /* on one of the runnable lists */
+  int runnables_idx;            /* 0 means unset, idx-1 is array position */
 } task_t;
 
 typedef enum {
@@ -198,7 +198,7 @@ struct pn_proactor_t {
   task_t *resched_cutoff; // last resched task of current poller work snapshot.  TODO: superseded by polled_resched_count?
   task_t *resched_next;
   unsigned int resched_count;
-  unsigned int polled_resched_count; 
+  unsigned int polled_resched_count;
   pmutex tslot_mutex;
   int earmark_count;
   bool earmark_drain;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index ea2e25a..2363f48 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -47,6 +47,8 @@
     non-proactor-task -> proactor-task
     tslot -> sched
 
+ TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready
+ list (ordered), resched list (ordered).
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
  TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
@@ -442,7 +444,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
   assert(!tsk->runner);
   tsk->runner = ts;
   tsk->prev_runner = NULL;
-  tsk->runnable = false;
+  tsk->runnables_idx = 0;
   ts->task = tsk;
   ts->prev_task = NULL;
 }
@@ -539,10 +541,9 @@ static void remove_earmark(tslot_t *ts) {
 static void make_runnable(task_t *tsk) {
   pn_proactor_t *p = tsk->proactor;
   assert(p->n_runnables <= p->runnables_capacity);
-  assert(!tsk->runnable);
+  assert(!tsk->runnables_idx);
   if (tsk->runner) return;
 
-  tsk->runnable = true;
   // Track it as normal or warm or earmarked
   if (pni_warm_sched) {
     tslot_t *ts = tsk->prev_runner;
@@ -552,8 +553,11 @@ static void make_runnable(task_t *tsk) {
           p->warm_runnables[p->n_warm_runnables++] = tsk;
           assign_thread(ts, tsk);
         }
-        else
-          p->runnables[p->n_runnables++] = tsk;
+        else {
+          p->runnables[p->n_runnables] = tsk;
+          tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
+          p->n_runnables++;
+        }
         return;
       }
       if (ts->state == UNUSED && !p->earmark_drain) {
@@ -563,7 +567,9 @@ static void make_runnable(task_t *tsk) {
       }
     }
   }
-  p->runnables[p->n_runnables++] = tsk;
+  p->runnables[p->n_runnables] = tsk;
+  tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
+  p->n_runnables++;
 }
 
 
@@ -2339,7 +2345,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     break;
   }
   }
-  if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
+  if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p))
     return tsk;
   return NULL;
 }
@@ -2348,7 +2354,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
 static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
   tsk->sched_ready = true;
   tsk->sched_pending = true;
-  if (!tsk->runnable && !tsk->runner)
+  if (!tsk->runnables_idx && !tsk->runner)
     return tsk;
   return NULL;
 }
@@ -2386,33 +2392,38 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     return ts->task;
   }
 
-  // warm pairing ?
+  // Take any runnables task if it results in a warm pairing.
   task_t *tsk = ts->prev_task;
-  if (tsk && (tsk->runnable)) {
+  if (tsk && (tsk->runnables_idx)) {
+    // A task can self delete, so don't allow it to run twice.
+    task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1];
+    assert(*runnables_slot == tsk);
+    *runnables_slot = NULL;
     assign_thread(ts, tsk);
     return tsk;
   }
 
-  // check for an unassigned runnable task or ready list task
+  // check for remaining runnable tasks
   if (p->n_runnables) {
     // Any unclaimed runnable?
     while (p->n_runnables) {
       tsk = p->runnables[p->next_runnable++];
       if (p->n_runnables == p->next_runnable)
         p->n_runnables = 0;
-      if (tsk->runnable) {
+      if (tsk) {
         assign_thread(ts, tsk);
         return tsk;
       }
     }
   }
 
+  // rest of sched_ready list
   while (p->sched_ready_count) {
     tsk = p->sched_ready_current;
     assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list
     if (post_ready(p, tsk)) {
       pop_ready_task(tsk);  // updates sched_ready_current
-      assert(!tsk->runnable && !tsk->runner);
+      assert(!tsk->runnables_idx && !tsk->runner);
       assign_thread(ts, tsk);
       return tsk;
     } else {
@@ -2420,10 +2431,11 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     }
   }
 
+  // the resched list
   if (p->polled_resched_count) {
     // Unprocessed resched tasks remain.
     tsk = resched_pop_front(p);
-    assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER);
+    assert(tsk->sched_pending && !tsk->runnables_idx && tsk->runner == RESCHEDULE_PLACEHOLDER);
     tsk->runner = NULL;
     assign_thread(ts, tsk);
     return tsk;
@@ -2582,9 +2594,11 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     break;
   }
 
-  // We have unpolled work or at least one new epoll event
+  // We have unpolled work or at least one new epoll event.
+  // Remember tasks that together constitute new work.  See note at beginning about duplicates.
 
   lock(&p->eventfd_mutex);
+
   // Longest hold of eventfd_mutex.  The following must be quick with no external calls:
   // post_event(), make_runnable(), assign_thread(), earmark_thread().
   for (int i = 0; i < n_events; i++) {
@@ -2620,7 +2634,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
 
   while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
     ctsk = resched_pop_front(p);
-    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
+    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
     ctsk->runner = NULL;  // Allow task to run again.
     warm_tries--;
     make_runnable(ctsk);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 02/03: PROTON-2362: epoll proactor: fix locking in listener overflow processing

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit cfd300e7d41ddfd39ae3e130fde8d1d654cfd099
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:48:20 2021 -0800

    PROTON-2362: epoll proactor: fix locking in listener overflow processing
---
 c/src/proactor/epoll-internal.h |  1 +
 c/src/proactor/epoll.c          | 17 ++++++++++++-----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 155277b..f0afc9e 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -302,6 +302,7 @@ struct pn_listener_t {
   size_t pending_count;              /* number of pending accepted connections */
   size_t backlog;                 /* size of pending accepted array */
   bool close_dispatched;
+  int overflow_count;
   uint32_t sched_io_events;
 };
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 2363f48..6a5beb4 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -715,6 +715,7 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) {
 // Add an overflowing acceptor to the overflow list. Called with listener task lock held.
 static void acceptor_set_overflow(acceptor_t *a) {
   a->overflowed = true;
+  a->listener->overflow_count++;
   pn_proactor_t *p = a->listener->task.proactor;
   lock(&p->overflow_mutex);
   acceptor_list_append(&p->overflow, a);
@@ -740,6 +741,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
     assert(!a->armed);
     assert(a->overflowed);
     a->overflowed = false;
+    l->overflow_count--;  /* balances the increment in acceptor_set_overflow() */
     if (rearming) {
       rearm(p, &a->psocket.epoll_io);
       a->armed = true;
 
 // call with lock held and task.working false
 static inline bool listener_can_free(pn_listener_t *l) {
-  return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count;
+  return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count && !l->overflow_count;
 }
 
 static inline void listener_final_free(pn_listener_t *l) {
@@ -1675,10 +1677,6 @@ static void listener_begin_close(pn_listener_t* l) {
     }
     assert(!l->pending_count);
 
-    unlock(&l->task.mutex);
-    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
-    proactor_rearm_overflow(pn_listener_proactor(l));
-    lock(&l->task.mutex);
     pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE);
   }
 }
@@ -1799,8 +1797,17 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
 static void listener_done(pn_listener_t *l) {
   pn_proactor_t *p = l->task.proactor;
   tslot_t *ts = l->task.runner;
+
   lock(&l->task.mutex);
 
+  if (l->close_dispatched && l->overflow_count) {
+    unlock(&l->task.mutex);
+    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
+    proactor_rearm_overflow(pn_listener_proactor(l));
+    lock(&l->task.mutex);
+    assert(l->overflow_count == 0);
+  }
+
   // Just in case the app didn't accept all the pending accepts
   // Shuffle the list back to start at 0
   memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-proton] 03/03: PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 996b9b114fdb4682c8114ad700705446ff3a24fd
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:54:51 2021 -0800

    PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion
---
 c/src/proactor/epoll.c | 68 +++++++-------------------------------------------
 1 file changed, 9 insertions(+), 59 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6a5beb4..6207267 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -991,35 +991,6 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
   // Return immediately.  pc may have just been freed by another thread.
 }
 
-/* Only call when context switch is imminent.  Sched lock is highly contested. */
-// Call with both task and sched locks.
-static bool pconnection_sched_sync(pconnection_t *pc) {
-  uint32_t sync_events = 0;
-  uint32_t sync_args = pc->tick_pending << 1;
-  if (pc->psocket.sched_io_events) {
-    pc->new_events = pc->psocket.sched_io_events;
-    pc->psocket.sched_io_events = 0;
-    pc->current_arm = 0;  // or outside lock?
-    sync_events = pc->new_events;
-  }
-  if (pc->task.sched_ready) {
-    pc->task.sched_ready = false;
-    schedule_done(&pc->task);
-    sync_args |= 1;
-  }
-  pc->task.sched_pending = false;
-
-  if (sync_args || sync_events) {
-    // Only replace if poller has found new work for us.
-    pc->process_args = (1 << 2) | sync_args;
-    pc->process_events = sync_events;
-  }
-
-  // Indicate if there are free proactor threads
-  pn_proactor_t *p = pc->task.proactor;
-  return p->poller_suspended || p->suspend_list_head;
-}
-
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
   if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
@@ -1038,14 +1009,9 @@ static void pconnection_done(pconnection_t *pc) {
   bool self_sched = false;
   lock(&pc->task.mutex);
   pc->task.working = false;  // So we can schedule() ourself if necessary.  We remain the de facto
-                             // working task instance while the lock is held.  Need sched_sync too to drain
-                             // a possible stale sched_ready.
+                             // working task instance while the lock is held.
   pc->hog_count = 0;
   bool has_event = pconnection_has_event(pc);
-  // Do as little as possible while holding the sched lock
-  lock(&p->sched_mutex);
-  pconnection_sched_sync(pc);
-  unlock(&p->sched_mutex);
 
   if (has_event || pconnection_work_pending(pc)) {
     self_sched = true;
@@ -1295,9 +1261,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   // Never stop working while work remains.  hog_count exception to this rule is elsewhere.
-  lock(&pc->task.proactor->sched_mutex);
-  bool workers_free = pconnection_sched_sync(pc);
-  unlock(&pc->task.proactor->sched_mutex);
 
   if (pconnection_work_pending(pc)) {
     goto retry;  // TODO: get rid of goto without adding more locking
@@ -1314,8 +1277,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     }
   }
 
+  // check one last time for new io before context switch
+  bool workers_free;
+  pn_proactor_t *p = pc->task.proactor;
+  lock(&p->sched_mutex);
+  workers_free = (p->suspend_list_head != NULL);
+  unlock(&p->sched_mutex);
   if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
-    // check one last time for new io before context switch
     pc->io_doublecheck = true;
     pc->read_blocked = false;
     pc->write_blocked = false;
@@ -1831,25 +1799,7 @@ static void listener_done(pn_listener_t *l) {
   bool notify = false;
   l->task.working = false;
 
-  lock(&p->sched_mutex);
-  int n_events = 0;
-  for (size_t i = 0; i < l->acceptors_size; i++) {
-    psocket_t *ps = &l->acceptors[i].psocket;
-    if (ps->sched_io_events) {
-      ps->working_io_events = ps->sched_io_events;
-      ps->sched_io_events = 0;
-    }
-    if (ps->working_io_events)
-      n_events++;
-  }
-
-  if (l->task.sched_ready) {
-    l->task.sched_ready = false;
-    schedule_done(&l->task);
-  }
-  unlock(&p->sched_mutex);
-
-  if (!n_events && listener_can_free(l)) {
+  if (listener_can_free(l)) {
     unlock(&l->task.mutex);
     pn_listener_free(l);
     lock(&p->sched_mutex);
@@ -1859,7 +1809,7 @@ static void listener_done(pn_listener_t *l) {
     if (notify) notify_poller(p);
     if (resume_thread) resume(p, resume_thread);
     return;
-  } else if (n_events || listener_has_event(l))
+  } else if (listener_has_event(l))
     notify = schedule(&l->task);
   unlock(&l->task.mutex);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org