You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this text copy.
Posted to commits@qpid.apache.org by as...@apache.org on 2021/12/17 20:40:00 UTC

[qpid-proton] branch main updated: NO-JIRA: Remove unused header definition

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 8676e30  NO-JIRA: Remove unused header definition
8676e30 is described below

commit 8676e30b79df93753fa317c1cb155f94f3d0ca0b
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Dec 17 15:37:54 2021 -0500

    NO-JIRA: Remove unused header definition
---
 c/src/core/engine-internal.h    |   2 -
 c/src/proactor/epoll-internal.h |   5 +-
 c/src/proactor/epoll.c          | 131 ++++++++++++++++++++++++----------------
 3 files changed, 82 insertions(+), 56 deletions(-)

diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 500be12..5c8b6c2 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -376,8 +376,6 @@ void pn_link_unbound(pn_link_t* link);
 void pn_ep_incref(pn_endpoint_t *endpoint);
 void pn_ep_decref(pn_endpoint_t *endpoint);
 
-pn_bytes_t pn_fill_performative(pn_transport_t *transport, const char *fmt, ...);
-
 #if __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index f0afc9e..8e9e1b2 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -102,7 +102,7 @@ typedef struct task_t {
   tslot_t *prev_runner;
   bool sched_ready;
   bool sched_pending;           /* If true, one or more unseen epoll or other events to process() */
