You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/11/29 19:53:23 UTC
qpid-proton git commit: PROTON-1531: epoll proactor: use shutdown to avoid race on close
Repository: qpid-proton
Updated Branches:
refs/heads/master a7119f56c -> 584d3afc3
PROTON-1531: epoll proactor: use shutdown to avoid race on close
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/584d3afc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/584d3afc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/584d3afc
Branch: refs/heads/master
Commit: 584d3afc32ffa9a3e92b0287da7c90e390265a9d
Parents: a7119f5
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed Nov 29 11:52:02 2017 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Wed Nov 29 11:52:02 2017 -0800
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 66 +++++++++++++++++++++++++-------------
1 file changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/584d3afc/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 080cd4b..1fb0c9e 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -558,6 +558,7 @@ struct acceptor_t{
struct pn_listener_t {
acceptor_t *acceptors; /* Array of listening sockets */
size_t acceptors_size;
+ int active_count; /* Number of listener sockets registered with epoll */
pcontext_t context;
pn_condition_t *condition;
pn_collector_t *collector;
@@ -1447,6 +1448,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
ps->epoll_io.polling = false;
lock(&l->rearm_mutex);
start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error
+ l->active_count++;
acceptor->armed = true;
unlock(&l->rearm_mutex);
} else {
@@ -1480,9 +1482,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
return;
}
-// call with lock held
+// call with lock held and context.working false
static inline bool listener_can_free(pn_listener_t *l) {
- return l->context.closing && l->close_dispatched && !l->context.wake_ops;
+ return l->context.closing && l->close_dispatched && !l->context.wake_ops && !l->active_count;
}
static inline void listener_final_free(pn_listener_t *l) {
@@ -1513,21 +1515,24 @@ void pn_listener_free(pn_listener_t *l) {
/* Always call with lock held so it can be unlocked around overflow processing. */
static void listener_begin_close(pn_listener_t* l) {
- // TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection)
- int close_count = 0;
if (!l->context.closing) {
l->context.closing = true;
/* Close all listening sockets */
for (size_t i = 0; i < l->acceptors_size; ++i) {
- psocket_t *ps = &l->acceptors[i].psocket;
+ acceptor_t *a = &l->acceptors[i];
+ psocket_t *ps = &a->psocket;
if (ps->sockfd >= 0) {
lock(&l->rearm_mutex);
- stop_polling(&ps->epoll_io, ps->proactor->epollfd); // race: PROTON-1531
+ if (a->armed) {
+ shutdown(ps->sockfd, SHUT_RD); // Force epoll event and callback
+ } else {
+ stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+ close(ps->sockfd);
+ ps->sockfd = -1;
+ l->active_count--;
+ }
unlock(&l->rearm_mutex);
- close(ps->sockfd);
- ps->sockfd = -1;
- close_count++;
}
}
/* Close all sockets waiting for a pn_listener_accept() */
@@ -1536,13 +1541,13 @@ static void listener_begin_close(pn_listener_t* l) {
while (a) {
close(a->accepted_fd);
a->accepted_fd = -1;
- close_count++;
l->pending_count--;
a = listener_list_next(&l->pending_acceptors);
}
assert(!l->pending_count);
unlock(&l->context.mutex);
+ /* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/
proactor_rearm_overflow(pn_listener_proactor(l));
lock(&l->context.mutex);
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
@@ -1568,6 +1573,7 @@ static void listener_forced_shutdown(pn_listener_t *l) {
// pconnection_process will never be called again. Zero everything.
l->context.wake_ops = 0;
l->close_dispatched = true;
+ l->active_count = 0;
assert(listener_can_free(l));
pn_listener_free(l);
}
@@ -1600,11 +1606,21 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
lock(&l->context.mutex);
if (events) {
a->armed = false;
- if (events & EPOLLRDHUP) {
- /* Calls listener_begin_close which closes all the listener's sockets */
- psocket_error(ps, errno, "listener epoll");
- } else if (!l->context.closing && events & EPOLLIN) {
- listener_accept_lh(ps);
+ if (l->context.closing) {
+ lock(&l->rearm_mutex);
+ stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+ unlock(&l->rearm_mutex);
+ close(ps->sockfd);
+ ps->sockfd = -1;
+ l->active_count--;
+ }
+ else {
+ if (events & EPOLLRDHUP) {
+ /* Calls listener_begin_close which closes all the listener's sockets */
+ psocket_error(ps, errno, "listener epoll");
+ } else if (!l->context.closing && events & EPOLLIN) {
+ listener_accept_lh(ps);
+ }
}
} else {
wake_done(&l->context); // callback accounting
@@ -1614,8 +1630,14 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
l->context.working = true;
if (listener_has_event(l))
lb = &l->batch;
- else
+ else {
l->context.working = false;
+ if (listener_can_free(l)) {
+ unlock(&l->context.mutex);
+ pn_listener_free(l);
+ return NULL;
+ }
+ }
}
unlock(&l->context.mutex);
return lb;
@@ -1626,7 +1648,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
lock(&l->context.mutex);
pn_event_t *e = pn_collector_next(l->collector);
if (!e && l->pending_count && !l->unclaimed) {
- // empty collector means pn_collector_put() will not coallesce
+ // empty collector means pn_collector_put() will not coalesce
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
l->unclaimed = true;
l->pending_count--;
@@ -1643,12 +1665,10 @@ static void listener_done(pn_listener_t *l) {
lock(&l->context.mutex);
l->context.working = false;
- if (l->close_dispatched) {
- if (listener_can_free(l)) {
- unlock(&l->context.mutex);
- pn_listener_free(l);
- return;
- }
+ if (listener_can_free(l)) {
+ unlock(&l->context.mutex);
+ pn_listener_free(l);
+ return;
} else if (listener_has_event(l))
notify = wake(&l->context);
unlock(&l->context.mutex);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org