You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/11 02:51:50 UTC
[2/2] qpid-proton git commit: c proactor: improved robustness and
testing
c proactor: improved robustness and testing
Added assert self-tests to the libuv.c proactor. The assertions are kept in a
release build to fail fast and aid debugging.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ec70d73d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ec70d73d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ec70d73d
Branch: refs/heads/master
Commit: ec70d73dd5e5b58eaf64f5e137104fd9d4042e70
Parents: afacb16
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Feb 10 21:44:25 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Feb 10 21:49:59 2017 -0500
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 557 ++++++++++++++++++++-----------------
1 file changed, 306 insertions(+), 251 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ec70d73d/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 42bbfab..6064bd6 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -30,7 +30,10 @@
#include <proton/transport.h>
#include <proton/url.h>
+/* All asserts are cheap and should remain in a release build for debugability */
+#undef NDEBUG
#include <assert.h>
+
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
@@ -58,12 +61,10 @@
wake-up to be processed in a single thread with no context switches.
Function naming:
- - on_ - called in leader thread via uv_run().
- - leader_ - called in leader thread, while processing the leader_q.
- - owner_ - called in owning thread, leader or worker but not concurrently.
-
- Note on_ and leader_ functions can call each other, the prefix indicates the
- path they are most often called on.
+ - on_* - called in leader thread by uv_run().
+ - leader_* - called in leader thread (either leader_q processing or from an on_ function)
+ - worker_* - called in worker thread
+ - *_lh - called with the relevant lock held
*/
const char *COND_NAME = "proactor";
@@ -80,6 +81,13 @@ PN_HANDLE(PN_PROACTOR)
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+/* A psocket (connection or listener) has the following *mutually exclusive* states. */
+typedef enum {
+ ON_WORKER, /* On worker_q or in use by user code in worker thread */
+ ON_LEADER, /* On leader_q or in use the leader loop */
+ ON_UV /* Scheduled for a UV event, or in use by leader thread in on_ handler*/
+} psocket_state_t;
+
/* common to connection and listener */
typedef struct psocket_t {
/* Immutable */
@@ -87,14 +95,16 @@ typedef struct psocket_t {
/* Protected by proactor.lock */
struct psocket_t* next;
- void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
+ psocket_state_t state;
+ void (*action)(struct psocket_t*); /* deferred action for leader */
+ void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
- /* Only used by leader */
+ /* Only used by leader when it owns the psocket */
uv_tcp_t tcp;
- void (*action)(struct psocket_t*); /* deferred action for leader */
- bool is_conn:1;
char host[NI_MAXHOST];
char port[NI_MAXSERV];
+ bool is_conn;
+
} psocket_t;
/* Special value for psocket.next pointer when socket is not on any any list. */
@@ -105,11 +115,12 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
ps->next = &UNLISTED;
ps->is_conn = is_conn;
ps->tcp.data = ps;
+ ps->state = ON_WORKER;
/* For platforms that don't know about "amqp" and "amqps" service names. */
- if (strcmp(port, AMQP_PORT_NAME) == 0)
+ if (port && strcmp(port, AMQP_PORT_NAME) == 0)
port = AMQP_PORT;
- else if (strcmp(port, AMQPS_PORT_NAME) == 0)
+ else if (port && strcmp(port, AMQPS_PORT_NAME) == 0)
port = AMQPS_PORT;
/* Set to "\001" to indicate a NULL as opposed to an empty string "" */
strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
@@ -132,24 +143,27 @@ typedef struct pconnection_t {
uv_timer_t timer;
uv_write_t write;
uv_shutdown_t shutdown;
- size_t writing;
- bool reading:1;
- bool server:1; /* accept, not connect */
+ size_t writing; /* size of pending write request, 0 if none pending */
+ bool reading; /* true if a read request is pending */
+ bool server; /* accept, not connect */
} pconnection_t;
struct pn_listener_t {
psocket_t psocket;
/* Only used by owner thread */
- pconnection_t *accepting; /* accept in progress */
+ pconnection_t *accepting; /* set in worker, used in UV loop for accept */
pn_condition_t *condition;
pn_collector_t *collector;
pn_event_batch_t batch;
pn_record_t *attachments;
void *context;
size_t backlog;
-};
+ bool closing; /* close requested or closed by error */
+ /* Only used in leader thread */
+ size_t connections; /* number of connections waiting to be accepted */
+};
typedef struct queue { psocket_t *front, *back; } queue;
@@ -166,17 +180,16 @@ struct pn_proactor_t {
/* Protected by lock */
uv_mutex_t lock;
- queue start_q;
- queue worker_q;
- queue leader_q;
+ queue worker_q; /* psockets ready for work, to be returned via pn_proactor_wait() */
+ queue leader_q; /* psockets waiting for attention by the leader thread */
size_t interrupt; /* pending interrupts */
pn_millis_t timeout;
size_t count; /* psocket count */
- bool inactive:1;
- bool timeout_request:1;
- bool timeout_elapsed:1;
- bool has_leader:1;
- bool batch_working:1; /* batch belongs to a worker. */
+ bool inactive;
+ bool timeout_request;
+ bool timeout_elapsed;
+ bool has_leader;
+ bool batch_working; /* batch is being processed in a worker thread */
};
static bool push_lh(queue *q, psocket_t *ps) {
@@ -201,90 +214,46 @@ static psocket_t* pop_lh(queue *q) {
return ps;
}
-static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
- return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
- return ps->is_conn ? NULL: (pn_listener_t*)ps;
-}
-
-/* Put ps on the leader queue for processing. Thread safe. */
-static void to_leader_lh(psocket_t *ps) {
- push_lh(&ps->proactor->leader_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
-}
-
-static void to_leader(psocket_t *ps) {
- uv_mutex_lock(&ps->proactor->lock);
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Detach from IO and put ps on the worker queue */
-static void leader_to_worker(psocket_t *ps) {
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- /* Don't detach if there are no events yet. */
- if (pn_connection_driver_has_event(&pc->driver)) {
- if (pc->writing) {
- pc->writing = 0;
- uv_cancel((uv_req_t*)&pc->write);
- }
- if (pc->reading) {
- pc->reading = false;
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- }
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- }
- }
- } else {
- pn_listener_t *l = as_listener(ps);
- uv_read_stop((uv_stream_t*)&l->psocket.tcp);
- }
- uv_mutex_lock(&ps->proactor->lock);
- push_lh(&ps->proactor->worker_q, ps);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set a deferred action for leader, if not already set. */
-static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- if (!ps->action) {
+/* Set state and action and push to relevant queue */
+static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
+ /* Illegal if ps is already listed under a different state */
+ assert(ps->next == &UNLISTED || ps->state == state);
+ ps->state = state;
+ if (action && !ps->action) {
ps->action = action;
}
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
+ switch(state) {
+ case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break;
+ case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break;
+ case ON_UV:
+ assert(ps->next == &UNLISTED);
+ break; /* No queue for UV loop */
+ }
}
-/* Owner thread send to worker thread. Set deferred action if not already set. */
-static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+/* Set state and action, push to queue and notify leader. Thread safe. */
+static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
- if (!ps->action) {
- ps->action = action;
- }
- push_lh(&ps->proactor->worker_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
+ set_state_lh(ps, state, action);
+ uv_async_send(&ps->proactor->async);
uv_mutex_unlock(&ps->proactor->lock);
}
+static inline pconnection_t *as_pconnection(psocket_t* ps) {
+ return ps->is_conn ? (pconnection_t*)ps : NULL;
+}
-/* Re-queue for further work */
-static void worker_requeue(psocket_t* ps) {
- uv_mutex_lock(&ps->proactor->lock);
- push_lh(&ps->proactor->worker_q, ps);
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+ return ps->is_conn ? NULL: (pn_listener_t*)ps;
}
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+static pconnection_t *new_pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
- if (!pc) return NULL;
- if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+ if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
return NULL;
}
psocket_init(&pc->psocket, p, true, host, port);
+ pc->write.data = &pc->psocket;
if (server) {
pn_transport_set_server(pc->driver.transport);
}
@@ -319,74 +288,49 @@ static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_unlock(&p->lock);
}
-/* Free if there are no uv callbacks pending and no events */
-static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
- if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(&pc->psocket); /* Return to worker */
- } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
- /* All UV requests are finished */
- pn_connection_driver_destroy(&pc->driver);
- leader_count(pc->psocket.proactor, -1);
- free(pc);
- }
+/* Final close event for a a pconnection_t */
+static void on_close_pconnection_final(uv_handle_t *h) {
+ pconnection_t *pc = (pconnection_t*)h->data;
+ free(pc);
}
-/* Free if there are no uv callbacks pending and no events */
-static void leader_listener_maybe_free(pn_listener_t *l) {
- if (pn_collector_peek(l->collector)) {
- leader_to_worker(&l->psocket); /* Return to worker */
- } else if (!l->psocket.tcp.data) {
- pn_condition_free(l->condition);
- leader_count(l->psocket.proactor, -1);
- free(l);
- }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_maybe_free(psocket_t *ps) {
- if (ps->is_conn) {
- leader_pconnection_t_maybe_free(as_pconnection_t(ps));
- } else {
- leader_listener_maybe_free(as_listener(ps));
- }
+/* Close event for uv_tcp_t of a pconnection_t */
+static void on_close_pconnection(uv_handle_t *h) {
+ pconnection_t *pc = (pconnection_t*)h->data;
+ assert(pc->psocket.state == ON_UV);
+ leader_count(pc->psocket.proactor, -1);
+ pn_connection_driver_destroy(&pc->driver);
+ uv_timer_stop(&pc->timer);
+ /* Close the timer with the final event to free the pconnection_t */
+ uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
}
-static void on_close(uv_handle_t *h) {
- psocket_t *ps = (psocket_t*)h->data;
- h->data = NULL; /* Mark closed */
- leader_maybe_free(ps);
+/* Close event for uv_tcp_t of a pn_listener_t */
+static void on_close_listener(uv_handle_t *h) {
+ pn_listener_t *l = (pn_listener_t*)h->data;
+ pn_condition_free(l->condition);
+ free(l);
}
-static void on_shutdown(uv_shutdown_t *shutdown, int err) {
- psocket_t *ps = (psocket_t*)shutdown->data;
- shutdown->data = NULL; /* Mark closed */
- leader_maybe_free(ps);
+static inline void leader_finished(psocket_t *ps) {
+ set_state(ps, ON_UV, NULL);
+ uv_close((uv_handle_t*)&ps->tcp, ps->is_conn ? on_close_pconnection : on_close_listener);
}
-static inline void leader_close(psocket_t *ps) {
- if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
- uv_close((uv_handle_t*)&ps->tcp, on_close);
- }
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc) {
- pn_connection_driver_close(&pc->driver);
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- uv_close((uv_handle_t*)&pc->timer, on_close);
- }
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+ if (!c) {
+ return NULL;
}
- leader_maybe_free(ps);
-}
-
-static pconnection_t *get_pconnection_t(pn_connection_t* c) {
- if (!c) return NULL;
pn_record_t *r = pn_connection_attachments(c);
return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
+static void leader_unwatch(psocket_t *ps);
+
static void leader_error(psocket_t *ps, int err, const char* what) {
+ assert(ps->state != ON_WORKER);
if (ps->is_conn) {
- pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+ pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
what, fixstr(ps->host), fixstr(ps->port),
@@ -398,16 +342,18 @@ static void leader_error(psocket_t *ps, int err, const char* what) {
what, fixstr(ps->host), fixstr(ps->port),
uv_strerror(err));
pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+ l->closing = true;
}
- leader_to_worker(ps); /* Worker to handle the error */
+ leader_unwatch(ps); /* Worker to handle the error */
}
/* uv-initialization */
static int leader_init(psocket_t *ps) {
+ ps->state = ON_LEADER;
leader_count(ps->proactor, +1);
int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
if (!err) {
- pconnection_t *pc = as_pconnection_t(ps);
+ pconnection_t *pc = as_pconnection(ps);
if (pc) {
pc->connect.data = ps;
int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
@@ -422,40 +368,53 @@ static int leader_init(psocket_t *ps) {
return err;
}
-/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
+/* Outgoing connection */
+static void on_connect(uv_connect_t *connect, int err) {
+ pconnection_t *pc = (pconnection_t*)connect->data;
+ assert(pc->psocket.state == ON_UV);
if (!err) {
- leader_to_worker(&pc->psocket);
+ leader_unwatch(&pc->psocket);
} else {
- leader_error(&pc->psocket, err, what);
+ leader_error(&pc->psocket, err, "on connect to");
}
}
-static void on_connect(uv_connect_t *connect, int err) {
- leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
-}
-
-static void on_accept(uv_stream_t* server, int err) {
+/* Incoming connection ready to be accepted */
+static void on_connection(uv_stream_t* server, int err) {
+ /* Unlike most on_* functions, this one can be called by the leader thrad when the
+ * listener is ON_WORKER, because there's no way to stop libuv from calling
+ * on_connection() in leader_unwatch(). Just increase a counter and deal with it in the
+ * worker thread.
+ */
pn_listener_t *l = (pn_listener_t*) server->data;
- if (err) {
- leader_error(&l->psocket, err, "on accept");
+ assert(l->psocket.state == ON_UV);
+ if (!err) {
+ ++l->connections;
+ leader_unwatch(&l->psocket);
+ } else {
+ leader_error(&l->psocket, err, "on incoming connection from");
}
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
- leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
}
-static void leader_accept(psocket_t *ps) {
- pn_listener_t * l = as_listener(ps);
+static void leader_accept(pn_listener_t * l) {
+ assert(l->psocket.state == ON_UV);
+ assert(l->accepting);
pconnection_t *pc = l->accepting;
l->accepting = NULL;
- if (pc) {
- int err = leader_init(&pc->psocket);
- if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
- leader_connect_accept(pc, err, "on accept");
+ int err = leader_init(&pc->psocket);
+ if (!err) {
+ err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+ }
+ if (!err) {
+ leader_unwatch(&pc->psocket);
+ } else {
+ leader_error(&pc->psocket, err, "accepting from");
+ leader_error(&l->psocket, err, "accepting from");
}
}
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
+ assert(ps->state == ON_LEADER);
int err = leader_init(ps);
struct addrinfo hints = { 0 };
if (server) hints.ai_flags = AI_PASSIVE;
@@ -466,55 +425,75 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
}
static void leader_connect(psocket_t *ps) {
- pconnection_t *pc = as_pconnection_t(ps);
+ assert(ps->state == ON_LEADER);
+ pconnection_t *pc = as_pconnection(ps);
uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, false);
if (!err) {
err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
uv_freeaddrinfo(info.addrinfo);
}
- if (err) {
- leader_error(ps, err, "connect to");
+ if (!err) {
+ ps->state = ON_UV;
+ } else {
+ leader_error(ps, err, "connecting to");
}
}
static void leader_listen(psocket_t *ps) {
+ assert(ps->state == ON_LEADER);
pn_listener_t *l = as_listener(ps);
- uv_getaddrinfo_t info;
+ uv_getaddrinfo_t info;
int err = leader_resolve(ps, &info, true);
if (!err) {
err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
uv_freeaddrinfo(info.addrinfo);
}
- if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
- if (err) {
- leader_error(ps, err, "listen on ");
+ if (!err) {
+ err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
+ }
+ if (!err) {
+ set_state(ps, ON_UV, NULL);
+ } else {
+ leader_error(ps, err, "listening on");
}
}
-static void on_tick(uv_timer_t *timer) {
- pconnection_t *pc = (pconnection_t*)timer->data;
+/* Generate tick events and return millis till next tick or 0 if no tick is required */
+static pn_millis_t leader_tick(pconnection_t *pc) {
+ assert(pc->psocket.state != ON_WORKER);
pn_transport_t *t = pc->driver.transport;
if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
- uv_timer_stop(&pc->timer);
uint64_t now = uv_now(pc->timer.loop);
uint64_t next = pn_transport_tick(t, now);
- if (next) {
- uv_timer_start(&pc->timer, on_tick, next - now, 0);
- }
+ return next ? next - now : 0;
+ }
+ return 0;
+}
+
+static void on_tick(uv_timer_t *timer) {
+ if (!timer->data) return; /* timer closed */
+ pconnection_t *pc = (pconnection_t*)timer->data;
+ assert(pc->psocket.state == ON_UV);
+ uv_timer_stop(&pc->timer);
+ pn_millis_t next = leader_tick(pc);
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ leader_unwatch(&pc->psocket);
+ } else if (next) {
+ uv_timer_start(&pc->timer, on_tick, next, 0);
}
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
+ assert(pc->psocket.state == ON_UV);
if (nread >= 0) {
pn_connection_driver_read_done(&pc->driver, nread);
on_tick(&pc->timer); /* check for tick changes. */
- leader_to_worker(&pc->psocket);
/* Reading continues automatically until stopped. */
} else if (nread == UV_EOF) { /* hangup */
pn_connection_driver_read_close(&pc->driver);
- leader_maybe_free(&pc->psocket);
+ leader_unwatch(&pc->psocket);
} else {
leader_error(&pc->psocket, nread, "on read from");
}
@@ -522,16 +501,17 @@ static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
static void on_write(uv_write_t* write, int err) {
pconnection_t *pc = (pconnection_t*)write->data;
- write->data = NULL;
+ assert(pc->psocket.state == ON_UV);
+ size_t writing = pc->writing;
+ pc->writing = 0; /* This write is done regardless of outcome */
if (err == 0) {
- pn_connection_driver_write_done(&pc->driver, pc->writing);
- leader_to_worker(&pc->psocket);
+ pn_connection_driver_write_done(&pc->driver, writing);
+ leader_unwatch(&pc->psocket);
} else if (err == UV_ECANCELED) {
- leader_maybe_free(&pc->psocket);
+ leader_unwatch(&pc->psocket); /* cancelled by leader_unwatch, complete the job */
} else {
leader_error(&pc->psocket, err, "on write to");
}
- pc->writing = 0; /* Need to send a new write request */
}
static void on_timeout(uv_timer_t *timer) {
@@ -544,47 +524,93 @@ static void on_timeout(uv_timer_t *timer) {
// Read buffer allocation function for uv, just returns the transports read buffer.
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
+ assert(pc->psocket.state == ON_UV);
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
-static void leader_rewatch(psocket_t *ps) {
+/* Monitor a socket in the UV loop */
+static void leader_watch(psocket_t *ps) {
+ assert(ps->state == ON_LEADER);
int err = 0;
+ set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. */
+
if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
- if (pc->timer.data) { /* uv-initialized */
- on_tick(&pc->timer); /* Re-enable ticks if required */
+ pconnection_t *pc = as_pconnection(ps);
+ if (pn_connection_driver_finished(&pc->driver)) {
+ leader_finished(ps);
+ return;
}
+ pn_millis_t next_tick = leader_tick(pc);
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
- /* Ticks and checking buffers can generate events, process before proceeding */
if (pn_connection_driver_has_event(&pc->driver)) {
- leader_to_worker(ps);
- } else { /* Re-watch for IO */
- if (wbuf.size > 0 && !pc->writing) {
- pc->writing = wbuf.size;
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- pc->write.data = ps;
- uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
- pc->shutdown.data = ps;
- uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
- }
- if (rbuf.size > 0 && !pc->reading) {
- pc->reading = true;
- err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
- }
+ /* Ticks and checking buffers have generated events, send back to worker to process */
+ set_state(ps, ON_WORKER, NULL);
+ return;
+ }
+ if (next_tick) {
+ uv_timer_start(&pc->timer, on_tick, next_tick, 0);
+ }
+ if (wbuf.size > 0 && !pc->writing) {
+ pc->writing = wbuf.size;
+ uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+ } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+ pc->shutdown.data = ps;
+ err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+ }
+ if (rbuf.size > 0 && !pc->reading) {
+ pc->reading = true;
+ err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
}
} else {
pn_listener_t *l = as_listener(ps);
- err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+ if (l->closing && pn_collector_peek(l->collector)) {
+ leader_finished(&l->psocket);
+ } else {
+ if (l->accepting) {
+ leader_accept(l);
+ }
+ if (l->connections) {
+ leader_unwatch(ps);
+ }
+ }
}
if (err) {
- leader_error(ps, err, "rewatch");
+ leader_error(ps, err, "re-watching");
}
}
+/* Detach a socket from IO and put it on the worker queue */
+static void leader_unwatch(psocket_t *ps) {
+ assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */
+ if (ps->is_conn) {
+ pconnection_t *pc = as_pconnection(ps);
+ if (!pn_connection_driver_has_event(&pc->driver)) {
+ /* Don't return an empty event batch */
+ if (ps->state == ON_UV) {
+ return; /* Just leave it in the UV loop */
+ } else {
+ leader_watch(ps); /* Re-attach to UV loop */
+ }
+ return;
+ } else {
+ if (pc->writing) {
+ uv_cancel((uv_req_t*)&pc->write);
+ }
+ if (pc->reading) {
+ pc->reading = false;
+ uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+ }
+ if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+ uv_timer_stop(&pc->timer);
+ }
+ }
+ }
+ set_state(ps, ON_WORKER, NULL);
+}
+
/* Set the event in the proactor's batch */
static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
pn_collector_put(p->collector, pn_proactor__class(), p, t);
@@ -609,23 +635,32 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
}
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
+ assert(ps->state == ON_WORKER);
if (ps->is_conn) {
- pconnection_t *pc = as_pconnection_t(ps);
+ pconnection_t *pc = as_pconnection(ps);
return &pc->driver.batch;
} else { /* Listener */
pn_listener_t *l = as_listener(ps);
+ /* Generate accept events one at a time */
+ if (l->connections && !pn_collector_peek(l->collector)) {
+ --l->connections;
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ }
return &l->batch;
}
- to_leader(ps); /* No event, back to leader */
+ set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */
}
return 0;
}
-/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
+/* Called in any thread to set a wakeup action */
static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
- ps->wakeup = action;
- to_leader_lh(ps);
+ if (action && !ps->wakeup) {
+ ps->wakeup = action;
+ }
+ set_state_lh(ps, ON_LEADER, NULL);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
uv_mutex_unlock(&ps->proactor->lock);
}
@@ -634,30 +669,36 @@ pn_listener_t *pn_event_listener(pn_event_t *e) {
}
pn_proactor_t *pn_event_proactor(pn_event_t *e) {
- if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
+ if (pn_event_class(e) == pn_proactor__class()) {
+ return (pn_proactor_t*)pn_event_context(e);
+ }
pn_listener_t *l = pn_event_listener(e);
- if (l) return l->psocket.proactor;
+ if (l) {
+ return l->psocket.proactor;
+ }
pn_connection_t *c = pn_event_connection(e);
- if (c) return pn_connection_proactor(pn_event_connection(e));
+ if (c) {
+ return pn_connection_proactor(pn_event_connection(e));
+ }
return NULL;
}
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
pconnection_t *pc = batch_pconnection(batch);
if (pc) {
+ assert(pc->psocket.state == ON_WORKER);
if (pn_connection_driver_has_event(&pc->driver)) {
- /* Process all events before going back to IO. */
- worker_requeue(&pc->psocket);
- } else if (pn_connection_driver_finished(&pc->driver)) {
- owner_to_leader(&pc->psocket, leader_close);
+ /* Process all events before going back to leader */
+ set_state(&pc->psocket, ON_WORKER, NULL);
} else {
- owner_to_leader(&pc->psocket, leader_rewatch);
+ set_state(&pc->psocket, ON_LEADER, leader_watch);
}
return;
}
pn_listener_t *l = batch_listener(batch);
if (l) {
- owner_to_leader(&l->psocket, leader_rewatch);
+ assert(l->psocket.state == ON_WORKER);
+ set_state(&l->psocket, ON_LEADER, leader_watch);
return;
}
pn_proactor_t *bp = batch_proactor(batch);
@@ -692,14 +733,16 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
}
}
for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- void (*action)(psocket_t*) = ps->action;
- void (*wakeup)(psocket_t*) = ps->wakeup;
- ps->action = NULL;
- ps->wakeup = NULL;
- if (action || wakeup) {
+ assert(ps->state == ON_LEADER);
+ if (ps->wakeup) {
+ uv_mutex_unlock(&p->lock);
+ ps->wakeup(ps);
+ ps->wakeup = NULL;
+ uv_mutex_lock(&p->lock);
+ } else if (ps->action) {
uv_mutex_unlock(&p->lock);
- if (action) action(ps);
- if (wakeup) wakeup(ps);
+ ps->action(ps);
+ ps->action = NULL;
uv_mutex_lock(&p->lock);
}
}
@@ -734,12 +777,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
- pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
+ pconnection_t *pc = new_pconnection(p, c, false, host, port);
if (!pc) {
return PN_OUT_OF_MEMORY;
}
- /* Process PN_CONNECTION_INIT before binding */
- owner_to_worker(&pc->psocket, leader_connect);
+ set_state(&pc->psocket, ON_LEADER, leader_connect);
return 0;
}
@@ -747,24 +789,26 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con
{
psocket_init(&l->psocket, p, false, host, port);
l->backlog = backlog;
- owner_to_leader(&l->psocket, leader_listen);
+ set_state(&l->psocket, ON_LEADER, leader_listen);
return 0;
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
- pconnection_t *pc = get_pconnection_t(c);
+ pconnection_t *pc = get_pconnection(c);
return pc ? pc->psocket.proactor : NULL;
}
void leader_wake_connection(psocket_t *ps) {
- pconnection_t *pc = as_pconnection_t(ps);
+ assert(ps->state == ON_LEADER);
+ pconnection_t *pc = as_pconnection(ps);
pn_connection_t *c = pc->driver.connection;
pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- leader_to_worker(ps);
+ leader_unwatch(ps);
}
void pn_connection_wake(pn_connection_t* c) {
- wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
+ /* May be called from any thread */
+ wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
}
pn_proactor_t *pn_proactor() {
@@ -782,9 +826,11 @@ pn_proactor_t *pn_proactor() {
}
static void on_stopping(uv_handle_t* h, void* v) {
- uv_close(h, NULL); /* Close this handle */
+ if (!uv_is_closing(h)) {
+ uv_close(h, NULL); /* Close this handle */
+ }
if (!uv_loop_alive(h->loop)) /* Everything closed */
- uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */
+ uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */
}
void pn_proactor_free(pn_proactor_t *p) {
@@ -799,10 +845,7 @@ void pn_proactor_free(pn_proactor_t *p) {
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
- pn_event_t *handled = pn_collector_prev(l->collector);
- if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
- owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
- }
+ assert(l->psocket.state == ON_WORKER);
return pn_collector_next(l->collector);
}
@@ -811,6 +854,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
}
static void pn_listener_free(pn_listener_t *l) {
+ assert(l->psocket.state == ON_WORKER);
if (l) {
if (!l->collector) pn_collector_free(l->collector);
if (!l->condition) pn_condition_free(l->condition);
@@ -834,40 +878,51 @@ pn_listener_t *pn_listener() {
return l;
}
+void leader_listener_close(psocket_t *ps) {
+ assert(ps->state = ON_LEADER);
+ pn_listener_t *l = (pn_listener_t*)ps;
+ l->closing = true;
+ leader_watch(ps);
+}
+
void pn_listener_close(pn_listener_t* l) {
- wakeup(&l->psocket, leader_close);
+ /* This can be called from any thread, not just the owner of l */
+ wakeup(&l->psocket, leader_listener_close);
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+ assert(l->psocket.state == ON_WORKER);
return l ? l->psocket.proactor : NULL;
}
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+ assert(l->psocket.state == ON_WORKER);
return l->condition;
}
void *pn_listener_get_context(pn_listener_t *l) {
+ assert(l->psocket.state == ON_WORKER);
return l->context;
}
void pn_listener_set_context(pn_listener_t *l, void *context) {
+ assert(l->psocket.state == ON_WORKER);
l->context = context;
}
pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+ assert(l->psocket.state == ON_WORKER);
return l->attachments;
}
int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+ assert(l->psocket.state == ON_WORKER);
if (l->accepting) {
return PN_STATE_ERR; /* Only one at a time */
}
- l->accepting = new_pconnection_t(
- l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+ l->accepting = new_pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
if (!l->accepting) {
return UV_ENOMEM;
}
- owner_to_leader(&l->psocket, leader_accept);
return 0;
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org