You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/05/05 21:52:07 UTC
[1/2] qpid-proton git commit: PROTON-1463: Fix incorrect address
handling.
Repository: qpid-proton
Updated Branches:
refs/heads/master ed9532e2a -> e1d1f2f40
PROTON-1463: Fix incorrect address handling.
Fix to previous commit
a61f50e * PROTON-1463: Golang AMQP URL incompatible with Qpid Cpp Broker, which had
incorrect use of strings.TrimPrefix.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fa5b36dc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fa5b36dc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fa5b36dc
Branch: refs/heads/master
Commit: fa5b36dc739d9cad595b9c50099305ffdd50a884
Parents: ed9532e
Author: Alan Conway <ac...@redhat.com>
Authored: Fri May 5 14:19:40 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri May 5 14:19:40 2017 -0400
----------------------------------------------------------------------
examples/go/electron/receive.go | 2 +-
examples/go/electron/send.go | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fa5b36dc/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index c2ba1d1..7229b07 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -76,7 +76,7 @@ func main() {
c, err := container.Dial("tcp", url.Host)
fatalIf(err)
connections <- c // Save connection so we can Close() when main() ends
- addr := strings.TrimPrefix("/", url.Path)
+ addr := strings.TrimPrefix(url.Path, "/")
r, err := c.Receiver(electron.Source(addr))
fatalIf(err)
// Loop receiving messages and sending them to the main() goroutine
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fa5b36dc/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 033abab..2ba99c8 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -75,7 +75,7 @@ func main() {
c, err := container.Dial("tcp", url.Host)
fatalIf(err)
connections <- c // Save connection so we can Close() when main() ends
- addr := strings.TrimPrefix("/", url.Path)
+ addr := strings.TrimPrefix(url.Path, "/")
s, err := c.Sender(electron.Target(addr))
fatalIf(err)
// Loop sending messages.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1460: epoll multi-address listen
and connect.
Posted by ac...@apache.org.
PROTON-1460: epoll multi-address listen and connect.
epoll proactor:
pn_listener_t listens to every working address in the getaddrinfo list.
It holds one psocket_t per listening socket but has a single pcontext_t.
Returns one accept event at a time.
pn_proactor_connect tries each address in the getaddrinfo list until one works.
No further attempts are made after the first successful connection.
Modified ipv4/v6 flags and address parsing to follow the behavior documented for
pn_proactor_listen and pn_proactor_connect.
Modified proactor to deal directly with contexts not sockets, moved "closing"
flag from socket to context. Previously there was a 1-1 relationship, but now a
listener context has >1 socket.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e1d1f2f4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e1d1f2f4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e1d1f2f4
Branch: refs/heads/master
Commit: e1d1f2f4041dcf2b56be0aab0529f21e0dbf8c19
Parents: fa5b36d
Author: Alan Conway <ac...@redhat.com>
Authored: Fri May 5 17:40:30 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri May 5 17:50:38 2017 -0400
----------------------------------------------------------------------
proton-c/include/proton/proactor.h | 4 +-
proton-c/src/proactor/epoll.c | 470 ++++++++++++++++++--------------
2 files changed, 273 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e1d1f2f4/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 4a3b3c7..861afbe 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -107,7 +107,7 @@ PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
* pn_proactor_release_connection()
*
* @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
- * An empty host will connect to the local host via the default protocol.
+ * An empty host will connect to the local host via the default protocol (IPV6 or IPV4).
* An empty port will connect to the standard AMQP port (5672).
*
* @param[in] connection @ref connection to be connected to @p addr.
@@ -128,7 +128,7 @@ PNP_EXTERN void pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *co
* is handled, or when pn_proactor_free() is called.
*
* @param[in] addr the "host:port" network address, constructed by pn_proactor_addr()
- * An empty host will listen for all protocols on all local interfaces.
+ * An empty host will listen for all protocols (IPV6 and IPV4) on all local interfaces.
* An empty port will listen on the standard AMQP port (5672).
* @param[in] backlog of un-handled connection requests to allow before refusing
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e1d1f2f4/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 7930eb6..22a6d92 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -265,17 +265,21 @@ typedef struct pcontext_t {
pcontext_type_t type;
bool working;
int wake_ops; // unprocessed eventfd wake callback (convert to bool?)
- struct pcontext_t *next; // wake list, guarded by proactor eventfd_mutex
+ struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+ bool closing;
+ // Next 4 are protected by the proactor mutex
+ struct pcontext_t* next; /* Protected by proactor.mutex */
+ struct pcontext_t* prev; /* Protected by proactor.mutex */
+ int disconnect_ops; /* ops remaining before disconnect complete */
+ bool disconnecting; /* pn_proactor_disconnect */
} pcontext_t;
static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
+ memset(ctx, 0, sizeof(*ctx));
pmutex_init(&ctx->mutex);
ctx->proactor = p;
ctx->owner = o;
ctx->type = t;
- ctx->working = false;
- ctx->wake_ops = 0;
- ctx->next = NULL;
}
static void pcontext_finalize(pcontext_t* ctx) {
@@ -285,16 +289,10 @@ static void pcontext_finalize(pcontext_t* ctx) {
/* common to connection and listener */
typedef struct psocket_t {
pn_proactor_t *proactor;
- // Next 4 are protected by the proactor mutex
- struct psocket_t* next; /* Protected by proactor.mutex */
- struct psocket_t* prev; /* Protected by proactor.mutex */
- bool disconnecting; /* pn_proactor_disconnect */
- int disconnect_ops; /* ops remaining before disconnect complete */
// Remaining protected by the pconnection/listener mutex
int sockfd;
epoll_extended_t epoll_io;
- bool is_conn;
- bool closing;
+ pn_listener_t *listener; /* NULL for a connection socket */
char addr_buf[PN_MAX_ADDR];
const char *host, *port;
} psocket_t;
@@ -304,7 +302,7 @@ struct pn_proactor_t {
int epollfd;
ptimer_t timer;
pn_collector_t *collector;
- psocket_t *psockets; /* track in-use psockets for PN_PROACTOR_INACTIVE and final cleanup */
+ pcontext_t *contexts; /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
epoll_extended_t epoll_wake;
pn_event_t *cached_event;
pn_event_batch_t batch;
@@ -349,7 +347,7 @@ static bool wake(pcontext_t *ctx) {
if (!p->wake_list_first) {
p->wake_list_first = p->wake_list_last = ctx;
} else {
- p->wake_list_last->next = ctx;
+ p->wake_list_last->wake_next = ctx;
p->wake_list_last = ctx;
}
if (!p->wakes_in_progress) {
@@ -376,9 +374,9 @@ static pcontext_t *wake_pop_front(pn_proactor_t *p) {
assert(p->wakes_in_progress);
if (p->wake_list_first) {
ctx = p->wake_list_first;
- p->wake_list_first = ctx->next;
+ p->wake_list_first = ctx->wake_next;
if (!p->wake_list_first) p->wake_list_last = NULL;
- ctx->next = NULL;
+ ctx->wake_next = NULL;
if (!p->wake_list_first) {
/* Reset the eventfd until a future write.
@@ -402,19 +400,14 @@ static inline void wake_done(pcontext_t *ctx) {
}
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *addr)
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener, const char *addr)
{
ps->epoll_io.psocket = ps;
ps->epoll_io.fd = -1;
- ps->epoll_io.type = is_conn ? PCONNECTION_IO : LISTENER_IO;
+ ps->epoll_io.type = listener ? LISTENER_IO : PCONNECTION_IO;
ps->epoll_io.wanted = 0;
ps->proactor = p;
- ps->next = NULL;
- ps->prev = NULL;
- ps->disconnecting = false;
- ps->disconnect_ops = 0;
- ps->is_conn = is_conn;
- ps->closing = false;
+ ps->listener = listener;
ps->sockfd = -1;
pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
}
@@ -434,6 +427,7 @@ typedef struct pconnection_t {
ptimer_t timer; // TODO: review one timerfd per connectoin
// Following values only changed by (sole) working context:
uint32_t current_arm; // active epoll io events
+ bool connected;
bool read_blocked;
bool write_blocked;
bool read_closed;
@@ -444,10 +438,13 @@ typedef struct pconnection_t {
pn_event_batch_t batch;
pn_connection_driver_t driver;
struct pn_netaddr_t local, remote; /* Actual addresses */
+ struct addrinfo *addrinfo; /* Resolved address list */
+ struct addrinfo *ai; /* Current connect address */
} pconnection_t;
struct pn_listener_t {
- psocket_t psocket;
+ psocket_t *psockets; /* Array of listening sockets */
+ size_t psockets_size;
pcontext_t context;
pn_condition_t *condition;
pn_collector_t *collector;
@@ -456,8 +453,7 @@ struct pn_listener_t {
pn_record_t *attachments;
void *listener_context;
size_t backlog;
- int available_accepts;
- int pending_accepts;
+ psocket_t *acceptable, *accepted;
bool close_dispatched;
bool armed;
};
@@ -465,15 +461,25 @@ struct pn_listener_t {
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
static void listener_begin_close(pn_listener_t* l);
-static void proactor_add(psocket_t *ps);
-static bool proactor_remove(psocket_t *ps);
+static void proactor_add(pcontext_t *ctx);
+static bool proactor_remove(pcontext_t *ctx);
+
+static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
+ return ps->listener ? NULL : (pconnection_t*)ps;
+}
-static inline pconnection_t *as_pconnection(psocket_t* ps) {
- return ps->is_conn ? (pconnection_t*)ps : NULL;
+static inline pn_listener_t *psocket_listener(psocket_t* ps) {
+ return ps->listener;
}
-static inline pn_listener_t *as_listener(psocket_t* ps) {
- return ps->is_conn ? NULL: (pn_listener_t*)ps;
+static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
+ return c->type == PCONNECTION ?
+ (pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
+}
+
+static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
+ return c->type == LISTENER ?
+ (pn_listener_t*)((char*)c - offsetof(pn_listener_t, context)) : NULL;
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
@@ -515,15 +521,15 @@ static pn_event_t *log_event(void* p, pn_event_t *e) {
}
static void psocket_error(psocket_t *ps, int err, const char* what) {
- if (ps->is_conn) {
- pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
+ if (!ps->listener) {
+ pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
what, ps->host, ps->port,
strerror(err));
pn_connection_driver_close(driver);
} else {
- pn_listener_t *l = as_listener(ps);
+ pn_listener_t *l = psocket_listener(ps);
pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
what, ps->host, ps->port,
strerror(err));
@@ -572,13 +578,14 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
abort();
}
pcontext_init(&pc->context, PCONNECTION, p, pc);
- psocket_init(&pc->psocket, p, true, addr);
+ psocket_init(&pc->psocket, p, NULL, addr);
pc->new_events = 0;
pc->wake_count = 0;
pc->tick_pending = false;
pc->timer_armed = false;
pc->current_arm = 0;
+ pc->connected = false;
pc->read_blocked = true;
pc->write_blocked = true;
pc->read_closed = true;
@@ -618,7 +625,7 @@ static void pconnection_cleanup(pconnection_t *pc) {
stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
ptimer_finalize(&pc->timer);
lock(&pc->context.mutex);
- bool can_free = proactor_remove(&pc->psocket);
+ bool can_free = proactor_remove(&pc->context);
unlock(&pc->context.mutex);
if (can_free)
pconnection_final_free(pc);
@@ -627,8 +634,8 @@ static void pconnection_cleanup(pconnection_t *pc) {
// Call with lock held or from forced_shutdown
void pconnection_begin_close(pconnection_t *pc) {
- if (!pc->psocket.closing) {
- pc->psocket.closing = true;
+ if (!pc->context.closing) {
+ pc->context.closing = true;
pc->read_closed = pc->write_closed = true;
stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
pc->current_arm = 0;
@@ -753,6 +760,9 @@ static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
return true;
}
+static void pconnection_connected_lh(pconnection_t *pc);
+static void pconnection_maybe_connect_lh(pconnection_t *pc);
+
/*
* May be called concurrently from multiple threads:
* pn_event_batch_t loop (topup is true)
@@ -808,7 +818,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
// Confirmed as working thread. Review state and unlock ASAP.
- if (pc->psocket.closing && pconnection_is_final(pc)) {
+ if (pc->context.closing && pconnection_is_final(pc)) {
unlock(&pc->context.mutex);
pconnection_cleanup(pc);
return NULL;
@@ -833,7 +843,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
if (pc->new_events) {
if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pc->read_closed && !pc->write_closed)
- pc->disconnected = true;
+ pconnection_maybe_connect_lh(pc);
+ else
+ pconnection_connected_lh(pc); /* Non error event means we are connected */
if (pc->new_events & EPOLLOUT)
pc->write_blocked = false;
if (pc->new_events & EPOLLIN)
@@ -934,7 +946,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
}
lock(&pc->context.mutex);
- if (pc->psocket.closing && pconnection_is_final(pc)) {
+ if (pc->context.closing && pconnection_is_final(pc)) {
unlock(&pc->context.mutex);
pconnection_cleanup(pc);
return NULL;
@@ -961,7 +973,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
return NULL;
}
-
static void configure_socket(int sock) {
int flags = fcntl(sock, F_GETFL);
flags |= O_NONBLOCK;
@@ -971,7 +982,19 @@ static void configure_socket(int sock) {
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay));
}
-void pconnection_start(pconnection_t *pc) {
+/* Called with context.lock held */
+void pconnection_connected_lh(pconnection_t *pc) {
+ if (!pc->connected) {
+ pc->connected = true;
+ if (pc->addrinfo) {
+ freeaddrinfo(pc->addrinfo);
+ }
+ pc->addrinfo = NULL;
+ pc->ai = NULL;
+ }
+}
+
+static void pconnection_start(pconnection_t *pc) {
int efd = pc->psocket.proactor->epollfd;
start_polling(&pc->timer.epoll_io, efd); // TODO: check for error
@@ -990,40 +1013,63 @@ void pconnection_start(pconnection_t *pc) {
start_polling(ee, efd); // TODO: check for error
}
+/* Called on initial connect, and if connection fails to try another address */
+static void pconnection_maybe_connect_lh(pconnection_t *pc) {
+ errno = 0;
+ if (!pc->connected) { /* Not yet connected */
+ while (pc->ai) { /* Have an address */
+ struct addrinfo *ai = pc->ai;
+ pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */
+ int fd = socket(ai->ai_family, SOCK_STREAM, 0);
+ if (fd >= 0) {
+ configure_socket(fd);
+ if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
+ pc->psocket.sockfd = fd;
+ pconnection_start(pc);
+ return; /* Async connection started */
+ }
+ }
+ /* connect failed immediately, go round the loop to try the next addr */
+ }
+ /* If there was a previous attempted connection, let the poller discover the
+ errno from its socket, otherwise set the current error. */
+ if (pc->psocket.sockfd < 1) {
+ psocket_error(&pc->psocket, errno ? errno : ENOTCONN, "on connect");
+ }
+ }
+ pc->disconnected = true;
+}
+
+static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+{
+ struct addrinfo hints = { 0 };
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+ return getaddrinfo(host, port, &hints, res);
+}
+
void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
pconnection_t *pc = new_pconnection_t(p, c, false, addr);
assert(pc); // TODO: memory safety
// TODO: check case of proactor shutting down
lock(&pc->context.mutex);
- proactor_add(&pc->psocket);
+ proactor_add(&pc->context);
pn_connection_open(pc->driver.connection); /* Auto-open */
- struct addrinfo *ai = NULL;
- int fd = -1;
- if (!getaddrinfo(pc->psocket.host, pc->psocket.port, 0, &ai)) {
- fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
- if (fd >= 0) {
- configure_socket(fd);
- if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
- pc->psocket.sockfd = fd;
- pconnection_start(pc);
- unlock(&pc->context.mutex);
- freeaddrinfo(ai);
- return;
- }
- }
+ bool notify = false;
+ if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
+ pc->ai = pc->addrinfo;
+ pconnection_maybe_connect_lh(pc); /* Start connection attempts */
+ notify = pc->disconnected;
+ } else {
+ psocket_error(&pc->psocket, errno, "connect to ");
+ notify = wake(&pc->context);
}
-
- psocket_error(&pc->psocket, errno, "connect to ");
- bool notify = wake(&pc->context);
- if (ai) freeaddrinfo(ai);
- if (fd != -1) close (fd);
unlock(&pc->context.mutex);
if (notify) wake_notify(&pc->context);
- return;
}
-
static void pconnection_tick(pconnection_t *pc) {
pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
@@ -1041,7 +1087,7 @@ void pn_connection_wake(pn_connection_t* c) {
pconnection_t *pc = get_pconnection(c);
if (pc) {
lock(&pc->context.mutex);
- if (!pc->psocket.closing) {
+ if (!pc->context.closing) {
pc->wake_count++;
notify = wake(&pc->context);
}
@@ -1093,53 +1139,76 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
// TODO: check listener not already listening for this or another proactor
lock(&l->context.mutex);
l->context.proactor = p;;
- psocket_init(&l->psocket, p, false, addr);
l->backlog = backlog;
- proactor_add(&l->psocket);
+
+ char addr_buf[PN_MAX_ADDR];
+ const char *host, *port;
+ pni_parse_addr(addr, addr_buf, PN_MAX_ADDR, &host, &port);
+
+ struct addrinfo *addrinfo = NULL;
+ if (!pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo)) {
+ /* Count addresses, allocate enough space for sockets */
+ size_t len = 0;
+ for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+ ++len;
+ }
+ assert(len > 0); /* guaranteed by getaddrinfo */
+ l->psockets = (psocket_t*)calloc(len, sizeof(psocket_t));
+ assert(l->psockets); /* FIXME aconway 2017-05-05: memory safety */
+ l->psockets_size = 0;
+ /* Find working listen addresses */
+ for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+ int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
+ static int on = 1;
+ if ((fd >= 0) &&
+ !setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) &&
+ /* We listen to v4/v6 on separate sockets, don't let v6 listen for v4 */
+ (ai->ai_family != AF_INET6 ||
+ !setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on))) &&
+ !bind(fd, ai->ai_addr, ai->ai_addrlen) &&
+ !listen(fd, backlog))
+ {
+ psocket_t *ps = &l->psockets[l->psockets_size++];
+ psocket_init(ps, p, l, addr);
+ ps->sockfd = fd;
+ ps->epoll_io.fd = fd;
+ ps->epoll_io.wanted = EPOLLIN;
+ start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error
+ }
+ }
+ }
+ if (addrinfo) freeaddrinfo(addrinfo);
/* Always put an OPEN event for symmetry, even if we immediately close with err */
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
bool notify = wake(&l->context);
- struct addrinfo *ai = NULL;
- int fd = -1;
- if (!getaddrinfo(l->psocket.host, l->psocket.port, 0, &ai)) {
- fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
- if (fd >= 0) {
- int yes = 1;
- if (!setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
- if (!bind(fd, ai->ai_addr, ai->ai_addrlen))
- if (!listen(fd, backlog)) {
- l->psocket.sockfd = fd;
- l->psocket.epoll_io.fd = fd;
- l->psocket.epoll_io.wanted = EPOLLIN;
- start_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd); // TODO: check for error
- unlock(&l->context.mutex);
- if (notify) wake_notify(&l->context);
- freeaddrinfo(ai);
- return;
- }
- }
+ if (l->psockets_size == 0) { /* All failed, create dummy socket with an error */
+ int err = errno;
+ l->psockets = (psocket_t*)calloc(sizeof(psocket_t), 1);
+ psocket_init(l->psockets, p, l, addr);
+ psocket_error(l->psockets, err, "listen on");
}
-
- psocket_error(&l->psocket, errno, "listen on");
+ proactor_add(&l->context);
unlock(&l->context.mutex);
if (notify) wake_notify(&l->context);
- if (ai) freeaddrinfo(ai);
return;
}
// call with lock held
static inline bool listener_can_free(pn_listener_t *l) {
- return l->psocket.closing && l->close_dispatched &&
- !l->context.wake_ops;
+ return l->context.closing && l->close_dispatched && !l->context.wake_ops;
}
static inline void listener_final_free(pn_listener_t *l) {
pcontext_finalize(&l->context);
+ free(l->psockets);
free(l);
}
void pn_listener_free(pn_listener_t *l) {
+ /* Note at this point either the listener has never been used (freed by user)
+ or it has been closed, so all its sockets are closed.
+ */
// TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events?
if (l) {
bool can_free = true;
@@ -1147,8 +1216,9 @@ void pn_listener_free(pn_listener_t *l) {
if (l->condition) pn_condition_free(l->condition);
if (l->attachments) pn_free(l->attachments);
lock(&l->context.mutex);
- if (l->context.proactor)
- can_free = proactor_remove(&l->psocket);
+ if (l->context.proactor) {
+ can_free = proactor_remove(&l->context);
+ }
unlock(&l->context.mutex);
if (can_free)
listener_final_free(l);
@@ -1156,21 +1226,25 @@ void pn_listener_free(pn_listener_t *l) {
}
static void listener_begin_close(pn_listener_t* l) {
- if (!l->psocket.closing) {
- l->psocket.closing = true;
- if (l->psocket.sockfd >= 0) {
- stop_polling(&l->psocket.epoll_io, l->psocket.proactor->epollfd);
- close(l->psocket.sockfd);
+ if (!l->context.closing) {
+ l->context.closing = true;
+ /* Close all listening sockets */
+ for (size_t i = 0; i < l->psockets_size; ++i) {
+ psocket_t *ps = &l->psockets[i];
+ if (ps->sockfd >= 0) {
+ stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+ close(ps->sockfd);
+ }
}
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- l->pending_accepts = l->available_accepts = 0;
+ l->acceptable = l->accepted = NULL;
}
}
void pn_listener_close(pn_listener_t* l) {
bool notify = false;
lock(&l->context.mutex);
- if (!l->psocket.closing) {
+ if (!l->context.closing) {
listener_begin_close(l);
notify = wake(&l->context);
}
@@ -1188,17 +1262,19 @@ static void listener_forced_shutdown(pn_listener_t *l) {
pn_listener_free(l);
}
-
-static pn_event_batch_t *listener_process(pn_listener_t *l, uint32_t events) {
+/* Process a listening socket */
+static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
// TODO: some parallelization of the accept mechanism.
+ pn_listener_t *l = psocket_listener(ps);
lock(&l->context.mutex);
if (events) {
l->armed = false;
- if (events & EPOLLRDHUP)
- psocket_error(&l->psocket, errno, "listener epoll"); // includes listener_begin_close
- else if (!l->psocket.closing && events & EPOLLIN) {
- l->available_accepts++;
- l->pending_accepts++;
+ if (events & EPOLLRDHUP) {
+ /* Calls listener_begin_close which closes all the listener's sockets */
+ psocket_error(ps, errno, "listener epoll");
+ } else if (!l->context.closing && events & EPOLLIN) {
+ l->acceptable = ps;
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
}
} else {
wake_done(&l->context); // callback accounting
@@ -1206,7 +1282,7 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, uint32_t events) {
pn_event_batch_t *lb = NULL;
if (!l->context.working) {
l->context.working = true;
- if (listener_has_event(l) || l->available_accepts)
+ if (listener_has_event(l))
lb = &l->batch;
else
l->context.working = false;
@@ -1219,10 +1295,6 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
lock(&l->context.mutex);
pn_event_t *e = NULL;
- if (!listener_has_event(l) && l->available_accepts && !l->psocket.closing) {
- l->available_accepts--;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
- }
if (listener_has_event(l))
e = l->cached_event;
l->cached_event = NULL;
@@ -1244,12 +1316,14 @@ static void listener_done(pn_listener_t *l) {
return;
}
} else {
- if (listener_has_event(l) || l->available_accepts)
+ if (listener_has_event(l))
notify = wake(&l->context);
else {
- if (!l->psocket.closing && !l->armed) {
- rearm(l->psocket.proactor, &l->psocket.epoll_io);
+ /* Don't rearm until the current socket is accepted */
+ if (!l->context.closing && !l->armed && !l->acceptable && l->accepted) {
+ rearm(l->accepted->proactor, &l->accepted->epoll_io);
l->armed = true;
+ l->accepted = NULL;
}
}
}
@@ -1258,7 +1332,7 @@ static void listener_done(pn_listener_t *l) {
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
- return l ? l->psocket.proactor : NULL;
+ return l ? l->psockets[0].proactor : NULL;
}
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
@@ -1279,31 +1353,32 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
// TODO: fuller sanity check on input args
- pconnection_t *pc = new_pconnection_t(l->psocket.proactor, c, true, "");
+ pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, "");
assert(pc); // TODO: memory safety
int err = 0;
lock(&l->context.mutex);
- proactor_add(&pc->psocket);
- if (l->psocket.closing)
+ proactor_add(&pc->context);
+ if (l->context.closing)
err = EBADF;
- else {
- if (l->pending_accepts == 0)
+ else if (l->acceptable == 0) {
err = EAGAIN;
}
if (err) {
- psocket_error(&l->psocket, errno, "listener state on accept");
+ /* Error on one socket closes the entire listener */
+ psocket_error(&l->psockets[0], errno, "listener state on accept");
unlock(&l->context.mutex);
return;
}
- l->pending_accepts--;
+ psocket_t *ps = l->accepted = l->acceptable;
+ l->acceptable = 0;
- int newfd = accept(l->psocket.sockfd, NULL, 0);
+ int newfd = accept(ps->sockfd, NULL, 0);
if (newfd < 0) {
err = errno;
psocket_error(&pc->psocket, err, "failed initialization on accept");
- psocket_error(&l->psocket, err, "accept");
+ psocket_error(ps, err, "accept");
} else {
lock(&pc->context.mutex);
configure_socket(newfd);
@@ -1358,16 +1433,18 @@ void pn_proactor_free(pn_proactor_t *p) {
close(p->epollfd);
close(p->eventfd);
ptimer_finalize(&p->timer);
- while (p->psockets) {
- psocket_t *ps = p->psockets;
- p->psockets = ps->next;
- pconnection_t *pc = as_pconnection(ps);
- if (pc) {
- pconnection_forced_shutdown(pc);
- } else {
- pn_listener_t *l = as_listener(ps);
- if (l)
- listener_forced_shutdown(l);
+ while (p->contexts) {
+ pcontext_t *ctx = p->contexts;
+ p->contexts = ctx->next;
+ switch (ctx->type) {
+ case PCONNECTION:
+ pconnection_forced_shutdown(pcontext_pconnection(ctx));
+ break;
+ case LISTENER:
+ listener_forced_shutdown(pcontext_listener(ctx));
+ break;
+ default:
+ break;
}
}
@@ -1380,7 +1457,7 @@ void pn_proactor_free(pn_proactor_t *p) {
pn_proactor_t *pn_event_proactor(pn_event_t *e) {
if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
pn_listener_t *l = pn_event_listener(e);
- if (l) return l->psocket.proactor;
+ if (l) return l->psockets[0].proactor;
pn_connection_t *c = pn_event_connection(e);
if (c) return pn_connection_proactor(pn_event_connection(e));
return NULL;
@@ -1462,29 +1539,28 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) {
return NULL;
}
-static void proactor_add(psocket_t *ps) {
- pn_proactor_t *p = ps->proactor;
+static void proactor_add(pcontext_t *ctx) {
+ pn_proactor_t *p = ctx->proactor;
lock(&p->context.mutex);
- if (p->psockets) {
- p->psockets->prev = ps;
- ps->next = p->psockets;
- p->psockets = ps;
+ if (p->contexts) {
+ p->contexts->prev = ctx;
+ ctx->next = p->contexts;
}
- else p->psockets = ps;
+ p->contexts = ctx;
unlock(&p->context.mutex);
}
// call with psocket's mutex held
// return true if safe for caller to free psocket
-static bool proactor_remove(psocket_t *ps) {
- pn_proactor_t *p = ps->proactor;
+static bool proactor_remove(pcontext_t *ctx) {
+ pn_proactor_t *p = ctx->proactor;
lock(&p->context.mutex);
bool notify = false;
bool can_free = true;
- if (ps->disconnecting) {
- // No longer on psockets list
- if (--ps->disconnect_ops == 0) {
- if (--p->disconnects_pending == 0 && !p->psockets) {
+ if (ctx->disconnecting) {
+ // No longer on contexts list
+ if (--ctx->disconnect_ops == 0) {
+ if (--p->disconnects_pending == 0 && !p->contexts) {
p->inactive = true;
notify = wake(&p->context);
}
@@ -1494,18 +1570,18 @@ static bool proactor_remove(psocket_t *ps) {
}
else {
// normal case
- if (ps->prev)
- ps->prev->next = ps->next;
+ if (ctx->prev)
+ ctx->prev->next = ctx->next;
else {
- p->psockets = ps->next;
- ps->next = NULL;
- if (p->psockets)
- p->psockets->prev = NULL;
+ p->contexts = ctx->next;
+ ctx->next = NULL;
+ if (p->contexts)
+ p->contexts->prev = NULL;
}
- if (ps->next)
- ps->next->prev = ps->prev;
+ if (ctx->next)
+ ctx->next->prev = ctx->prev;
- if (!p->psockets && !p->disconnects_pending && !p->shutting_down) {
+ if (!p->contexts && !p->disconnects_pending && !p->shutting_down) {
p->inactive = true;
notify = wake(&p->context);
}
@@ -1524,7 +1600,7 @@ static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) {
case PCONNECTION:
return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
case LISTENER:
- return listener_process((pn_listener_t *) ctx->owner, 0);
+ return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0);
default:
assert(ctx->type == WAKEABLE); // TODO: implement or remove
}
@@ -1562,7 +1638,7 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
} else if (ee->type == PROACTOR_TIMER) {
batch = proactor_process(p, true);
} else {
- pconnection_t *pc = as_pconnection(ee->psocket);
+ pconnection_t *pc = psocket_pconnection(ee->psocket);
if (pc) {
if (ee->type == PCONNECTION_IO) {
batch = pconnection_process(pc, ev.events, false, false);
@@ -1572,9 +1648,8 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
}
}
else {
- pn_listener_t *l = as_listener(ee->psocket);
// TODO: can any of the listener processing be parallelized like IOCP?
- batch = listener_process(l, ev.events);
+ batch = listener_process(ee->psocket, ev.events);
}
}
@@ -1657,76 +1732,73 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
lock(&p->context.mutex);
- // Move the whole psockets list into a disconnecting state
- psocket_t *disconnecting_psockets = p->psockets;
- p->psockets = NULL;
- // First pass: mark each psocket as disconnecting and update global pending count.
- psocket_t *ps = disconnecting_psockets;
- while (ps) {
- ps->disconnecting = true;
- ps->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order.
+ // Move the whole contexts list into a disconnecting state
+ pcontext_t *disconnecting_pcontexts = p->contexts;
+ p->contexts = NULL;
+ // First pass: mark each pcontext as disconnecting and update global pending count.
+ pcontext_t *ctx = disconnecting_pcontexts;
+ while (ctx) {
+ ctx->disconnecting = true;
+ ctx->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order.
p->disconnects_pending++;
- ps = ps->next;
+ ctx = ctx->next;
}
unlock(&p->context.mutex);
- if (!disconnecting_psockets)
+ if (!disconnecting_pcontexts)
return;
- // Second pass: different locking, close the psockets, free them if !disconnect_ops
+ // Second pass: different locking, close the pcontexts, free them if !disconnect_ops
bool notify = false;
- for (ps = disconnecting_psockets; ps; ps = ps->next) {
+ for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) {
bool do_free = false;
- pcontext_t *psocket_context = NULL;
- pmutex *ps_mutex = NULL;
- pconnection_t *pc = as_pconnection(ps);
+ pmutex *ctx_mutex = NULL;
+ pconnection_t *pc = pcontext_pconnection(ctx);
if (pc) {
- ps_mutex = &pc->context.mutex;
- lock(ps_mutex);
- if (!ps->closing) {
+ ctx_mutex = &pc->context.mutex;
+ lock(ctx_mutex);
+ if (!ctx->closing) {
if (cond) {
pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
}
pn_connection_driver_close(&pc->driver);
pc->read_closed = true;
- psocket_context = &pc->context;
}
} else {
- pn_listener_t *l = as_listener(ps);
+ pn_listener_t *l = pcontext_listener(ctx);
assert(l);
- ps_mutex = &l->context.mutex;
- lock(ps_mutex);
- if (!ps->closing) {
+ ctx_mutex = &l->context.mutex;
+ lock(ctx_mutex);
+ if (!ctx->closing) {
if (cond) {
pn_condition_copy(pn_listener_condition(l), cond);
}
listener_begin_close(l);
- psocket_context = &l->context;
}
}
lock(&p->context.mutex);
- if (--ps->disconnect_ops == 0) {
+ if (--ctx->disconnect_ops == 0) {
do_free = true;
- psocket_context = NULL;
- if (--p->disconnects_pending == 0 && !p->psockets) {
+ ctx = NULL;
+ if (--p->disconnects_pending == 0 && !p->contexts) {
p->inactive = true;
notify = wake(&p->context);
}
} else {
- // If initiating the close, wake the psocket to do the free.
- if (psocket_context)
- if (!wake(psocket_context))
- psocket_context = NULL; // Wake already pending.
+ // If initiating the close, wake the pcontext to do the free.
+ if (ctx)
+ if (!wake(ctx))
+ ctx = NULL; // Wake already pending.
}
unlock(&p->context.mutex);
- unlock(ps_mutex);
+ unlock(ctx_mutex);
if (do_free) {
if (pc) pconnection_final_free(pc);
- else listener_final_free(as_listener(ps));
+ else listener_final_free(pcontext_listener(ctx));
} else {
- if (psocket_context)
- wake_notify(psocket_context);
+ if (ctx)
+ wake_notify(ctx);
}
}
if (notify)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org