-  int runnables_idx;            /* 0 means unset, idx-1 is array position */
+  bool runnable ;               /* on one of the runnable lists */
 } task_t;
 
 typedef enum {
@@ -198,7 +198,7 @@ struct pn_proactor_t {
   task_t *resched_cutoff; // last resched task of current poller work snapshot.  TODO: superseded by polled_resched_count?
   task_t *resched_next;
   unsigned int resched_count;
-  unsigned int polled_resched_count;
+  unsigned int polled_resched_count; 
   pmutex tslot_mutex;
   int earmark_count;
   bool earmark_drain;
@@ -302,7 +302,6 @@ struct pn_listener_t {
   size_t pending_count;              /* number of pending accepted connections */
   size_t backlog;                 /* size of pending accepted array */
   bool close_dispatched;
-  int overflow_count;
   uint32_t sched_io_events;
 };
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6207267..ea2e25a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -47,8 +47,6 @@
     non-proactor-task -> proactor-task
     tslot -> sched
 
- TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready
- list (ordered), resched list (ordered).
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
  TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
@@ -444,7 +442,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
   assert(!tsk->runner);
   tsk->runner = ts;
   tsk->prev_runner = NULL;
-  tsk->runnables_idx = 0;
+  tsk->runnable = false;
   ts->task = tsk;
   ts->prev_task = NULL;
 }
@@ -541,9 +539,10 @@ static void remove_earmark(tslot_t *ts) {
 static void make_runnable(task_t *tsk) {
   pn_proactor_t *p = tsk->proactor;
   assert(p->n_runnables <= p->runnables_capacity);
-  assert(!tsk->runnables_idx);
+  assert(!tsk->runnable);
   if (tsk->runner) return;
 
+  tsk->runnable = true;
   // Track it as normal or warm or earmarked
   if (pni_warm_sched) {
     tslot_t *ts = tsk->prev_runner;
@@ -553,11 +552,8 @@ static void make_runnable(task_t *tsk) {
           p->warm_runnables[p->n_warm_runnables++] = tsk;
           assign_thread(ts, tsk);
         }
-        else {
-          p->runnables[p->n_runnables] = tsk;
-          tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
-          p->n_runnables++;
-        }
+        else
+          p->runnables[p->n_runnables++] = tsk;
         return;
       }
       if (ts->state == UNUSED && !p->earmark_drain) {
@@ -567,9 +563,7 @@ static void make_runnable(task_t *tsk) {
       }
     }
   }
-  p->runnables[p->n_runnables] = tsk;
-  tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
-  p->n_runnables++;
+  p->runnables[p->n_runnables++] = tsk;
 }
 
 
@@ -715,7 +709,6 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) {
 // Add an overflowing acceptor to the overflow list. Called with listener task lock held.
 static void acceptor_set_overflow(acceptor_t *a) {
   a->overflowed = true;
-  a->listener->overflow_count++;
   pn_proactor_t *p = a->listener->task.proactor;
   lock(&p->overflow_mutex);
   acceptor_list_append(&p->overflow, a);
@@ -741,7 +734,6 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
     assert(!a->armed);
     assert(a->overflowed);
     a->overflowed = false;
-    l->overflow_count++;
     if (rearming) {
       rearm(p, &a->psocket.epoll_io);
       a->armed = true;
@@ -991,6 +983,35 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
   // Return immediately.  pc may have just been freed by another thread.
 }
 
+/* Only call when a context switch is imminent: the sched lock is highly contended. */
+// Call with both the task lock and the sched lock held.
+static bool pconnection_sched_sync(pconnection_t *pc) {
+  uint32_t sync_events = 0;
+  uint32_t sync_args = pc->tick_pending << 1;
+  if (pc->psocket.sched_io_events) {
+    pc->new_events = pc->psocket.sched_io_events;
+    pc->psocket.sched_io_events = 0;
+    pc->current_arm = 0;  // or outside lock?
+    sync_events = pc->new_events;
+  }
+  if (pc->task.sched_ready) {
+    pc->task.sched_ready = false;
+    schedule_done(&pc->task);
+    sync_args |= 1;
+  }
+  pc->task.sched_pending = false;
+
+  if (sync_args || sync_events) {
+    // Only replace if poller has found new work for us.
+    pc->process_args = (1 << 2) | sync_args;
+    pc->process_events = sync_events;
+  }
+
+  // Indicate if there are free proactor threads
+  pn_proactor_t *p = pc->task.proactor;
+  return p->poller_suspended || p->suspend_list_head;
+}
+
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
   if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
@@ -1009,9 +1030,14 @@ static void pconnection_done(pconnection_t *pc) {
   bool self_sched = false;
   lock(&pc->task.mutex);
   pc->task.working = false;  // So we can schedule() ourself if necessary.  We remain the de facto
-                             // working task instance while the lock is held.
+                             // working task instance while the lock is held.  Need sched_sync too to drain
+                             // a possible stale sched_ready.
   pc->hog_count = 0;
   bool has_event = pconnection_has_event(pc);
+  // Do as little as possible while holding the sched lock
+  lock(&p->sched_mutex);
+  pconnection_sched_sync(pc);
+  unlock(&p->sched_mutex);
 
   if (has_event || pconnection_work_pending(pc)) {
     self_sched = true;
@@ -1261,6 +1287,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   // Never stop working while work remains.  hog_count exception to this rule is elsewhere.
+  lock(&pc->task.proactor->sched_mutex);
+  bool workers_free = pconnection_sched_sync(pc);
+  unlock(&pc->task.proactor->sched_mutex);
 
   if (pconnection_work_pending(pc)) {
     goto retry;  // TODO: get rid of goto without adding more locking
@@ -1277,13 +1306,8 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     }
   }
 
-  // check one last time for new io before context switch
-  bool workers_free;
-  pn_proactor_t *p = pc->task.proactor;
-  lock(&p->sched_mutex);
-  workers_free = (p->suspend_list_head != NULL);
-  unlock(&p->sched_mutex);
   if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
+    // check one last time for new io before context switch
     pc->io_doublecheck = true;
     pc->read_blocked = false;
     pc->write_blocked = false;
@@ -1587,7 +1611,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
 
 // call with lock held and task.working false
 static inline bool listener_can_free(pn_listener_t *l) {
-  return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count && !l->overflow_count;
+  return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count;
 }
 
 static inline void listener_final_free(pn_listener_t *l) {
@@ -1645,6 +1669,10 @@ static void listener_begin_close(pn_listener_t* l) {
     }
     assert(!l->pending_count);
 
+    unlock(&l->task.mutex);
+    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
+    proactor_rearm_overflow(pn_listener_proactor(l));
+    lock(&l->task.mutex);
     pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE);
   }
 }
@@ -1765,17 +1793,8 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
 static void listener_done(pn_listener_t *l) {
   pn_proactor_t *p = l->task.proactor;
   tslot_t *ts = l->task.runner;
-
   lock(&l->task.mutex);
 
-  if (l->close_dispatched && l->overflow_count) {
-    unlock(&l->task.mutex);
-    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
-    proactor_rearm_overflow(pn_listener_proactor(l));
-    lock(&l->task.mutex);
-    assert(l->overflow_count == 0);
-  }
-
   // Just in case the app didn't accept all the pending accepts
   // Shuffle the list back to start at 0
   memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));
