You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/15 21:53:21 UTC
qpid-proton git commit: PROTON-1504: epoll proactor: no
PN_LISTENER_ACCEPT events if no FDs
Repository: qpid-proton
Updated Branches:
refs/heads/master 48e75e304 -> e504ce12f
PROTON-1504: epoll proactor: no PN_LISTENER_ACCEPT events if no FDs
Epoll proactor now generates PN_LISTENER_ACCEPT events only if a socket accept()
succeeds. Simplifies the code and makes it consistent with the libuv proactor.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e504ce12
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e504ce12
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e504ce12
Branch: refs/heads/master
Commit: e504ce12f148842e43c8584ff96baf2e984e4d10
Parents: 48e75e3
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 15 17:28:32 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 15 17:38:31 2017 -0400
----------------------------------------------------------------------
proton-c/src/proactor/epoll.c | 72 ++++++++++++++++++--------------------
1 file changed, 34 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e504ce12/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index c4d0c73..a173086 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -528,7 +528,8 @@ struct pn_listener_t {
pn_record_t *attachments;
void *listener_context;
size_t backlog;
- psocket_t *acceptable, *accepted;
+ int accepted_fd; /* fd accepted but not yet handled by pn_listener_accept() */
+ psocket_t *accepted; /* psocket from which we accpeted accepted_fd */
bool close_dispatched;
bool armed;
pn_listener_t *overflow; /* Next overflowed listener */
@@ -710,7 +711,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
pc->read_blocked = true;
pc->write_blocked = true;
pc->disconnected = false;
- pc->hog_count = 0;;
+ pc->hog_count = 0;
pc->batch.next_event = pconnection_batch_next;
if (server) {
@@ -1269,6 +1270,8 @@ pn_listener_t *pn_event_listener(pn_event_t *e) {
pn_listener_t *pn_listener() {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
+ l->accepted_fd = -1;
+ l->accepted = NULL;
l->batch.next_event = listener_batch_next;
l->collector = pn_collector();
l->condition = pn_condition();
@@ -1394,7 +1397,6 @@ static void listener_begin_close(pn_listener_t* l) {
}
}
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- l->acceptable = l->accepted = NULL;
}
}
@@ -1419,6 +1421,24 @@ static void listener_forced_shutdown(pn_listener_t *l) {
pn_listener_free(l);
}
+/* Accept a connection as part of listener_process(). Called with listener context lock held. */
+static void listener_accept_lh(psocket_t *ps) {
+ pn_listener_t *l = psocket_listener(ps);
+ assert(l->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
+ l->accepted_fd = accept(ps->sockfd, NULL, 0);
+ l->accepted = ps;
+ if (l->accepted_fd >= 0) {
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ } else {
+ int err = errno;
+ if (err == ENFILE || err == EMFILE) {
+ listener_set_overflow(l);
+ } else {
+ psocket_error(ps, err, "accept");
+ }
+ }
+}
+
/* Process a listening socket */
static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
// TODO: some parallelization of the accept mechanism.
@@ -1430,8 +1450,7 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
/* Calls listener_begin_close which closes all the listener's sockets */
psocket_error(ps, errno, "listener epoll");
} else if (!l->context.closing && events & EPOLLIN) {
- l->acceptable = ps;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ listener_accept_lh(ps);
}
} else {
wake_done(&l->context); // callback accounting
@@ -1472,7 +1491,7 @@ static void listener_done(pn_listener_t *l) {
} else if (listener_has_event(l)) {
notify = wake(&l->context);
} else if (l->overflow == NO_OVERFLOW &&
- !l->context.closing && !l->armed && !l->acceptable && l->accepted)
+ !l->context.closing && !l->armed && l->accepted_fd < 0 && l->accepted)
{
/* Don't rearm until the current socket is accepted */
rearm(l->accepted->proactor, &l->accepted->epoll_io);
@@ -1507,42 +1526,19 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
// TODO: fuller sanity check on input args
pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
assert(pc); // TODO: memory safety
- int err = 0;
- int newfd = -1;
- bool need_done = false;
lock(&l->context.mutex);
+ int fd = l->accepted_fd;
+ l->accepted_fd = -1;
proactor_add(&pc->context);
- if (l->context.closing) {
- err = EBADF;
- } else if (l->acceptable == 0) {
- err = EAGAIN;
- } else {
- l->accepted = l->acceptable;
- l->acceptable = 0;
- newfd = accept(l->accepted->sockfd, NULL, 0);
- if (newfd < 0) err = errno;
- }
- if (err) {
- lock(&pc->context.mutex);
- psocket_error(&pc->psocket, err, "accepting from"); /* Always signal error on the connection */
- pconnection_begin_close(pc);
- need_done = true;
- unlock(&pc->context.mutex);
- if (err == EMFILE || err == ENFILE) { /* Out of FDs does not close the listener */
- listener_set_overflow(l);
- } else {
- psocket_error(l->accepted, err, "accepting from");
- }
- } else { /* No errors */
- lock(&pc->context.mutex);
- configure_socket(newfd);
- pc->psocket.sockfd = newfd;
- pconnection_start(pc);
- unlock(&pc->context.mutex);
- }
+
+ lock(&pc->context.mutex);
+ configure_socket(fd);
+ pc->psocket.sockfd = fd;
+ pconnection_start(pc);
+ unlock(&pc->context.mutex);
+
unlock(&l->context.mutex);
- if (need_done) pconnection_done(pc);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org