Posted to commits@qpid.apache.org by as...@apache.org on 2021/12/17 20:40:00 UTC
[qpid-proton] branch main updated: NO-JIRA: Remove unused header definition
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 8676e30 NO-JIRA: Remove unused header definition
8676e30 is described below
commit 8676e30b79df93753fa317c1cb155f94f3d0ca0b
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Fri Dec 17 15:37:54 2021 -0500
NO-JIRA: Remove unused header definition
---
 c/src/core/engine-internal.h    |   2 -
 c/src/proactor/epoll-internal.h |   5 +-
 c/src/proactor/epoll.c          | 131 ++++++++++++++++++++++++----------------
 3 files changed, 82 insertions(+), 56 deletions(-)
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index 500be12..5c8b6c2 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -376,8 +376,6 @@ void pn_link_unbound(pn_link_t* link);
void pn_ep_incref(pn_endpoint_t *endpoint);
void pn_ep_decref(pn_endpoint_t *endpoint);
-pn_bytes_t pn_fill_performative(pn_transport_t *transport, const char *fmt, ...);
-
#if __cplusplus
}
#endif
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index f0afc9e..8e9e1b2 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -102,7 +102,7 @@ typedef struct task_t {
tslot_t *prev_runner;
bool sched_ready;
bool sched_pending; /* If true, one or more unseen epoll or other events to process() */
- int runnables_idx; /* 0 means unset, idx-1 is array position */
+ bool runnable ; /* on one of the runnable lists */
} task_t;
typedef enum {
@@ -198,7 +198,7 @@ struct pn_proactor_t {
task_t *resched_cutoff; // last resched task of current poller work snapshot. TODO: superseded by polled_resched_count?
task_t *resched_next;
unsigned int resched_count;
- unsigned int polled_resched_count;
+ unsigned int polled_resched_count;
pmutex tslot_mutex;
int earmark_count;
bool earmark_drain;
@@ -302,7 +302,6 @@ struct pn_listener_t {
size_t pending_count; /* number of pending accepted connections */
size_t backlog; /* size of pending accepted array */
bool close_dispatched;
- int overflow_count;
uint32_t sched_io_events;
};
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6207267..ea2e25a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -47,8 +47,6 @@
non-proactor-task -> proactor-task
tslot -> sched
- TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready
- list (ordered), resched list (ordered).
TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
transitions from "private to the scheduler" to "visible to the task".
TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
@@ -444,7 +442,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
assert(!tsk->runner);
tsk->runner = ts;
tsk->prev_runner = NULL;
- tsk->runnables_idx = 0;
+ tsk->runnable = false;
ts->task = tsk;
ts->prev_task = NULL;
}
@@ -541,9 +539,10 @@ static void remove_earmark(tslot_t *ts) {
static void make_runnable(task_t *tsk) {
pn_proactor_t *p = tsk->proactor;
assert(p->n_runnables <= p->runnables_capacity);
- assert(!tsk->runnables_idx);
+ assert(!tsk->runnable);
if (tsk->runner) return;
+ tsk->runnable = true;
// Track it as normal or warm or earmarked
if (pni_warm_sched) {
tslot_t *ts = tsk->prev_runner;
@@ -553,11 +552,8 @@ static void make_runnable(task_t *tsk) {
p->warm_runnables[p->n_warm_runnables++] = tsk;
assign_thread(ts, tsk);
}
- else {
- p->runnables[p->n_runnables] = tsk;
- tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
- p->n_runnables++;
- }
+ else
+ p->runnables[p->n_runnables++] = tsk;
return;
}
if (ts->state == UNUSED && !p->earmark_drain) {
@@ -567,9 +563,7 @@ static void make_runnable(task_t *tsk) {
}
}
}
- p->runnables[p->n_runnables] = tsk;
- tsk->runnables_idx = p->n_runnables + 1; // off by one accounting
- p->n_runnables++;
+ p->runnables[p->n_runnables++] = tsk;
}
@@ -715,7 +709,6 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) {
// Add an overflowing acceptor to the overflow list. Called with listener task lock held.
static void acceptor_set_overflow(acceptor_t *a) {
a->overflowed = true;
- a->listener->overflow_count++;
pn_proactor_t *p = a->listener->task.proactor;
lock(&p->overflow_mutex);
acceptor_list_append(&p->overflow, a);
@@ -741,7 +734,6 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
assert(!a->armed);
assert(a->overflowed);
a->overflowed = false;
- l->overflow_count++;
if (rearming) {
rearm(p, &a->psocket.epoll_io);
a->armed = true;
@@ -991,6 +983,35 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
// Return immediately. pc may have just been freed by another thread.
}
+/* Only call when context switch is imminent. Sched lock is highly contested. */
+// Call with both task and sched locks.
+static bool pconnection_sched_sync(pconnection_t *pc) {
+ uint32_t sync_events = 0;
+ uint32_t sync_args = pc->tick_pending << 1;
+ if (pc->psocket.sched_io_events) {
+ pc->new_events = pc->psocket.sched_io_events;
+ pc->psocket.sched_io_events = 0;
+ pc->current_arm = 0; // or outside lock?
+ sync_events = pc->new_events;
+ }
+ if (pc->task.sched_ready) {
+ pc->task.sched_ready = false;
+ schedule_done(&pc->task);
+ sync_args |= 1;
+ }
+ pc->task.sched_pending = false;
+
+ if (sync_args || sync_events) {
+ // Only replace if poller has found new work for us.
+ pc->process_args = (1 << 2) | sync_args;
+ pc->process_events = sync_events;
+ }
+
+ // Indicate if there are free proactor threads
+ pn_proactor_t *p = pc->task.proactor;
+ return p->poller_suspended || p->suspend_list_head;
+}
+
/* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
static inline bool pconnection_work_pending(pconnection_t *pc) {
if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
@@ -1009,9 +1030,14 @@ static void pconnection_done(pconnection_t *pc) {
bool self_sched = false;
lock(&pc->task.mutex);
pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto
- // working task instance while the lock is held.
+ // working task instance while the lock is held. Need sched_sync too to drain
+ // a possible stale sched_ready.
pc->hog_count = 0;
bool has_event = pconnection_has_event(pc);
+ // Do as little as possible while holding the sched lock
+ lock(&p->sched_mutex);
+ pconnection_sched_sync(pc);
+ unlock(&p->sched_mutex);
if (has_event || pconnection_work_pending(pc)) {
self_sched = true;
@@ -1261,6 +1287,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
// Never stop working while work remains. hog_count exception to this rule is elsewhere.
+ lock(&pc->task.proactor->sched_mutex);
+ bool workers_free = pconnection_sched_sync(pc);
+ unlock(&pc->task.proactor->sched_mutex);
if (pconnection_work_pending(pc)) {
goto retry; // TODO: get rid of goto without adding more locking
@@ -1277,13 +1306,8 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
}
- // check one last time for new io before context switch
- bool workers_free;
- pn_proactor_t *p = pc->task.proactor;
- lock(&p->sched_mutex);
- workers_free = (p->suspend_list_head != NULL);
- unlock(&p->sched_mutex);
if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
+ // check one last time for new io before context switch
pc->io_doublecheck = true;
pc->read_blocked = false;
pc->write_blocked = false;
@@ -1587,7 +1611,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
// call with lock held and task.working false
static inline bool listener_can_free(pn_listener_t *l) {
- return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count && !l->overflow_count;
+ return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count;
}
static inline void listener_final_free(pn_listener_t *l) {
@@ -1645,6 +1669,10 @@ static void listener_begin_close(pn_listener_t* l) {
}
assert(!l->pending_count);
+ unlock(&l->task.mutex);
+ /* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/
+ proactor_rearm_overflow(pn_listener_proactor(l));
+ lock(&l->task.mutex);
pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE);
}
}
@@ -1765,17 +1793,8 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
static void listener_done(pn_listener_t *l) {
pn_proactor_t *p = l->task.proactor;
tslot_t *ts = l->task.runner;
-
lock(&l->task.mutex);
- if (l->close_dispatched && l->overflow_count) {
- unlock(&l->task.mutex);
- /* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/
- proactor_rearm_overflow(pn_listener_proactor(l));
- lock(&l->task.mutex);
- assert(l->overflow_count == 0);
- }
-
// Just in case the app didn't accept all the pending accepts
// Shuffle the list back to start at 0
memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));
@@ -1799,7 +1818,25 @@ static void listener_done(pn_listener_t *l) {
bool notify = false;
l->task.working = false;
- if (listener_can_free(l)) {
+ lock(&p->sched_mutex);
+ int n_events = 0;
+ for (size_t i = 0; i < l->acceptors_size; i++) {
+ psocket_t *ps = &l->acceptors[i].psocket;
+ if (ps->sched_io_events) {
+ ps->working_io_events = ps->sched_io_events;
+ ps->sched_io_events = 0;
+ }
+ if (ps->working_io_events)
+ n_events++;
+ }
+
+ if (l->task.sched_ready) {
+ l->task.sched_ready = false;
+ schedule_done(&l->task);
+ }
+ unlock(&p->sched_mutex);
+
+ if (!n_events && listener_can_free(l)) {
unlock(&l->task.mutex);
pn_listener_free(l);
lock(&p->sched_mutex);
@@ -1809,7 +1846,7 @@ static void listener_done(pn_listener_t *l) {
if (notify) notify_poller(p);
if (resume_thread) resume(p, resume_thread);
return;
- } else if (listener_has_event(l))
+ } else if (n_events || listener_has_event(l))
notify = schedule(&l->task);
unlock(&l->task.mutex);
@@ -2302,7 +2339,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
break;
}
}
- if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p))
+ if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
return tsk;
return NULL;
}
@@ -2311,7 +2348,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
tsk->sched_ready = true;
tsk->sched_pending = true;
- if (!tsk->runnables_idx && !tsk->runner)
+ if (!tsk->runnable && !tsk->runner)
return tsk;
return NULL;
}
@@ -2349,38 +2386,33 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
return ts->task;
}
- // Take any runnables task if it results in a warm pairing.
+ // warm pairing ?
task_t *tsk = ts->prev_task;
- if (tsk && (tsk->runnables_idx)) {
- // A task can self delete, so don't allow it to run twice.
- task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1];
- assert(*runnables_slot == tsk);
- *runnables_slot = NULL;
+ if (tsk && (tsk->runnable)) {
assign_thread(ts, tsk);
return tsk;
}
- // check for remaining runnable tasks
+ // check for an unassigned runnable task or ready list task
if (p->n_runnables) {
// Any unclaimed runnable?
while (p->n_runnables) {
tsk = p->runnables[p->next_runnable++];
if (p->n_runnables == p->next_runnable)
p->n_runnables = 0;
- if (tsk) {
+ if (tsk->runnable) {
assign_thread(ts, tsk);
return tsk;
}
}
}
- // rest of sched_ready list
while (p->sched_ready_count) {
tsk = p->sched_ready_current;
assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list
if (post_ready(p, tsk)) {
pop_ready_task(tsk); // updates sched_ready_current
- assert(!tsk->runnables_idx && !tsk->runner);
+ assert(!tsk->runnable && !tsk->runner);
assign_thread(ts, tsk);
return tsk;
} else {
@@ -2388,11 +2420,10 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
}
}
- // the resched list
if (p->polled_resched_count) {
// Unprocessed resched tasks remain.
tsk = resched_pop_front(p);
- assert(tsk->sched_pending && !tsk->runnables_idx && tsk->runner == RESCHEDULE_PLACEHOLDER);
+ assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER);
tsk->runner = NULL;
assign_thread(ts, tsk);
return tsk;
@@ -2551,11 +2582,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
break;
}
- // We have unpolled work or at least one new epoll event.
- // Remember tasks that together constitute new work. See note at beginning about duplicates.
+ // We have unpolled work or at least one new epoll event
lock(&p->eventfd_mutex);
-
// Longest hold of eventfd_mutex. The following must be quick with no external calls:
// post_event(), make_runnable(), assign_thread(), earmark_thread().
for (int i = 0; i < n_events; i++) {
@@ -2591,7 +2620,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
ctsk = resched_pop_front(p);
- assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx);
+ assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
ctsk->runner = NULL; // Allow task to run again.
warm_tries--;
make_runnable(ctsk);
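
For context, the core of this change replaces the old off-by-one "runnables_idx" bookkeeping on task_t with a plain boolean flag plus an unordered runnables array. The following is a minimal standalone sketch of that accounting, not code from the repository: the task_t fields mirror the patch, while runnables_t, make_runnable_sketch() and next_runnable_sketch() are illustrative names, and the rest of the proactor machinery (warm pairing, earmarking, locking, the resched and sched_ready lists) is omitted.

#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct tslot_t tslot_t;   /* scheduler thread slot, opaque in this sketch */

typedef struct task_t {
  tslot_t *runner;                /* thread currently assigned, if any */
  bool runnable;                  /* on one of the runnable lists */
} task_t;

typedef struct runnables_t {      /* hypothetical container standing in for the pn_proactor_t fields */
  task_t **runnables;             /* unordered array; capacity is checked by the caller */
  size_t n_runnables;
  size_t next_runnable;
} runnables_t;

/* Mark a task runnable and append it: the boolean flag, not an array index,
   now records membership (compare make_runnable() in the patch). */
static void make_runnable_sketch(runnables_t *r, task_t *tsk) {
  assert(!tsk->runnable);
  if (tsk->runner) return;        /* already claimed by a thread */
  tsk->runnable = true;
  r->runnables[r->n_runnables++] = tsk;
}

/* Hand out the next unclaimed runnable task. A task grabbed earlier (e.g. by
   warm pairing) has had its flag cleared, so it is simply skipped here instead
   of leaving a NULLed-out slot as the old index scheme required
   (compare next_runnable() in the patch). */
static task_t *next_runnable_sketch(runnables_t *r) {
  while (r->n_runnables) {
    task_t *tsk = r->runnables[r->next_runnable++];
    if (r->n_runnables == r->next_runnable) {
      r->n_runnables = 0;
      r->next_runnable = 0;       /* reset for reuse; the real proactor manages this in its poller */
    }
    if (tsk->runnable) {
      tsk->runnable = false;      /* what assign_thread() does in the patch */
      return tsk;
    }
  }
  return NULL;
}

With the flag, a task claimed out of turn is detected by a cleared bit when the array is drained, so the "off by one accounting" index and the slot-clearing in next_runnable() both disappear, which is the simplification this commit makes alongside dropping the unused pn_fill_performative() declaration and the listener overflow_count.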