@@ -1799,7 +1818,25 @@ static void listener_done(pn_listener_t *l) {
   bool notify = false;
   l->task.working = false;
 
-  if (listener_can_free(l)) {
+  lock(&p->sched_mutex);
+  int n_events = 0;
+  for (size_t i = 0; i < l->acceptors_size; i++) {
+    psocket_t *ps = &l->acceptors[i].psocket;
+    if (ps->sched_io_events) {
+      ps->working_io_events = ps->sched_io_events;
+      ps->sched_io_events = 0;
+    }
+    if (ps->working_io_events)
+      n_events++;
+  }
+
+  if (l->task.sched_ready) {
+    l->task.sched_ready = false;
+    schedule_done(&l->task);
+  }
+  unlock(&p->sched_mutex);
+
+  if (!n_events && listener_can_free(l)) {
     unlock(&l->task.mutex);
     pn_listener_free(l);
     lock(&p->sched_mutex);
@@ -1809,7 +1846,7 @@ static void listener_done(pn_listener_t *l) {
     if (notify) notify_poller(p);
     if (resume_thread) resume(p, resume_thread);
     return;
-  } else if (listener_has_event(l))
+  } else if (n_events || listener_has_event(l))
     notify = schedule(&l->task);
   unlock(&l->task.mutex);
 
@@ -2302,7 +2339,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     break;
   }
   }
-  if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p))
+  if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
     return tsk;
   return NULL;
 }
@@ -2311,7 +2348,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
 static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
   tsk->sched_ready = true;
   tsk->sched_pending = true;
-  if (!tsk->runnables_idx && !tsk->runner)
+  if (!tsk->runnable && !tsk->runner)
     return tsk;
   return NULL;
 }
@@ -2349,38 +2386,33 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     return ts->task;
   }
 
-  // Take any runnables task if it results in a warm pairing.
+  // Warm pairing: is this thread's previous task currently runnable?
   task_t *tsk = ts->prev_task;
-  if (tsk && (tsk->runnables_idx)) {
-    // A task can self delete, so don't allow it to run twice.
-    task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1];
-    assert(*runnables_slot == tsk);
-    *runnables_slot = NULL;
+  if (tsk && (tsk->runnable)) {
     assign_thread(ts, tsk);
     return tsk;
   }
 
-  // check for remaining runnable tasks
+  // check for an unassigned runnable task or ready list task
   if (p->n_runnables) {
     // Any unclaimed runnable?
     while (p->n_runnables) {
       tsk = p->runnables[p->next_runnable++];
       if (p->n_runnables == p->next_runnable)
         p->n_runnables = 0;
-      if (tsk) {
+      if (tsk->runnable) {
         assign_thread(ts, tsk);
         return tsk;
       }
     }
   }
 
-  // rest of sched_ready list
   while (p->sched_ready_count) {
     tsk = p->sched_ready_current;
     assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list
     if (post_ready(p, tsk)) {
       pop_ready_task(tsk);  // updates sched_ready_current
-      assert(!tsk->runnables_idx && !tsk->runner);
+      assert(!tsk->runnable && !tsk->runner);
       assign_thread(ts, tsk);
       return tsk;
     } else {
@@ -2388,11 +2420,10 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     }
   }
 
-  // the resched list
   if (p->polled_resched_count) {
     // Unprocessed resched tasks remain.
     tsk = resched_pop_front(p);
-    assert(tsk->sched_pending && !tsk->runnables_idx && tsk->runner == RESCHEDULE_PLACEHOLDER);
+    assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER);
     tsk->runner = NULL;
     assign_thread(ts, tsk);
     return tsk;
@@ -2551,11 +2582,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     break;
   }
 
-  // We have unpolled work or at least one new epoll event.
-  // Remember tasks that together constitute new work.  See note at beginning about duplicates.
+  // We have unpolled work or at least one new epoll event
 
   lock(&p->eventfd_mutex);
-
   // Longest hold of eventfd_mutex.  The following must be quick with no external calls:
   // post_event(), make_runnable(), assign_thread(), earmark_thread().
   for (int i = 0; i < n_events; i++) {
@@ -2591,7 +2620,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
 
   while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
     ctsk = resched_pop_front(p);
-    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
+    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
     ctsk->runner = NULL;  // Allow task to run again.
     warm_tries--;
     make_runnable(ctsk);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org