You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cl...@apache.org on 2017/11/29 07:55:15 UTC
qpid-proton git commit: PROTON-1702: epoll proactor: per socket
rearming and overflow for listeners
Repository: qpid-proton
Updated Branches:
refs/heads/master 27c5bf0b7 -> a7119f56c
PROTON-1702: epoll proactor: per socket rearming and overflow for listeners
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a7119f56
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a7119f56
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a7119f56
Branch: refs/heads/master
Commit: a7119f56cf29f1bb2be261951be1120c2243f29c
Parents: 27c5bf0
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue Nov 28 23:54:34 2017 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue Nov 28 23:54:34 2017 -0800
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 247 ++++++++++++++++++++++++++-----------
1 file changed, 177 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a7119f56/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 69e6d0f..080cd4b 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -112,6 +112,8 @@ static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
+typedef struct acceptor_t acceptor_t;
+
typedef enum {
WAKE, /* see if any work to do in proactor/psocket context */
PCONNECTION_IO,
@@ -407,8 +409,8 @@ struct pn_proactor_t {
pcontext_t *wake_list_last;
// Interrupts have a dedicated eventfd because they must be async-signal safe.
int interruptfd;
- // If the process runs out of file descriptors, disarm listeners temporarily and save them here.
- pn_listener_t *overflow;
+ // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
+ acceptor_t *overflow;
pmutex overflow_mutex;
};
@@ -536,24 +538,40 @@ typedef struct pconnection_t {
pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/
} pconnection_t;
+/*
+ * A listener can have multiple sockets (as specified in the addrinfo). They
+ * are armed separately. The individual psockets can be part of at most one
+ * list: the global proactor overflow retry list or the per-listener list of
+ * pending accepts (valid inbound socket obtained, but pn_listener_accept not
+ * yet called by the application). These lists will be small and quick to
+ * traverse.
+ */
+
+struct acceptor_t{
+ psocket_t psocket;
+ int accepted_fd;
+ bool armed;
+ bool overflowed;
+ acceptor_t *next; /* next listener list member */
+};
+
struct pn_listener_t {
- psocket_t *psockets; /* Array of listening sockets */
- size_t psockets_size;
+ acceptor_t *acceptors; /* Array of listening sockets */
+ size_t acceptors_size;
pcontext_t context;
pn_condition_t *condition;
pn_collector_t *collector;
pn_event_batch_t batch;
pn_record_t *attachments;
void *listener_context;
+ acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/
+ int pending_count;
+ bool unclaimed; /* accept event dispatched but no pn_listener_accept() call yet */
size_t backlog;
- int accepted_fd; /* fd accepted but not yet handled by pn_listener_accept() */
- psocket_t *accepted; /* psocket from which we accpeted accepted_fd */
bool close_dispatched;
- bool armed;
- pn_listener_t *overflow; /* Next overflowed listener */
+ pmutex rearm_mutex; /* orders rearms/disarms, nothing else */
};
-
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
static void write_flush(pconnection_t *pc);
static void listener_begin_close(pn_listener_t* l);
@@ -568,6 +586,10 @@ static inline pn_listener_t *psocket_listener(psocket_t* ps) {
return ps->listener;
}
+static inline acceptor_t *psocket_acceptor(psocket_t* ps) {
+ return !ps->listener ? NULL : (acceptor_t *)ps;
+}
+
static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
return c->type == PCONNECTION ?
(pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
@@ -602,7 +624,7 @@ static inline bool pconnection_has_event(pconnection_t *pc) {
}
static inline bool listener_has_event(pn_listener_t *l) {
- return pn_collector_peek(l->collector);
+ return pn_collector_peek(l->collector) || (l->pending_count && !l->unclaimed);
}
static inline bool proactor_has_event(pn_proactor_t *p) {
@@ -648,18 +670,33 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
EPOLL_FATAL("arming polled file descriptor", errno);
}
+static void listener_list_append(acceptor_t **start, acceptor_t *item) {
+ assert(item->next == NULL);
+ if (*start) {
+ acceptor_t *end = *start;
+ while (end->next)
+ end = end->next;
+ end->next = item;
+ }
+ else *start = item;
+}
+
+static acceptor_t *listener_list_next(acceptor_t **start) {
+ acceptor_t *item = *start;
+ if (*start) *start = (*start)->next;
+ if (item) item->next = NULL;
+ return item;
+}
+
// Add an overflowing listener to the overflow list. Called with listener context lock held.
-static void listener_set_overflow(pn_listener_t *l) {
- pn_proactor_t *p = l->psockets[0].proactor;
+static void listener_set_overflow(acceptor_t *a) {
+ a->overflowed = true;
+ pn_proactor_t *p = a->psocket.proactor;
lock(&p->overflow_mutex);
- l->overflow = p->overflow;
- p->overflow = l;
+ listener_list_append(&p->overflow, a);
unlock(&p->overflow_mutex);
}
-static const int dummy__ = 0;
-static pn_listener_t * const NO_OVERFLOW = (pn_listener_t*)&dummy__; /* Bogus pointer */
-
/* TODO aconway 2017-06-08: we should also call proactor_rearm_overflow after a fixed delay,
even if the proactor has not freed any file descriptors, since other parts of the process
might have*/
@@ -667,22 +704,34 @@ static pn_listener_t * const NO_OVERFLOW = (pn_listener_t*)&dummy__; /* Bogus po
// Activate overflowing listeners, called when there may be available file descriptors.
static void proactor_rearm_overflow(pn_proactor_t *p) {
lock(&p->overflow_mutex);
- pn_listener_t *l = p->overflow;
+ acceptor_t* ovflw = p->overflow;
p->overflow = NULL;
unlock(&p->overflow_mutex);
- while (l) {
+ acceptor_t *a = listener_list_next(&ovflw);
+ while (a) {
+ pn_listener_t *l = a->psocket.listener;
lock(&l->context.mutex);
- rearm(l->accepted->proactor, &l->accepted->epoll_io);
- l->armed = true;
- l->accepted = NULL;
- pn_listener_t *next = l->overflow;
- l->overflow = NO_OVERFLOW;
+ bool rearming = !l->context.closing;
+ bool notify = false;
+ assert(!a->armed);
+ assert(a->overflowed);
+ a->overflowed = false;
+ if (rearming) {
+ lock(&l->rearm_mutex);
+ a->armed = true;
+ }
+ else notify = wake(&l->context);
unlock(&l->context.mutex);
- l = next;
+ if (rearming) {
+ rearm(p, &a->psocket.epoll_io);
+ unlock(&l->rearm_mutex);
+ }
+ if (notify) wake_notify(&l->context);
+ a = listener_list_next(&ovflw);
}
}
-// Close an FD and rearm overflow listeners
+// Close an FD and rearm overflow listeners. Call with no listener locks held.
static int pclosefd(pn_proactor_t *p, int fd) {
int err = close(fd);
if (!err) proactor_rearm_overflow(p);
@@ -1338,8 +1387,6 @@ pn_listener_t *pn_event_listener(pn_event_t *e) {
pn_listener_t *pn_listener() {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
- l->accepted_fd = -1;
- l->accepted = NULL;
l->batch.next_event = listener_batch_next;
l->collector = pn_collector();
l->condition = pn_condition();
@@ -1350,6 +1397,7 @@ pn_listener_t *pn_listener() {
}
pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen
pcontext_init(&l->context, LISTENER, unknown, l);
+ pmutex_init(&l->rearm_mutex);
}
return l;
}
@@ -1360,7 +1408,6 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
lock(&l->context.mutex);
l->context.proactor = p;;
l->backlog = backlog;
- l->overflow = NO_OVERFLOW;
char addr_buf[PN_MAX_ADDR];
const char *host, *port;
@@ -1375,9 +1422,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
++len;
}
assert(len > 0); /* guaranteed by getaddrinfo */
- l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t));
- assert(l->psockets); /* TODO aconway 2017-05-05: memory safety */
- l->psockets_size = 0;
+ l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t));
+ assert(l->acceptors); /* TODO aconway 2017-05-05: memory safety */
+ l->acceptors_size = 0;
/* Find working listen addresses */
for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
@@ -1390,13 +1437,18 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
!bind(fd, ai->ai_addr, ai->ai_addrlen) &&
!listen(fd, backlog))
{
- psocket_t *ps = &l->psockets[l->psockets_size++];
+ acceptor_t *acceptor = &l->acceptors[l->acceptors_size++];
+ acceptor->accepted_fd = -1;
+ psocket_t *ps = &acceptor->psocket;
psocket_init(ps, p, l, addr);
ps->sockfd = fd;
ps->epoll_io.fd = fd;
ps->epoll_io.wanted = EPOLLIN;
ps->epoll_io.polling = false;
+ lock(&l->rearm_mutex);
start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error
+ acceptor->armed = true;
+ unlock(&l->rearm_mutex);
} else {
close(fd);
}
@@ -1408,14 +1460,16 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
}
bool notify = wake(&l->context);
- if (l->psockets_size == 0) { /* All failed, create dummy socket with an error */
- l->psockets = (psocket_t*)realloc(l->psockets, sizeof(psocket_t));
- memset(l->psockets, 0, sizeof(psocket_t));
- psocket_init(l->psockets, p, l, addr);
+ if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */
+ l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
+ l->acceptors_size = 1;
+ memset(l->acceptors, 0, sizeof(acceptor_t));
+ psocket_init(&l->acceptors[0].psocket, p, l, addr);
+ l->acceptors[0].accepted_fd = -1;
if (gai_err) {
- psocket_gai_error(l->psockets, gai_err, "listen on");
+ psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on");
} else {
- psocket_error(l->psockets, errno, "listen on");
+ psocket_error(&l->acceptors[0].psocket, errno, "listen on");
}
} else {
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
@@ -1433,7 +1487,8 @@ static inline bool listener_can_free(pn_listener_t *l) {
static inline void listener_final_free(pn_listener_t *l) {
pcontext_finalize(&l->context);
- free(l->psockets);
+ pmutex_finalize(&l->rearm_mutex);
+ free(l->acceptors);
free(l);
}
@@ -1456,18 +1511,40 @@ void pn_listener_free(pn_listener_t *l) {
}
}
+/* Always call with lock held so it can be unlocked around overflow processing. */
static void listener_begin_close(pn_listener_t* l) {
// TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection)
+ int close_count = 0;
if (!l->context.closing) {
l->context.closing = true;
+
/* Close all listening sockets */
- for (size_t i = 0; i < l->psockets_size; ++i) {
- psocket_t *ps = &l->psockets[i];
+ for (size_t i = 0; i < l->acceptors_size; ++i) {
+ psocket_t *ps = &l->acceptors[i].psocket;
if (ps->sockfd >= 0) {
- stop_polling(&ps->epoll_io, ps->proactor->epollfd);
- pclosefd(l->psockets[0].proactor, ps->sockfd);
+ lock(&l->rearm_mutex);
+ stop_polling(&ps->epoll_io, ps->proactor->epollfd); // race: PROTON-1531
+ unlock(&l->rearm_mutex);
+ close(ps->sockfd);
+ ps->sockfd = -1;
+ close_count++;
}
}
+ /* Close all sockets waiting for a pn_listener_accept() */
+ if (l->unclaimed) l->pending_count++;
+ acceptor_t *a = listener_list_next(&l->pending_acceptors);
+ while (a) {
+ close(a->accepted_fd);
+ a->accepted_fd = -1;
+ close_count++;
+ l->pending_count--;
+ a = listener_list_next(&l->pending_acceptors);
+ }
+ assert(!l->pending_count);
+
+ unlock(&l->context.mutex);
+ proactor_rearm_overflow(pn_listener_proactor(l));
+ lock(&l->context.mutex);
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
}
}
@@ -1485,7 +1562,9 @@ void pn_listener_close(pn_listener_t* l) {
static void listener_forced_shutdown(pn_listener_t *l) {
// Called by proactor_free, no competing threads, no epoll activity.
+ lock(&l->context.mutex); // needed because of interaction with proactor_rearm_overflow
listener_begin_close(l);
+ unlock(&l->context.mutex);
// pconnection_process will never be called again. Zero everything.
l->context.wake_ops = 0;
l->close_dispatched = true;
@@ -1496,15 +1575,17 @@ static void listener_forced_shutdown(pn_listener_t *l) {
/* Accept a connection as part of listener_process(). Called with listener context lock held. */
static void listener_accept_lh(psocket_t *ps) {
pn_listener_t *l = psocket_listener(ps);
- assert(l->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
- l->accepted_fd = accept(ps->sockfd, NULL, 0);
- l->accepted = ps;
- if (l->accepted_fd >= 0) {
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ acceptor_t *acceptor = psocket_acceptor(ps);
+ assert(acceptor->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
+ acceptor->accepted_fd = accept(ps->sockfd, NULL, 0);
+ if (acceptor->accepted_fd >= 0) {
+ // acceptor_t *acceptor = listener_list_next(pending_acceptors);
+ listener_list_append(&l->pending_acceptors, acceptor);
+ l->pending_count++;
} else {
int err = errno;
if (err == ENFILE || err == EMFILE) {
- listener_set_overflow(l);
+ listener_set_overflow(acceptor);
} else {
psocket_error(ps, err, "accept");
}
@@ -1515,9 +1596,10 @@ static void listener_accept_lh(psocket_t *ps) {
static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
// TODO: some parallelization of the accept mechanism.
pn_listener_t *l = psocket_listener(ps);
+ acceptor_t *a = psocket_acceptor(ps);
lock(&l->context.mutex);
if (events) {
- l->armed = false; // TODO: armed logic should be per socket not per aggregate listener
+ a->armed = false;
if (events & EPOLLRDHUP) {
/* Calls listener_begin_close which closes all the listener's sockets */
psocket_error(ps, errno, "listener epoll");
@@ -1543,6 +1625,13 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
lock(&l->context.mutex);
pn_event_t *e = pn_collector_next(l->collector);
+ if (!e && l->pending_count && !l->unclaimed) {
+ // empty collector means pn_collector_put() will not coalesce
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ l->unclaimed = true;
+ l->pending_count--;
+ e = pn_collector_next(l->collector);
+ }
if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
l->close_dispatched = true;
unlock(&l->context.mutex);
@@ -1560,22 +1649,14 @@ static void listener_done(pn_listener_t *l) {
pn_listener_free(l);
return;
}
- } else if (listener_has_event(l)) {
+ } else if (listener_has_event(l))
notify = wake(&l->context);
- } else if (l->overflow == NO_OVERFLOW &&
- !l->context.closing && !l->armed && l->accepted_fd < 0 && l->accepted)
- {
- /* Don't rearm until the current socket is accepted */
- rearm(l->accepted->proactor, &l->accepted->epoll_io);
- l->armed = true;
- l->accepted = NULL;
- }
unlock(&l->context.mutex);
if (notify) wake_notify(&l->context);
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
- return l ? l->psockets[0].proactor : NULL;
+ return l ? l->acceptors[0].psocket.proactor : NULL;
}
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
@@ -1597,25 +1678,51 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
assert(pc); // TODO: memory safety
- const char *err = pconnection_setup(pc, l->psockets[0].proactor, c, true, "");
+ const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, true, "");
if (err) {
pn_logf("pn_listener_accept failure: %s", err);
return;
}
// TODO: fuller sanity check on input args
+ int err2 = 0;
+ int fd = -1;
+ psocket_t *rearming_ps = NULL;
+ bool notify = false;
lock(&l->context.mutex);
- int fd = l->accepted_fd;
- l->accepted_fd = -1;
- proactor_add(&pc->context);
+ if (l->context.closing)
+ err2 = EBADF;
+ else if (l->unclaimed) {
+ l->unclaimed = false;
+ acceptor_t *a = listener_list_next(&l->pending_acceptors);
+ assert(a);
+ assert(!a->armed);
+ fd = a->accepted_fd;
+ a->accepted_fd = -1;
+ lock(&l->rearm_mutex);
+ rearming_ps = &a->psocket;
+ a->armed = true;
+ }
+ else err2 = EWOULDBLOCK;
+ proactor_add(&pc->context);
lock(&pc->context.mutex);
- configure_socket(fd);
pc->psocket.sockfd = fd;
- pconnection_start(pc);
+ if (fd >= 0) {
+ configure_socket(fd);
+ pconnection_start(pc);
+ }
+ else
+ psocket_error(&pc->psocket, err2, "pn_listener_accept");
+ if (!l->context.working && listener_has_event(l))
+ notify = wake(&l->context);
unlock(&pc->context.mutex);
-
unlock(&l->context.mutex);
+ if (rearming_ps) {
+ rearm(rearming_ps->proactor, &rearming_ps->epoll_io);
+ unlock(&l->rearm_mutex);
+ }
+ if (notify) wake_notify(&l->context);
}
@@ -1701,7 +1808,7 @@ void pn_proactor_free(pn_proactor_t *p) {
pn_proactor_t *pn_event_proactor(pn_event_t *e) {
if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
pn_listener_t *l = pn_event_listener(e);
- if (l) return l->psockets[0].proactor;
+ if (l) return l->acceptors[0].psocket.proactor;
pn_connection_t *c = pn_event_connection(e);
if (c) return pn_connection_proactor(c);
return NULL;
@@ -1835,7 +1942,7 @@ static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t
case PCONNECTION:
return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
case LISTENER:
- return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
+ return listener_process(&((pn_listener_t *) ctx->owner)->acceptors[0].psocket, 0);
default:
assert(ctx->type == WAKEABLE); // TODO: implement or remove
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org