Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:21 UTC

[qpid-proton] 01/03: PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice

This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4f8ac48c48e66d1650308bc1be694dae3215fc9a
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:43:30 2021 -0800

    PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice
---
 c/src/proactor/epoll-internal.h |  4 ++--
 c/src/proactor/epoll.c          | 46 +++++++++++++++++++++++++++--------------
 2 files changed, 32 insertions(+), 18 deletions(-)
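The heart of the change: the per-task bool `runnable` becomes an integer slot index into the proactor's runnables array, with off-by-one encoding so that zero can keep meaning "not queued". A minimal sketch of the scheme, assuming the task_t and pn_proactor_t fields from epoll-internal.h; the helper name is illustrative, not part of the commit:

    /* Hypothetical helper capturing the enqueue pattern repeated in
     * make_runnable() below.  runnables_idx == 0 means "not in the
     * array"; otherwise the task lives at runnables[runnables_idx - 1],
     * so a later claimer can null out exactly that slot. */
    static void runnables_push(pn_proactor_t *p, task_t *tsk) {
      assert(tsk->runnables_idx == 0);          /* not already queued */
      p->runnables[p->n_runnables] = tsk;
      tsk->runnables_idx = p->n_runnables + 1;  /* off by one: 0 stays "unset" */
      p->n_runnables++;
    }
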

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 8e9e1b2..155277b 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -102,7 +102,7 @@ typedef struct task_t {
   tslot_t *prev_runner;
   bool sched_ready;
   bool sched_pending;           /* If true, one or more unseen epoll or other events to process() */
-  bool runnable ;               /* on one of the runnable lists */
+  int runnables_idx;            /* 0 means unset, idx-1 is array position */
 } task_t;
 
 typedef enum {
@@ -198,7 +198,7 @@ struct pn_proactor_t {
   task_t *resched_cutoff; // last resched task of current poller work snapshot.  TODO: superseded by polled_resched_count?
   task_t *resched_next;
   unsigned int resched_count;
-  unsigned int polled_resched_count; 
+  unsigned int polled_resched_count;
   pmutex tslot_mutex;
   int earmark_count;
   bool earmark_drain;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index ea2e25a..2363f48 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -47,6 +47,8 @@
     non-proactor-task -> proactor-task
     tslot -> sched
 
+ TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready
+ list (ordered), resched list (ordered).
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
  TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
@@ -442,7 +444,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
   assert(!tsk->runner);
   tsk->runner = ts;
   tsk->prev_runner = NULL;
-  tsk->runnable = false;
+  tsk->runnables_idx = 0;
   ts->task = tsk;
   ts->prev_task = NULL;
 }
@@ -539,10 +541,9 @@ static void remove_earmark(tslot_t *ts) {
 static void make_runnable(task_t *tsk) {
   pn_proactor_t *p = tsk->proactor;
   assert(p->n_runnables <= p->runnables_capacity);
-  assert(!tsk->runnable);
+  assert(!tsk->runnables_idx);
   if (tsk->runner) return;
 
-  tsk->runnable = true;
   // Track it as normal or warm or earmarked
   if (pni_warm_sched) {
     tslot_t *ts = tsk->prev_runner;
@@ -552,8 +553,11 @@ static void make_runnable(task_t *tsk) {
           p->warm_runnables[p->n_warm_runnables++] = tsk;
           assign_thread(ts, tsk);
         }
-        else
-          p->runnables[p->n_runnables++] = tsk;
+        else {
+          p->runnables[p->n_runnables] = tsk;
+          tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
+          p->n_runnables++;
+        }
         return;
       }
       if (ts->state == UNUSED && !p->earmark_drain) {
@@ -563,7 +567,9 @@ static void make_runnable(task_t *tsk) {
       }
     }
   }
-  p->runnables[p->n_runnables++] = tsk;
+  p->runnables[p->n_runnables] = tsk;
+  tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
+  p->n_runnables++;
 }
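Both enqueue branches above stamp the slot index the same way, and assign_thread() (earlier in the diff) resets runnables_idx to 0 when a thread claims the task. The inverse operation, claiming a task out of its recorded slot, would look roughly like this (hypothetical helper; the commit inlines it in next_runnable() below):

    /* Claim a task out of the runnables array via its recorded slot.
     * Nulling the slot leaves a hole rather than a duplicate pointer,
     * so the linear scan in next_runnable() cannot hand the task out
     * a second time. */
    static void runnables_claim(pn_proactor_t *p, task_t *tsk) {
      assert(tsk->runnables_idx > 0);
      assert(p->runnables[tsk->runnables_idx - 1] == tsk);
      p->runnables[tsk->runnables_idx - 1] = NULL;
      /* assign_thread() then clears tsk->runnables_idx */
    }
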
 
 
@@ -2339,7 +2345,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     break;
   }
   }
-  if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
+  if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p))
     return tsk;
   return NULL;
 }
@@ -2348,7 +2354,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
 static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
   tsk->sched_ready = true;
   tsk->sched_pending = true;
-  if (!tsk->runnable && !tsk->runner)
+  if (!tsk->runnables_idx && !tsk->runner)
     return tsk;
   return NULL;
 }
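post_event() and post_ready() now test runnables_idx instead of the old bool, but the invariant they enforce is unchanged: a task may be handed to the scheduler only if it is in no scheduling home already. Stated as a predicate (illustrative name, not in the commit):

    /* A task is claimable only when it is neither queued in the
     * runnables array nor assigned to (or running on) a thread.  The
     * resched paths additionally use runner == RESCHEDULE_PLACEHOLDER
     * as a transient third state, asserted separately below. */
    static bool task_unscheduled(const task_t *tsk) {
      return tsk->runnables_idx == 0 && tsk->runner == NULL;
    }
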
@@ -2386,33 +2392,38 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     return ts->task;
   }
 
-  // warm pairing ?
+  // Take any runnables task if it results in a warm pairing.
   task_t *tsk = ts->prev_task;
-  if (tsk && (tsk->runnable)) {
+  if (tsk && (tsk->runnables_idx)) {
+    // A task can self delete, so don't allow it to run twice.
+    task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1];
+    assert(*runnables_slot == tsk);
+    *runnables_slot = NULL;
     assign_thread(ts, tsk);
     return tsk;
   }
 
-  // check for an unassigned runnable task or ready list task
+  // check for remaining runnable tasks
   if (p->n_runnables) {
     // Any unclaimed runnable?
     while (p->n_runnables) {
       tsk = p->runnables[p->next_runnable++];
       if (p->n_runnables == p->next_runnable)
         p->n_runnables = 0;
-      if (tsk->runnable) {
+      if (tsk) {
         assign_thread(ts, tsk);
         return tsk;
       }
     }
   }
 
+  // rest of sched_ready list
   while (p->sched_ready_count) {
     tsk = p->sched_ready_current;
     assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list
     if (post_ready(p, tsk)) {
       pop_ready_task(tsk);  // updates sched_ready_current
-      assert(!tsk->runnable && !tsk->runner);
+      assert(!tsk->runnables_idx && !tsk->runner);
       assign_thread(ts, tsk);
       return tsk;
     } else {
@@ -2420,10 +2431,11 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     }
   }
 
+  // the resched list
   if (p->polled_resched_count) {
     // Unprocessed resched tasks remain.
     tsk = resched_pop_front(p);
-    assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER);
+    assert(tsk->sched_pending && !tsk->runnables_idx && tsk->runner == RESCHEDULE_PLACEHOLDER);
     tsk->runner = NULL;
     assign_thread(ts, tsk);
     return tsk;
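Taken together, the next_runnable() changes close the window named in the commit title. One plausible reading of the hazard, based on the "task can self delete" comment above (step ordering is illustrative):

    /* With only a bool, the sequence could go:
     *   1. make_runnable(tsk) stores tsk in runnables[i]; runnable = true.
     *   2. Warm pairing takes tsk via prev_task: assign_thread() clears
     *      the flag, tsk runs, and its batch may free the task.
     *   3. The scan over runnables[] still holds the stale pointer and
     *      the old guard `if (tsk->runnable)` dereferences it.
     * With runnables_idx, step 2 first nulls runnables[idx - 1], so the
     * scan's guard `if (tsk)` skips the hole without touching the task. */
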
@@ -2582,9 +2594,11 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     break;
   }
 
-  // We have unpolled work or at least one new epoll event
+  // We have unpolled work or at least one new epoll event.
+  // Remember tasks that together constitute new work.  See note at beginning about duplicates.
 
   lock(&p->eventfd_mutex);
+
   // Longest hold of eventfd_mutex.  The following must be quick with no external calls:
   // post_event(), make_runnable(), assign_thread(), earmark_thread().
   for (int i = 0; i < n_events; i++) {
@@ -2620,7 +2634,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
 
   while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
     ctsk = resched_pop_front(p);
-    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
+    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
     ctsk->runner = NULL;  // Allow task to run again.
     warm_tries--;
     make_runnable(ctsk);
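At the tail of the poller pass, resched tasks re-enter the normal path; each trip through make_runnable() stamps a fresh slot index, so the off-by-one accounting holds across reschedules. Sketched (restating the loop body above):

    /* Life cycle of the index across a reschedule:
     *   resched_pop_front(p);   runner == RESCHEDULE_PLACEHOLDER,
     *                           runnables_idx == 0 (asserted above)
     *   ctsk->runner = NULL;    task may be scheduled again
     *   make_runnable(ctsk);    stamps a fresh runnables_idx, or hands
     *                           the task to a warm/earmarked thread */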

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org