You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2020/05/01 05:47:58 UTC
[qpid-proton] branch master updated: PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking
This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 9f1e5f0 PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking
9f1e5f0 is described below
commit 9f1e5f0cd9cd1c146cee94ca4d6ff6ed4b71c139
Author: Cliff Jansen <cl...@apache.org>
AuthorDate: Thu Apr 30 22:47:17 2020 -0700
PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking
---
c/src/proactor/epoll-internal.h | 1 -
c/src/proactor/epoll.c | 40 +++++++++++++---------------------------
2 files changed, 13 insertions(+), 28 deletions(-)
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 20b01ac..fd02817 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -276,7 +276,6 @@ struct pn_listener_t {
size_t pending_count; /* number of pending accepted connections */
size_t backlog; /* size of pending accepted array */
bool close_dispatched;
- pmutex rearm_mutex; /* orders rearms/disarms, nothing else */
uint32_t sched_io_events;
};
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 76b5c37..6667365 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -848,15 +848,11 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
assert(a->overflowed);
a->overflowed = false;
if (rearming) {
- lock(&l->rearm_mutex);
+ rearm(p, &a->psocket.epoll_io);
a->armed = true;
}
else notify = wake(&l->context);
unlock(&l->context.mutex);
- if (rearming) {
- rearm(p, &a->psocket.epoll_io);
- unlock(&l->rearm_mutex);
- }
if (notify) wake_notify(&l->context);
a = acceptor_list_next(&ovflw);
}
@@ -1642,7 +1638,6 @@ pn_listener_t *pn_listener() {
}
pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen
pcontext_init(&l->context, LISTENER, unknown);
- pmutex_init(&l->rearm_mutex);
}
return l;
}
@@ -1702,11 +1697,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
ps->epoll_io.fd = fd;
ps->epoll_io.wanted = EPOLLIN;
ps->epoll_io.polling = false;
- lock(&l->rearm_mutex);
start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error
l->active_count++;
acceptor->armed = true;
- unlock(&l->rearm_mutex);
} else {
close(fd);
}
@@ -1745,7 +1738,6 @@ static inline bool listener_can_free(pn_listener_t *l) {
static inline void listener_final_free(pn_listener_t *l) {
pcontext_finalize(&l->context);
- pmutex_finalize(&l->rearm_mutex);
free(l->acceptors);
free(l->pending_accepteds);
free(l);
@@ -1780,7 +1772,6 @@ static void listener_begin_close(pn_listener_t* l) {
acceptor_t *a = &l->acceptors[i];
psocket_t *ps = &a->psocket;
if (ps->epoll_io.fd >= 0) {
- lock(&l->rearm_mutex);
if (a->armed) {
shutdown(ps->epoll_io.fd, SHUT_RD); // Force epoll event and callback
} else {
@@ -1789,7 +1780,6 @@ static void listener_begin_close(pn_listener_t* l) {
ps->epoll_io.fd = -1;
l->active_count--;
}
- unlock(&l->rearm_mutex);
}
}
/* Close all sockets waiting for a pn_listener_accept2() */
@@ -1869,17 +1859,13 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool w
uint32_t events = ps->working_io_events;
ps->working_io_events = 0;
if (l->context.closing) {
- lock(&l->rearm_mutex);
l->acceptors[i].armed = false;
stop_polling(&ps->epoll_io, ps->proactor->epollfd);
- unlock(&l->rearm_mutex);
close(ps->epoll_io.fd);
ps->epoll_io.fd = -1;
l->active_count--;
} else {
- lock(&l->rearm_mutex);
l->acceptors[i].armed = false;
- unlock(&l->rearm_mutex);
if (events & EPOLLRDHUP) {
/* Calls listener_begin_close which closes all the listener's sockets */
psocket_error(ps, errno, "listener epoll");
@@ -1929,29 +1915,29 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
static void listener_done(pn_listener_t *l) {
pn_proactor_t *p = l->context.proactor;
tslot_t *ts = l->context.runner;
+ lock(&l->context.mutex);
// Just in case the app didn't accept all the pending accepts
// Shuffle the list back to start at 0
memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));
l->pending_first = 0;
- for (size_t i = 0; i < l->acceptors_size; i++) {
- acceptor_t *a = &l->acceptors[i];
- psocket_t *ps = &a->psocket;
-
- // Rearm acceptor when appropriate
- if (ps->epoll_io.polling && l->pending_count==0) {
- lock(&l->rearm_mutex);
- if (!a->armed) {
- rearm(ps->proactor, &ps->epoll_io);
- a->armed = true;
+ if (!l->context.closing) {
+ for (size_t i = 0; i < l->acceptors_size; i++) {
+ acceptor_t *a = &l->acceptors[i];
+ psocket_t *ps = &a->psocket;
+
+ // Rearm acceptor when appropriate
+ if (ps->epoll_io.polling && l->pending_count==0 && !a->overflowed) {
+ if (!a->armed) {
+ rearm(ps->proactor, &ps->epoll_io);
+ a->armed = true;
+ }
}
- unlock(&l->rearm_mutex);
}
}
bool notify = false;
- lock(&l->context.mutex);
l->context.working = false;
lock(&p->sched_mutex);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org