You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2021/12/16 02:12:22 UTC
[qpid-proton] 02/03: PROTON-2362: epoll proactor: fix locking in listener overflow processing
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit cfd300e7d41ddfd39ae3e130fde8d1d654cfd099
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Wed Dec 15 17:48:20 2021 -0800
PROTON-2362: epoll proactor: fix locking in listener overflow processing
---
c/src/proactor/epoll-internal.h | 1 +
c/src/proactor/epoll.c | 17 ++++++++++++-----
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 155277b..f0afc9e 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -302,6 +302,7 @@ struct pn_listener_t {
size_t pending_count; /* number of pending accepted connections */
size_t backlog; /* size of pending accepted array */
bool close_dispatched;
+ int overflow_count;
uint32_t sched_io_events;
};
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 2363f48..6a5beb4 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -715,6 +715,7 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) {
// Add an overflowing acceptor to the overflow list. Called with listener task lock held.
static void acceptor_set_overflow(acceptor_t *a) {
a->overflowed = true;
+ a->listener->overflow_count++;
pn_proactor_t *p = a->listener->task.proactor;
lock(&p->overflow_mutex);
acceptor_list_append(&p->overflow, a);
@@ -740,6 +741,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
assert(!a->armed);
assert(a->overflowed);
a->overflowed = false;
+ l->overflow_count--;
if (rearming) {
rearm(p, &a->psocket.epoll_io);
a->armed = true;
@@ -1617,7 +1619,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
// call with lock held and task.working false
static inline bool listener_can_free(pn_listener_t *l) {
- return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count;
+ return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count && !l->overflow_count;
}
static inline void listener_final_free(pn_listener_t *l) {
@@ -1675,10 +1677,6 @@ static void listener_begin_close(pn_listener_t* l) {
}
assert(!l->pending_count);
- unlock(&l->task.mutex);
- /* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/
- proactor_rearm_overflow(pn_listener_proactor(l));
- lock(&l->task.mutex);
pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE);
}
}
@@ -1799,8 +1797,17 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
static void listener_done(pn_listener_t *l) {
pn_proactor_t *p = l->task.proactor;
tslot_t *ts = l->task.runner;
+
lock(&l->task.mutex);
+ if (l->close_dispatched && l->overflow_count) {
+ unlock(&l->task.mutex);
+ /* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/
+ proactor_rearm_overflow(pn_listener_proactor(l));
+ lock(&l->task.mutex);
+ assert(l->overflow_count == 0);
+ }
+
// Just in case the app didn't accept all the pending accepts
// Shuffle the list back to start at 0
memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org