Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/27 18:04:54 UTC
[6/6] qpid-proton git commit: PROTON-1413: c proactor fix assertion errors, simplify code
PROTON-1413: c proactor fix assertion errors, simplify code
- expanded & improved the tests in tests/proactor.c and the tests/test_tools.h framework
- drop wakeup/action callbacks
- simpler listening logic using locks for concurrent leader/worker access
- centralize logic for socket processing and error handling
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2dae68d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2dae68d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2dae68d6
Branch: refs/heads/master
Commit: 2dae68d6a2a98f457ca7691f74d56296431de866
Parents: 105b939
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 12:54:15 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 12:55:12 2017 -0500
----------------------------------------------------------------------
proton-c/src/proactor/libuv.c | 713 +++++++++++++++++------------------
proton-c/src/tests/proactor.c | 348 +++++++++--------
proton-c/src/tests/test_tools.h | 99 +++--
3 files changed, 584 insertions(+), 576 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 322f353..2fafbb3 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -43,34 +43,25 @@
#include <string.h>
/*
- libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
- call that we use to make uv_run() return.
+ libuv functions are thread unsafe, so we use a "leader-worker-follower" model as follows:
- To provide concurrency proactor uses a "leader-worker-follower" model, threads take
- turns at the roles:
+ - At most one thread at a time is the "leader". The leader runs the UV loop until there
+ are events to process, then becomes a "worker".
- - a single "leader" thread uses libuv, it runs the uv_loop the in short bursts to
- generate work. Once there is work it becomes becomes a "worker" thread, another thread
- takes over as leader.
+ - Concurrent "worker" threads process events for separate connections or listeners.
+ When they run out of work they become "followers".
- - "workers" handle events for separate connections or listeners concurrently. They do as
- much work as they can, when none is left they become "followers"
+ - A "follower" is idle, waiting for work. When the leader becomes a worker, one follower
+ takes over as the new leader.
- - "followers" wait for the leader to generate work. One follower becomes the new leader,
- the others become workers or continue to follow till they can get work.
-
- Any thread in a pool can take on any role necessary at run-time. All the work generated
- by an IO wake-up for a single connection can be processed in a single single worker
- thread to minimize context switching.
+ Any thread that calls pn_proactor_wait() or pn_proactor_get() can take on any of the
+ roles as required at run-time. Monitored sockets (connections or listeners) are passed
+ between threads on thread-safe queues.
Function naming:
- - on_* - called in leader thread via uv_run().
- - leader_* - called in leader thread (either leader_q processing or from an on_ function)
+ - on_*() - libuv callbacks, called in leader thread via uv_run().
+ - leader_*() - only called in the leader thread, either from leader_q processing or an on_*() callback.
- *_lh - called with the relevant lock held
-
- LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
- UV handles have received a close callback. Freeing resources is initiated by uv_close()
- of the uv_tcp_t handle, and executed in an on_close() handler when it is safe.
*/
const char *AMQP_PORT = "5672";
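
To make the model above concrete, here is a minimal sketch (illustration only, not
part of this commit) of a thread pool driving one proactor through the public API
used in this file. The <proton/proactor.h> include path is assumed, and libuv's
uv_thread_* calls stand in for a portable thread API:

    #include <proton/event.h>
    #include <proton/proactor.h>
    #include <stdbool.h>
    #include <uv.h>

    static void run(void *arg) {
      pn_proactor_t *p = (pn_proactor_t*)arg;
      bool done = false;
      while (!done) {
        /* Follow here; wait() may promote this thread to leader internally */
        pn_event_batch_t *batch = pn_proactor_wait(p);
        pn_event_t *e;
        while ((e = pn_event_batch_next(batch))) { /* work the batch */
          if (pn_event_type(e) == PN_PROACTOR_INTERRUPT) {
            done = true;
            pn_proactor_interrupt(p); /* cascade: wake the next thread too */
          }
          /* ... handle connection/listener events here ... */
        }
        pn_proactor_done(p, batch); /* hand the socket back to the leader */
      }
    }

    int main(void) {
      pn_proactor_t *p = pn_proactor();
      uv_thread_t threads[4];
      for (int i = 0; i < 4; ++i) uv_thread_create(&threads[i], run, p);
      /* ... pn_proactor_connect()/pn_proactor_listen() here ... */
      pn_proactor_interrupt(p); /* one interrupt wakes one waiting thread */
      for (int i = 0; i < 4; ++i) uv_thread_join(&threads[i]);
      pn_proactor_free(p);
      return 0;
    }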
@@ -86,13 +77,6 @@ PN_HANDLE(PN_PROACTOR)
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-/* A psocket (connection or listener) has the following mutually exclusive states. */
-typedef enum {
- ON_WORKER, /* On worker_q or in use by user code in worker thread */
- ON_LEADER, /* On leader_q or in use the leader loop */
- ON_UV /* Scheduled for a UV event, or in use by leader thread in on_ handler*/
-} psocket_state_t;
-
/* common to connection and listener */
typedef struct psocket_t {
/* Immutable */
@@ -103,14 +87,15 @@ typedef struct psocket_t {
/* Protected by proactor.lock */
struct psocket_t* next;
- psocket_state_t state;
+ bool working; /* Owned by a worker thread */
void (*action)(struct psocket_t*); /* deferred action for leader */
- void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
/* Only used by leader thread when it owns the psocket */
uv_tcp_t tcp;
} psocket_t;
+typedef struct queue { psocket_t *front, *back; } queue;
+
/* Special value for psocket.next pointer when socket is not on any list. */
psocket_t UNLISTED;
@@ -118,8 +103,8 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
ps->proactor = p;
ps->next = &UNLISTED;
ps->is_conn = is_conn;
- ps->tcp.data = ps;
- ps->state = ON_WORKER;
+ ps->tcp.data = NULL; /* Set in leader_init */
+ ps->working = true;
/* For platforms that don't know about "amqp" and "amqps" service names. */
if (port && strcmp(port, AMQP_PORT_NAME) == 0)
@@ -136,7 +121,7 @@ static inline const char* fixstr(const char* str) {
return str[0] == '\001' ? NULL : str;
}
-/* Holds a psocket and a pn_connection_driver */
+/* a connection socket */
typedef struct pconnection_t {
psocket_t psocket;
@@ -149,31 +134,33 @@ typedef struct pconnection_t {
uv_write_t write;
uv_shutdown_t shutdown;
size_t writing; /* size of pending write request, 0 if none pending */
- bool server; /* accepting not connecting */
+
+ /* Locked for thread-safe access */
+ uv_mutex_t lock;
+ bool wake; /* pn_connection_wake() was called */
} pconnection_t;
-/* pn_listener_t with a psocket_t */
+/* a listener socket */
struct pn_listener_t {
psocket_t psocket;
/* Only used by owner thread */
- pconnection_t *accepting; /* set in worker, used in UV loop for accept */
- pn_condition_t *condition;
- pn_collector_t *collector;
pn_event_batch_t batch;
pn_record_t *attachments;
void *context;
size_t backlog;
- /* Only used in leader thread */
- size_t connections; /* number of connections waiting to be accepted */
- int err; /* uv error code, 0 = OK, UV_EOF = closed */
- const char *what; /* static description string */
+ /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
+ * detach a listener from the UV loop to prevent concurrent access.
+ */
+ uv_mutex_t lock;
+ pn_condition_t *condition;
+ pn_collector_t *collector;
+ queue accept; /* pconnection_t for uv_accept() */
+ bool closed;
};
-typedef struct queue { psocket_t *front, *back; } queue;
-
struct pn_proactor_t {
/* Leader thread */
uv_cond_t cond;
@@ -199,19 +186,15 @@ struct pn_proactor_t {
bool batch_working; /* batch is being processed in a worker thread */
};
-/* Push ps to back of q. Must not be on a different queue */
-static bool push_lh(queue *q, psocket_t *ps) {
- if (ps->next == &UNLISTED) {
- ps->next = NULL;
- if (!q->front) {
- q->front = q->back = ps;
- } else {
- q->back->next = ps;
- q->back = ps;
- }
- return true;
+static void push_lh(queue *q, psocket_t *ps) {
+ assert(ps->next == &UNLISTED);
+ ps->next = NULL;
+ if (!q->front) {
+ q->front = q->back = ps;
+ } else {
+ q->back->next = ps;
+ q->back = ps;
}
- return false;
}
/* Pop returns front of q or NULL if empty */
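
The queue type above is an intrusive singly-linked list: each psocket carries its
own next pointer, and the UNLISTED sentinel address marks "not on any queue", so
membership is a pointer compare instead of a scan. A standalone sketch of the same
pattern (hypothetical names, not part of this commit):

    #include <assert.h>
    #include <stdio.h>

    typedef struct node_t { struct node_t *next; int value; } node_t;
    static node_t UNLISTED_NODE; /* sentinel address only, never dereferenced */
    typedef struct { node_t *front, *back; } nqueue;

    static void npush(nqueue *q, node_t *n) {
      assert(n->next == &UNLISTED_NODE); /* must not already be on a queue */
      n->next = NULL;
      if (!q->front) q->front = q->back = n;
      else { q->back->next = n; q->back = n; }
    }

    static node_t *npop(nqueue *q) {
      node_t *n = q->front;
      if (n) {
        q->front = n->next;
        n->next = &UNLISTED_NODE; /* mark unlisted so it can be re-pushed */
      }
      return n;
    }

    int main(void) {
      nqueue q = { NULL, NULL };
      node_t a = { &UNLISTED_NODE, 1 }, b = { &UNLISTED_NODE, 2 };
      npush(&q, &a);
      npush(&q, &b);
      node_t *first = npop(&q), *second = npop(&q);
      printf("%d %d\n", first->value, second->value); /* prints "1 2" */
      return 0;
    }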
@@ -229,51 +212,26 @@ static inline void notify(pn_proactor_t* p) {
uv_async_send(&p->async);
}
-static void to_leader_lh(psocket_t *ps) {
- if (push_lh(&ps->proactor->leader_q, ps)) {
- ps->state = ON_LEADER;
- }
-}
-
-/* Queue an action for the leader thread */
-static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- ps->action = action;
- to_leader_lh(ps);
- uv_mutex_unlock(&ps->proactor->lock);
- notify(ps->proactor);
-}
-
-/* Push to the worker thread */
-static void to_worker(psocket_t *ps) {
+/* Notify that this socket needs attention from the leader at the next opportunity */
+static void psocket_notify(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- if (push_lh(&ps->proactor->worker_q, ps)) {
- ps->state = ON_WORKER;
- }
- notify(ps->proactor);
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set state to ON_UV */
-static void to_uv(psocket_t *ps) {
- uv_mutex_lock(&ps->proactor->lock);
- if (ps->next == &UNLISTED) {
- ps->state = ON_UV;
+ /* Only queue if not working and not already queued */
+ if (!ps->working && ps->next == &UNLISTED) {
+ push_lh(&ps->proactor->leader_q, ps);
+ notify(ps->proactor);
}
uv_mutex_unlock(&ps->proactor->lock);
}
-/* Called in any thread to set a wakeup action */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
+/* Notify the leader of a newly-created socket */
+static void psocket_start(psocket_t *ps) {
uv_mutex_lock(&ps->proactor->lock);
- ps->wakeup = action;
- /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */
- if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+ if (ps->next == &UNLISTED) { /* No-op if already queued */
+ ps->working = false;
push_lh(&ps->proactor->leader_q, ps);
- ps->state = ON_LEADER; /* Otherwise notify the leader */
+ notify(ps->proactor);
}
- uv_async_send(&ps->proactor->async); /* Wake leader */
uv_mutex_unlock(&ps->proactor->lock);
}
static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -318,6 +276,14 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
}
+static inline psocket_t *batch_psocket(pn_event_batch_t *batch) {
+ pconnection_t *pc = batch_pconnection(batch);
+ if (pc) return &pc->psocket;
+ pn_listener_t *l = batch_listener(batch);
+ if (l) return &l->psocket;
+ return NULL;
+}
+
static void leader_count(pn_proactor_t *p, int change) {
uv_mutex_lock(&p->lock);
p->count += change;
@@ -340,6 +306,12 @@ static void on_close_pconnection_final(uv_handle_t *h) {
pconnection_free((pconnection_t*)h->data);
}
+static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
+ if (!uv_is_closing(h)) {
+ uv_close(h, cb);
+ }
+}
+
/* Close event for uv_tcp_t of a psocket_t */
static void on_close_psocket(uv_handle_t *h) {
psocket_t *ps = (psocket_t*)h->data;
@@ -348,7 +320,7 @@ static void on_close_psocket(uv_handle_t *h) {
pconnection_t *pc = as_pconnection(ps);
uv_timer_stop(&pc->timer);
/* Delay the free till the timer handle is also closed */
- uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
+ uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
} else {
pn_listener_free(as_listener(ps));
}
@@ -362,48 +334,49 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
-static void pconnection_to_worker(pconnection_t *pc);
-static void listener_to_worker(pn_listener_t *l);
-
-int pconnection_error(pconnection_t *pc, int err, const char* what) {
- if (err) {
- pn_connection_driver_t *driver = &pc->driver;
- pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+static void pconnection_error(pconnection_t *pc, int err, const char* what) {
+ assert(err);
+ pn_connection_driver_t *driver = &pc->driver;
+ pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+ if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s",
what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
uv_strerror(err));
- pn_connection_driver_close(driver);
- pconnection_to_worker(pc);
}
- return err;
+ pn_connection_driver_close(driver);
}
-static int listener_error(pn_listener_t *l, int err, const char* what) {
- if (err) {
- l->err = err;
- l->what = what;
- listener_to_worker(l);
+static void listener_error(pn_listener_t *l, int err, const char* what) {
+ assert(err);
+ uv_mutex_lock(&l->lock);
+ if (!pn_condition_is_set(l->condition)) {
+ pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s",
+ what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+ uv_strerror(err));
}
- return err;
+ uv_mutex_unlock(&l->lock);
+ pn_listener_close(l);
}
-static int psocket_error(psocket_t *ps, int err, const char* what) {
- if (err) {
- if (ps->is_conn) {
- pconnection_error(as_pconnection(ps), err, "initialization");
- } else {
- listener_error(as_listener(ps), err, "initialization");
- }
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+ if (ps->is_conn) {
+ pconnection_error(as_pconnection(ps), err, "initialization");
+ } else {
+ listener_error(as_listener(ps), err, "initialization");
}
- return err;
}
+/* FIXME aconway 2017-02-25: split socket/queue */
+
/* psocket uv-initialization */
static int leader_init(psocket_t *ps) {
- ps->state = ON_LEADER;
+ ps->working = false;
+ ps->tcp.data = ps;
leader_count(ps->proactor, +1);
int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
- if (!err) {
+ if (err) {
+ psocket_error(ps, err, "initialization");
+ } else {
pconnection_t *pc = as_pconnection(ps);
if (pc) {
pc->connect.data = ps;
@@ -412,50 +385,63 @@ static int leader_init(psocket_t *ps) {
pc->timer.data = ps;
}
}
- } else {
- psocket_error(ps, err, "initialization");
}
return err;
}
+/* Check if a pconnection has work for a worker thread. Called by owning thread. */
+static bool pconnection_needs_work(pconnection_t *pc) {
+ if (!pc->writing) { /* Can't detach for work while write is pending */
+ /* Check for wake requests */
+ uv_mutex_lock(&pc->lock);
+ bool wake = pc->wake;
+ pc->wake = false;
+ uv_mutex_unlock(&pc->lock);
+ if (wake) {
+ pn_connection_t *c = pc->driver.connection;
+ pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+ }
+ return pn_connection_driver_has_event(&pc->driver);
+ }
+ return false;
+}
+
+/* Detach a connection from the UV loop so it can be used safely by a worker */
+void pconnection_detach(pconnection_t *pc) {
+ uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+ uv_timer_stop(&pc->timer);
+ psocket_notify(&pc->psocket);
+}
+
/* Outgoing connection */
static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
- if (!pconnection_error(pc, err, "on connect to")) {
- pconnection_to_worker(pc);
- }
+ assert(!pc->psocket.working);
+ if (err) pconnection_error(pc, err, "on connect to");
+ pconnection_detach(pc); /* FIXME aconway 2017-02-25: detach AFTER error or vv */
}
/* Incoming connection ready to be accepted */
static void on_connection(uv_stream_t* server, int err) {
- /* Unlike most on_* functions, this one can be called by the leader thread when the
- * listener is ON_WORKER, because there's no way to stop libuv from calling
- * on_connection(). Just increase a counter and generate events in to_worker.
+ /* Unlike most on_* functions, this can be called by the leader thread while the listener
+ * is owned by a worker, because there's no way to stop libuv from calling
+ * on_connection(). Update the state of the listener and queue it for leader attention.
*/
pn_listener_t *l = (pn_listener_t*) server->data;
- l->err = err;
- if (!err) ++l->connections;
- listener_to_worker(l); /* If already ON_WORKER it will stay there */
-}
-
-static void leader_accept(pn_listener_t * l) {
- assert(l->accepting);
- pconnection_t *pc = l->accepting;
- l->accepting = NULL;
- int err = leader_init(&pc->psocket);
- if (!err) {
- err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
- }
- if (!err) {
- pconnection_to_worker(pc);
+ if (err) {
+ listener_error(l, err, "on incoming connection");
} else {
- pconnection_error(pc, err, "accepting from");
- listener_error(l, err, "accepting from");
+ uv_mutex_lock(&l->lock);
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ uv_mutex_unlock(&l->lock);
+ psocket_notify(&l->psocket);
}
}
+// #error FIXME REVIEW UPWARDS FROM HERE ^^^^
+
+/* Common address resolution for leader_listen and leader_connect */
static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
- assert(ps->state == ON_LEADER);
int err = leader_init(ps);
struct addrinfo hints = { 0 };
if (server) hints.ai_flags = AI_PASSIVE;
@@ -465,27 +451,23 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
return err;
}
-static void leader_connect(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pconnection_t *pc = as_pconnection(ps);
+static void leader_connect(pconnection_t *pc) {
uv_getaddrinfo_t info;
- int err = leader_resolve(ps, &info, false);
+ int err = leader_resolve(&pc->psocket, &info, false);
if (!err) {
err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
uv_freeaddrinfo(info.addrinfo);
}
- if (!err) {
- ps->state = ON_UV;
- } else {
+ if (err) {
pconnection_error(pc, err, "connecting to");
+ } else {
+ pn_connection_open(pc->driver.connection);
}
}
-static void leader_listen(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pn_listener_t *l = as_listener(ps);
+static void leader_listen(pn_listener_t *l) {
uv_getaddrinfo_t info;
- int err = leader_resolve(ps, &info, true);
+ int err = leader_resolve(&l->psocket, &info, true);
if (!err) {
err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
uv_freeaddrinfo(info.addrinfo);
@@ -493,17 +475,53 @@ static void leader_listen(psocket_t *ps) {
if (!err) {
err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
}
- if (!err) {
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
- listener_to_worker(l); /* Let worker see the OPEN event */
+ uv_mutex_lock(&l->lock);
+ /* Always put an OPEN event for symmetry, even if we immediately close with err */
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+ uv_mutex_unlock(&l->lock);
+ if (err) listener_error(l, err, "listening on");
+}
+
+static bool listener_needs_work(pn_listener_t *l) {
+ uv_mutex_lock(&l->lock);
+ bool needs_work = pn_collector_peek(l->collector);
+ uv_mutex_unlock(&l->lock);
+ return needs_work;
+}
+
+static bool listener_finished_lh(pn_listener_t *l) {
+ return l->closed && !pn_collector_peek(l->collector) && !l->accept.front;
+}
+
+static bool leader_process_listener(pn_listener_t * l) {
+ if (l->psocket.tcp.data == NULL) {
+ leader_listen(l);
} else {
- listener_error(l, err, "listening on");
+ uv_mutex_lock(&l->lock);
+ pconnection_t *pc;
+ while (!listener_finished_lh(l) && (pc = (pconnection_t*)pop_lh(&l->accept))) {
+ uv_mutex_unlock(&l->lock);
+ int err = leader_init(&pc->psocket);
+ if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+ if (err) {
+ listener_error(l, err, "accepting from");
+ psocket_notify(&l->psocket);
+ pconnection_error(pc, err, "accepting from");
+ }
+ psocket_start(&pc->psocket);
+ uv_mutex_lock(&l->lock);
+ }
+ if (listener_finished_lh(l)) {
+ uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
+ }
+ uv_mutex_unlock(&l->lock);
}
+ return listener_needs_work(l);
}
/* Generate tick events and return millis till next tick or 0 if no tick is required */
static pn_millis_t leader_tick(pconnection_t *pc) {
- assert(pc->psocket.state != ON_WORKER);
+ assert(!pc->psocket.working);
uint64_t now = uv_now(pc->timer.loop);
uint64_t next = pn_transport_tick(pc->driver.transport, now);
return next ? next - now : 0;
@@ -511,38 +529,36 @@ static pn_millis_t leader_tick(pconnection_t *pc) {
static void on_tick(uv_timer_t *timer) {
pconnection_t *pc = (pconnection_t*)timer->data;
- pn_millis_t next = leader_tick(pc); /* May generate events */
- if (pn_connection_driver_has_event(&pc->driver)) {
- pconnection_to_worker(pc);
- } else if (next) {
- uv_timer_start(&pc->timer, on_tick, next, 0);
- }
+ assert(!pc->psocket.working);
+ leader_tick(pc);
+ pconnection_detach(pc);
+ /* FIXME aconway 2017-02-25: optimize - don't detach if no work. Need to check for finished? */
}
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
+ assert(!pc->psocket.working);
if (nread >= 0) {
pn_connection_driver_read_done(&pc->driver, nread);
- pconnection_to_worker(pc);
} else if (nread == UV_EOF) { /* hangup */
pn_connection_driver_read_close(&pc->driver);
- pconnection_to_worker(pc);
} else {
pconnection_error(pc, nread, "on read from");
}
+ pconnection_detach(pc);
}
static void on_write(uv_write_t* write, int err) {
pconnection_t *pc = (pconnection_t*)write->data;
- if (err == 0) {
- pn_connection_driver_write_done(&pc->driver, pc->writing);
- pconnection_to_worker(pc);
- } else if (err == UV_ECANCELED) {
- pconnection_to_worker(pc);
- } else {
+ assert(!pc->psocket.working);
+ size_t size = pc->writing;
+ pc->writing = 0;
+ if (err) {
pconnection_error(pc, err, "on write to");
+ } else {
+ pn_connection_driver_write_done(&pc->driver, size);
}
- pc->writing = 0;
+ pconnection_detach(pc);
}
static void on_timeout(uv_timer_t *timer) {
@@ -552,110 +568,13 @@ static void on_timeout(uv_timer_t *timer) {
uv_mutex_unlock(&p->lock);
}
-// Read buffer allocation function for uv, just returns the transports read buffer.
+/* Read buffer allocation function for uv, just returns the transport's read buffer. */
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
-static void pconnection_to_uv(pconnection_t *pc) {
- to_uv(&pc->psocket); /* Assume we're going to UV unless sent elsewhere */
- if (pn_connection_driver_finished(&pc->driver)) {
- if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) {
- uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
- }
- return;
- }
- pn_millis_t next_tick = leader_tick(pc);
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
- if (pn_connection_driver_has_event(&pc->driver)) {
- to_worker(&pc->psocket); /* Ticks/buffer checks generated events */
- return;
- }
- if (next_tick &&
- pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), "timer start")) {
- return;
- }
- if (wbuf.size > 0) {
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- if (pconnection_error(
- pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write"))
- return;
- pc->writing = wbuf.size;
- } else if (pn_connection_driver_write_closed(&pc->driver)) {
- pc->shutdown.data = &pc->psocket;
- if (pconnection_error(
- pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write"))
- return;
- }
- if (rbuf.size > 0) {
- if (pconnection_error(
- pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read"))
- return;
- }
-}
-
-static void listener_to_uv(pn_listener_t *l) {
- to_uv(&l->psocket); /* Assume we're going to UV unless sent elsewhere */
- if (l->err) {
- if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) {
- uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
- }
- } else {
- if (l->accepting) {
- leader_accept(l);
- }
- if (l->connections) {
- listener_to_worker(l);
- }
- }
-}
-
-/* Monitor a psocket_t in the UV loop */
-static void psocket_to_uv(psocket_t *ps) {
- if (ps->is_conn) {
- pconnection_to_uv(as_pconnection(ps));
- } else {
- listener_to_uv(as_listener(ps));
- }
-}
-
-/* Detach a connection from IO and put it on the worker queue */
-static void pconnection_to_worker(pconnection_t *pc) {
- /* Can't go to worker if a write is outstanding or the batch is empty */
- if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) {
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- uv_timer_stop(&pc->timer);
- }
- to_worker(&pc->psocket);
-}
-
-/* Can't really detach a listener, as on_connection can always be called.
- Generate events here safely.
-*/
-static void listener_to_worker(pn_listener_t *l) {
- if (pn_collector_peek(l->collector)) { /* Already have events */
- to_worker(&l->psocket);
- } else if (l->err) {
- if (l->err != UV_EOF) {
- pn_condition_format(l->condition, uv_err_name(l->err), "%s %s:%s: %s",
- l->what, fixstr(l->psocket.host), fixstr(l->psocket.port),
- uv_strerror(l->err));
- }
- l->err = 0;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- to_worker(&l->psocket);
- } else if (l->connections) { /* Generate accept events one at a time */
- --l->connections;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
- to_worker(&l->psocket);
- } else {
- listener_to_uv(l);
- }
-}
-
/* Set the event in the proactor's batch */
static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
pn_collector_put(p->collector, pn_proactor__class(), p, t);
@@ -663,36 +582,27 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
return &p->batch;
}
-void leader_wake_connection(psocket_t *ps) {
- assert(ps->state == ON_LEADER);
- pconnection_t *pc = as_pconnection(ps);
- pn_connection_t *c = pc->driver.connection;
- pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- pconnection_to_worker(pc);
-}
-
static void on_stopping(uv_handle_t* h, void* v) {
/* Close all the TCP handles. on_close_psocket will close any other handles if needed */
- if (h->type == UV_TCP && !uv_is_closing(h)) {
- uv_close(h, on_close_psocket);
+ if (h->type == UV_TCP) {
+ uv_safe_close(h, on_close_psocket);
}
}
static pn_event_t *log_event(void* p, pn_event_t *e) {
if (e) {
- pn_logf("[%p]:(%s)\n", (void*)p, pn_event_type_name(pn_event_type(e)));
+ pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
}
return e;
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
- assert(l->psocket.state == ON_WORKER);
- pn_event_t *prev = pn_collector_prev(l->collector);
- if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
- l->err = UV_EOF;
- }
- return log_event(l, pn_collector_next(l->collector));
+ assert(l->psocket.working);
+ uv_mutex_lock(&l->lock);
+ pn_event_t *e = pn_collector_next(l->collector);
+ uv_mutex_unlock(&l->lock);
+ return log_event(l, e);
}
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
@@ -702,25 +612,18 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
}
static void pn_listener_free(pn_listener_t *l) {
- /* No assert(l->psocket.state == ON_WORKER); can be called during shutdown */
+ /* No assert(l->psocket.working); can be called during shutdown */
if (l) {
if (l->collector) pn_collector_free(l->collector);
if (l->condition) pn_condition_free(l->condition);
if (l->attachments) pn_free(l->attachments);
+ assert(!l->accept.front);
free(l);
}
}
-void leader_listener_close(psocket_t *ps) {
- assert(ps->state = ON_LEADER);
- pn_listener_t *l = (pn_listener_t*)ps;
- l->err = UV_EOF;
- listener_to_uv(l);
-}
-
-/* Return the next event batch or 0 if no events are available in the worker_q */
+/* Return the next event batch or NULL if no events are available */
static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
- /* FIXME aconway 2017-02-21: generate these in parallel? */
if (!p->batch_working) { /* Can generate proactor events */
if (p->inactive) {
p->inactive = false;
@@ -736,49 +639,99 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
}
}
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
- assert(ps->state == ON_WORKER);
+ assert(ps->working);
if (ps->is_conn) {
return &as_pconnection(ps)->driver.batch;
} else { /* Listener */
return &as_listener(ps)->batch;
}
}
- return 0;
+ return NULL;
+}
+
+/* Process a pconnection, return true if it has work */
+static bool leader_process_pconnection(pconnection_t *pc) {
+ if (pc->psocket.tcp.data == NULL) {
+ leader_connect(pc);
+ }
+ pn_millis_t next_tick = leader_tick(pc);
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+ if (pconnection_needs_work(pc)) {
+ return true; /* Don't wait on IO while there is pending work */
+ }
+ if (pn_connection_driver_finished(&pc->driver)) {
+ uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
+ return false;
+ }
+ /* Issue async IO requests */
+ int err = 0;
+ const char *what = NULL;
+
+ if (!err && next_tick) {
+ what = "connection timer start";
+ err = uv_timer_start(&pc->timer, on_tick, next_tick, 0);
+ }
+ if (!err && !pc->writing) {
+ what = "write";
+ if (wbuf.size > 0) {
+ uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+ if (!err) {
+ pc->writing = wbuf.size;
+ }
+ } else if (pn_connection_driver_write_closed(&pc->driver)) {
+ uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+ }
+ }
+ if (!err && rbuf.size > 0) {
+ what = "read";
+ err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+ }
+ if (err) {
+ pconnection_detach(pc);
+ pconnection_error(pc, err, what);
+ return true;
+ }
+ return false;
}
/* Process the leader_q and the UV loop, in the leader thread */
static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
+ pn_event_batch_t *batch = NULL;
+ for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+ assert(!ps->working);
- if (p->timeout_request) {
- p->timeout_request = false;
- if (p->timeout) {
- uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
- } else {
- uv_timer_stop(&p->timer);
+ uv_mutex_unlock(&p->lock); /* Unlock to process each item, may add more items to leader_q */
+ bool needs_work = ps->is_conn ?
+ leader_process_pconnection(as_pconnection(ps)) :
+ leader_process_listener(as_listener(ps));
+ uv_mutex_lock(&p->lock);
+
+ if (needs_work && !ps->working && ps->next == &UNLISTED) {
+ ps->working = true;
+ push_lh(&p->worker_q, ps);
}
}
- for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- if (ps->action) {
- uv_mutex_unlock(&p->lock);
- ps->action(ps);
- ps->action = NULL;
- uv_mutex_lock(&p->lock);
- } else if (ps->wakeup) {
- uv_mutex_unlock(&p->lock);
- ps->wakeup(ps);
- ps->wakeup = NULL;
- uv_mutex_lock(&p->lock);
+ batch = get_batch_lh(p); /* Check for work */
+ if (!batch) { /* No work, run the UV loop */
+ /* Set timeout timer before uv_run */
+ if (p->timeout_request) {
+ p->timeout_request = false;
+ uv_timer_stop(&p->timer);
+ if (p->timeout) {
+ uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+ }
}
- pn_event_batch_t *batch = get_batch_lh(p);
- if (batch) return batch;
+ uv_mutex_unlock(&p->lock); /* Unlock to run UV loop */
+ uv_run(&p->loop, mode);
+ uv_mutex_lock(&p->lock);
+ batch = get_batch_lh(p);
}
- uv_mutex_unlock(&p->lock);
- uv_run(&p->loop, mode);
- uv_mutex_lock(&p->lock);
- return get_batch_lh(p);
+ return batch;
}
-/* ==== public API ==== */
+/**** public API ****/
pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
@@ -788,7 +741,7 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
p->has_leader = true;
batch = leader_lead_lh(p, UV_RUN_NOWAIT);
p->has_leader = false;
- uv_cond_signal(&p->cond); /* Notify next leader */
+ uv_cond_broadcast(&p->cond); /* Signal followers for possible work */
}
uv_mutex_unlock(&p->lock);
return batch;
@@ -802,44 +755,32 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
batch = get_batch_lh(p);
}
if (!batch) { /* Become leader */
- assert(!p->has_leader); /* Implied by loop condition */
p->has_leader = true;
do {
batch = leader_lead_lh(p, UV_RUN_ONCE);
} while (!batch);
p->has_leader = false;
- uv_cond_signal(&p->cond);
+ uv_cond_broadcast(&p->cond); /* Signal the followers. One takes over as leader, many can work. */
}
uv_mutex_unlock(&p->lock);
return batch;
}
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
- pconnection_t *pc = batch_pconnection(batch);
- if (pc) {
- assert(pc->psocket.state == ON_WORKER);
- if (pn_connection_driver_has_event(&pc->driver)) {
- /* Process all events before going back to leader */
- pconnection_to_worker(pc);
- } else {
- to_leader(&pc->psocket, psocket_to_uv);
- }
- return;
- }
- pn_listener_t *l = batch_listener(batch);
- if (l) {
- assert(l->psocket.state == ON_WORKER);
- to_leader(&l->psocket, psocket_to_uv);
- return;
+ uv_mutex_lock(&p->lock);
+ psocket_t *ps = batch_psocket(batch); /* FIXME aconway 2017-02-26: replace with switch? */
+ if (ps) {
+ assert(ps->working);
+ assert(ps->next == &UNLISTED);
+ ps->working = false;
+ push_lh(&p->leader_q, ps);
}
- pn_proactor_t *bp = batch_proactor(batch);
+ pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */
if (bp == p) {
- uv_mutex_lock(&p->lock);
p->batch_working = false;
- notify(p);
- uv_mutex_unlock(&p->lock);
- return;
}
+ uv_mutex_unlock(&p->lock);
+ notify(p);
}
pn_listener_t *pn_event_listener(pn_event_t *e) {
@@ -864,16 +805,16 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
++p->interrupt;
- notify(p);
uv_mutex_unlock(&p->lock);
+ notify(p);
}
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
p->timeout_request = true;
- notify(p);
uv_mutex_unlock(&p->lock);
+ notify(p);
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -881,28 +822,19 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
if (!pc) {
return PN_OUT_OF_MEMORY;
}
- to_leader(&pc->psocket, leader_connect);
+ psocket_start(&pc->psocket);
return 0;
}
int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
{
+ assert(!l->closed);
psocket_init(&l->psocket, p, false, host, port);
l->backlog = backlog;
- to_leader(&l->psocket, leader_listen);
+ psocket_start(&l->psocket);
return 0;
}
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
- pconnection_t *pc = get_pconnection(c);
- return pc ? pc->psocket.proactor : NULL;
-}
-
-void pn_connection_wake(pn_connection_t* c) {
- /* May be called from any thread */
- wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
-}
-
pn_proactor_t *pn_proactor() {
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
p->collector = pn_collector();
@@ -919,8 +851,8 @@ pn_proactor_t *pn_proactor() {
void pn_proactor_free(pn_proactor_t *p) {
uv_timer_stop(&p->timer);
- uv_close((uv_handle_t*)&p->timer, NULL);
- uv_close((uv_handle_t*)&p->async, NULL);
+ uv_safe_close((uv_handle_t*)&p->timer, NULL);
+ uv_safe_close((uv_handle_t*)&p->async, NULL);
uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
while (uv_loop_alive(&p->loop)) {
uv_run(&p->loop, UV_RUN_ONCE); /* Run till all handles closed */
@@ -929,9 +861,28 @@ void pn_proactor_free(pn_proactor_t *p) {
uv_mutex_destroy(&p->lock);
uv_cond_destroy(&p->cond);
pn_collector_free(p->collector);
+ /* FIXME aconway 2017-02-25: restore */
+ /* assert(!p->worker_q.front); */
+ /* assert(!p->leader_q.front); */
free(p);
}
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+ pconnection_t *pc = get_pconnection(c);
+ return pc ? pc->psocket.proactor : NULL;
+}
+
+void pn_connection_wake(pn_connection_t* c) {
+ /* May be called from any thread */
+ pconnection_t *pc = get_pconnection(c);
+ if (pc) {
+ uv_mutex_lock(&pc->lock);
+ pc->wake = true;
+ uv_mutex_unlock(&pc->lock);
+ psocket_notify(&pc->psocket);
+ }
+}
+
pn_listener_t *pn_listener(void) {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
@@ -948,8 +899,14 @@ pn_listener_t *pn_listener(void) {
}
void pn_listener_close(pn_listener_t* l) {
- /* This can be called from any thread, not just the owner of l */
- wakeup(&l->psocket, leader_listener_close);
+ /* May be called from any thread */
+ uv_mutex_lock(&l->lock);
+ if (!l->closed) {
+ l->closed = true;
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+ }
+ uv_mutex_unlock(&l->lock);
+ psocket_notify(&l->psocket);
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -973,13 +930,15 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
}
int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
- assert(l->psocket.state == ON_WORKER);
- if (l->accepting) {
- return PN_STATE_ERR; /* Only one at a time */
- }
- l->accepting = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
- if (!l->accepting) {
- return UV_ENOMEM;
+ assert(l->psocket.working);
+ assert(!l->closed);
+ uv_mutex_lock(&l->lock);
+ pconnection_t *pc = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+ if (!pc) {
+ uv_mutex_unlock(&l->lock);
+ return PN_OUT_OF_MEMORY;
}
+ push_lh(&l->accept, &pc->psocket);
+ uv_mutex_unlock(&l->lock);
+ psocket_notify(&l->psocket);
return 0;
}
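
Note that pn_connection_wake() above only sets a flag under the connection lock and
notifies the proactor, so repeated wakes before a worker runs coalesce into a single
PN_CONNECTION_WAKE event. A sketch (illustration only, not part of this commit) of
calling it from an arbitrary application thread, for a connection already managed by
a proactor:

    #include <proton/connection.h>
    #include <proton/proactor.h>

    /* Runs in any application thread; "conn" is already managed by a proactor. */
    static void poke_connection(pn_connection_t *conn) {
      /* ... publish application state for the connection, under app locks ... */
      pn_connection_wake(conn); /* thread safe: a worker gets PN_CONNECTION_WAKE */
    }

The worker that receives PN_CONNECTION_WAKE can then drain whatever state the
application published.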
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 6595a0b..a0ddcda 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -28,245 +28,259 @@
#include <stdlib.h>
#include <string.h>
-static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
+static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
static const char *localhost = "127.0.0.1"; /* host for connect/listen */
-struct test_events {
- pn_proactor_t *proactor;
- pn_event_batch_t *events;
-};
-
-/* Wait for the next single event, return its type */
-static pn_event_type_t wait_next(pn_proactor_t *proactor) {
- pn_event_batch_t *events = pn_proactor_wait(proactor);
- pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
- pn_proactor_done(proactor, events);
- return etype;
-}
-
-/* Get events until an event of `type` or a PN_TRANSPORT_CLOSED/PN_PROACTOR_TIMEOUT */
-static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) {
- while (true) {
- pn_event_type_t t = wait_next(proactor);
- if (t == etype || t == PN_PROACTOR_TIMEOUT) {
- return t;
- }
- }
-}
-
-/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
-static void test_interrupt_timeout(test_t *t) {
- pn_proactor_t *p = pn_proactor();
- pn_proactor_interrupt(p);
- pn_event_type_t etype = wait_next(p);
- TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype));
- pn_proactor_set_timeout(p, 1); /* very short timeout */
- etype = wait_next(p);
- TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype));
- pn_proactor_free(p);
-}
-
-/* Test handler return value */
-typedef enum {
- H_CONTINUE, /**@<< handler wants more events */
- H_FINISHED, /**@<< handler completed without error */
- H_FAILED /**@<< handler hit an error and cannot continue */
-} handler_state_t;
-
-typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*);
+typedef int (*test_handler_fn)(test_t *, pn_event_t*);
/* Proactor and handler that take part in a test */
typedef struct proactor_test_t {
- test_t *t;
test_handler_fn handler;
+ test_t *t;
pn_proactor_t *proactor;
- handler_state_t state; /* Result of last handler call */
} proactor_test_t;
/* Initialize an array of proactor_test_t */
-static void proactor_test_init(proactor_test_t *pts, size_t n) {
+static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+ if (!pt->t) pt->t = t;
if (!pt->proactor) pt->proactor = pn_proactor();
pn_proactor_set_timeout(pt->proactor, timeout);
- pt->state = H_CONTINUE;
}
}
-/* Iterate over an array of proactors, draining or handling events with the non-blocking
- pn_proactor_get. Continue till all handlers return H_FINISHED (and return 0) or one
- returns H_FAILED (and return non-0)
-*/
+#define PROACTOR_TEST_INIT(A, T) proactor_test_init(A, sizeof(A)/sizeof(*A), (T))
+
+static void proactor_test_free(proactor_test_t *pts, size_t n) {
+ for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+ pn_proactor_free(pt->proactor);
+ }
+}
+
+#define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
+
+/* Run an array of proactors till a handler returns non-0 */
static int proactor_test_run(proactor_test_t *pts, size_t n) {
- /* Make sure pts are initialized */
- proactor_test_init(pts, n);
- size_t finished = 0;
- do {
- finished = 0;
+ int ret = 0;
+ while (!ret) {
for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
pn_event_batch_t *events = pn_proactor_get(pt->proactor);
if (events) {
- pn_event_t *e;
- while ((e = pn_event_batch_next(events))) {
- if (pt->state == H_CONTINUE) {
- pt->state = pt->handler(pt->t, e);
- }
+ pn_event_t *e = pn_event_batch_next(events);
+ TEST_CHECKF(pt->t, e, "empty batch");
+ while (e && !ret) {
+ if (!(ret = pt->handler(pt->t, e)))
+ e = pn_event_batch_next(events);
}
pn_proactor_done(pt->proactor, events);
}
- switch (pt->state) {
- case H_CONTINUE: break;
- case H_FINISHED: ++finished; break;
- case H_FAILED: return 1;
- }
}
- } while (finished < n);
- return 0;
+ }
+ return ret;
}
+#define PROACTOR_TEST_RUN(A) proactor_test_run((A), sizeof(A)/sizeof(*A))
+
+/* Wait for the next single event, return its type */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
+ pn_proactor_done(proactor, events);
+ return etype;
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
+static void test_interrupt_timeout(test_t *t) {
+ pn_proactor_t *p = pn_proactor();
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+ pn_proactor_interrupt(p);
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
+ TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+ pn_proactor_set_timeout(p, 1); /* very short timeout */
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+ pn_proactor_free(p);
+}
-/* Handler for test_listen_connect, does both sides of the connection */
-static handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+/* Common handler for simple client/server interactions */
+static int common_handler(test_t *t, pn_event_t *e) {
pn_connection_t *c = pn_event_connection(e);
pn_listener_t *l = pn_event_listener(e);
switch (pn_event_type(e)) {
- /* Act on these events */
- case PN_LISTENER_ACCEPT: {
- pn_connection_t *accepted = pn_connection();
- pn_connection_open(accepted);
- pn_listener_accept(l, accepted); /* Listener takes ownership of accepted */
- return H_CONTINUE;
- }
+
+ /* Stop on these events */
+ case PN_LISTENER_OPEN:
+ case PN_PROACTOR_TIMEOUT:
+ case PN_TRANSPORT_CLOSED:
+ case PN_PROACTOR_INACTIVE:
+ case PN_LISTENER_CLOSE:
+ return pn_event_type(e);
+
+ case PN_LISTENER_ACCEPT:
+ pn_listener_accept(l, pn_connection());
+ return 0;
case PN_CONNECTION_REMOTE_OPEN:
- if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
- pn_connection_close(c);
- } else { /* Server returns the open */
- pn_connection_open(c);
- }
- return H_CONTINUE;
+ pn_connection_open(c); /* Return the open (no-op if already open) */
+ return 0;
case PN_CONNECTION_REMOTE_CLOSE:
- if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
- pn_connection_close(c); /* Return the close */
- }
- return H_CONTINUE;
-
- case PN_TRANSPORT_CLOSED:
- return H_FINISHED;
+ pn_connection_close(c); /* Return the close */
+ return 0;
+
+ /* Ignore these events */
+ case PN_CONNECTION_INIT:
+ case PN_CONNECTION_BOUND:
+ case PN_CONNECTION_LOCAL_OPEN:
+ case PN_CONNECTION_LOCAL_CLOSE:
+ case PN_TRANSPORT:
+ case PN_TRANSPORT_ERROR:
+ case PN_TRANSPORT_HEAD_CLOSED:
+ case PN_TRANSPORT_TAIL_CLOSED:
+ return 0;
default:
- return H_CONTINUE;
- break;
+ TEST_ERRORF(t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+ return 0; /* Fail the test but keep going */
}
}
-/* Test bad-address error handling for listen and connect */
-static void test_early_error(test_t *t) {
- pn_proactor_t *p = pn_proactor();
- pn_proactor_set_timeout(p, timeout); /* In case of hang */
- pn_connection_t *c = pn_connection();
- pn_proactor_connect(p, c, localhost, "1"); /* Bad port */
- pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
- TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
- TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
-
- pn_listener_t *l = pn_listener();
- pn_proactor_listen(p, l, localhost, "1", 1); /* Bad port */
- etype = wait_for(p, PN_LISTENER_CLOSE);
- TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
- TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
-
- pn_proactor_free(p);
+/* close a connection when it is remote open */
+static int open_close_handler(test_t *t, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ case PN_CONNECTION_REMOTE_OPEN:
+ pn_connection_close(pn_event_connection(e));
+ return 0; /* common_handler will finish on TRANSPORT_CLOSED */
+ default:
+ return common_handler(t, e);
+ }
}
-/* Simplest client/server interaction with 2 proactors */
-static void test_listen_connect(test_t *t) {
- proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler } };
- proactor_test_init(pts, 2);
+/* Simple client/server connection with 2 proactors */
+static void test_client_server(test_t *t) {
+ proactor_test_t pts[] = { { open_close_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
- test_port_t port = test_port(); /* Hold a port */
-
+ test_port_t port = test_port();
pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
- pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- sock_close(port.sock);
- pn_proactor_connect(client, pn_connection(), localhost, port.str);
- proactor_test_run(pts, 2);
- }
- pn_proactor_free(client);
- pn_proactor_free(server);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_proactor_connect(client, pn_connection(), localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
}
-static handler_state_t connection_wakeup_handler(test_t *t, pn_event_t *e) {
- pn_connection_t *c = pn_event_connection(e);
+/* Return on connection open, close and return on wake */
+static int open_wake_handler(test_t *t, pn_event_t *e) {
switch (pn_event_type(e)) {
-
case PN_CONNECTION_REMOTE_OPEN:
- if (pn_connection_state(c) | PN_LOCAL_UNINIT) {
- pn_connection_open(c); /* Server returns the open */
- }
- return H_FINISHED; /* Finish when open at both ends */
-
+ return pn_event_type(e);
+ case PN_CONNECTION_WAKE:
+ pn_connection_close(pn_event_connection(e));
+ return pn_event_type(e);
default:
- /* Otherwise same as listen_connect_handler */
- return listen_connect_handler(t, e);
+ return common_handler(t, e);
}
}
/* Test waking up a connection that is idle */
-static void test_connection_wakeup(test_t *t) {
- proactor_test_t pts[] = { { t, connection_wakeup_handler }, { t, connection_wakeup_handler } };
- proactor_test_init(pts, 2);
+static void test_connection_wake(test_t *t) {
+ proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t port = test_port(); /* Hold a port */
pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
- pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- sock_close(port.sock);
- pn_connection_t *c = pn_connection();
- pn_proactor_connect(client, c, localhost, port.str);
- proactor_test_run(pts, 2); /* Will finish when client is connected */
- TEST_CHECK(t, NULL == pn_proactor_get(client), ""); /* Should be idle */
- pn_connection_wake(c);
- etype = wait_next(client);
- /* FIXME aconway 2017-02-21: TEST_EVENT_TYPE */
- TEST_CHECK(t, PN_CONNECTION_WAKE == etype, pn_event_type_name(etype));
- }
- pn_proactor_free(client);
- pn_proactor_free(server);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
}
/* Test that INACTIVE event is generated when last connections/listeners closes. */
static void test_inactive(test_t *t) {
- proactor_test_t pts[] = { { t, listen_connect_handler }, { t, listen_connect_handler }};
- proactor_test_init(pts, 2);
+ proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
test_port_t port = test_port(); /* Hold a port */
pn_listener_t *l = pn_listener();
pn_proactor_listen(server, l, localhost, port.str, 4);
- pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
- if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
- sock_close(port.sock);
- pn_proactor_connect(client, pn_connection(), localhost, port.str);
- proactor_test_run(pts, 2);
- etype = wait_for(client, PN_PROACTOR_INACTIVE);
- pn_listener_close(l);
- etype = wait_for(server, PN_PROACTOR_INACTIVE);
- }
- pn_proactor_free(client);
- pn_proactor_free(server);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+ pn_connection_wake(c);
+ TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+ /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+ /* server won't be INACTIVE until listener is closed */
+ TEST_CHECK(t, pn_proactor_get(server) == NULL);
+ pn_listener_close(l);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
+}
+
+#define TEST_CHECK_ERROR(T, WANT, COND) do { \
+ TEST_CHECKF((T), pn_condition_is_set(COND), "expecting error"); \
+ const char* description = pn_condition_get_description(COND); \
+ if (!description || !strstr(description, (WANT))) { \
+ TEST_ERRORF((T), "bad error, expected '%s' in '%s'", (WANT), description); \
+ } \
+ } while(0)
+
+/* Tests for error handling */
+static void test_errors(test_t *t) {
+ proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+ PROACTOR_TEST_INIT(pts, t);
+ pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+ test_port_t port = test_port(); /* Hold a port */
+
+ /* Invalid connect/listen parameters */
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(client, c, localhost, "xxx");
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(server, l, localhost, "xxx", 1);
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+ TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ /* Connect with no listener */
+ c = pn_connection();
+ pn_proactor_connect(client, c, localhost, port.str);
+ TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+ TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))));
+ TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
+ TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+ sock_close(port.sock);
+ PROACTOR_TEST_FREE(pts);
}
+
int main(int argc, char **argv) {
int failed = 0;
RUN_ARGV_TEST(failed, t, test_inactive(&t));
RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
- RUN_ARGV_TEST(failed, t, test_early_error(&t));
- RUN_ARGV_TEST(failed, t, test_listen_connect(&t));
- RUN_ARGV_TEST(failed, t, test_connection_wakeup(&t));
+ RUN_ARGV_TEST(failed, t, test_errors(&t));
+ RUN_ARGV_TEST(failed, t, test_client_server(&t));
+ RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
return failed;
}
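
New tests follow the same shape: a small handler intercepts the events it cares
about and delegates everything else to common_handler(). A hypothetical example
(not part of this commit) that counts accepted connections in the test_t.data slot:

    /* Hypothetical handler, illustration only */
    static int count_accept_handler(test_t *t, pn_event_t *e) {
      if (pn_event_type(e) == PN_LISTENER_ACCEPT)
        ++t->data; /* test_t.data is reserved for per-test bookkeeping */
      return common_handler(t, e); /* common_handler still performs the accept */
    }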
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 7006334..97dac3f 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -21,6 +21,7 @@
*/
#include <proton/type_compat.h>
+#include <proton/event.h>
#include <errno.h>
#include <stdarg.h>
@@ -28,24 +29,46 @@
#include <stdlib.h>
#include <string.h>
-/*
- All output from test marcros goes to stdout not stderr, error messages are normal for a test.
- Some errno handling functions are thread-unsafe
- */
-
+/* A struct to collect the results of a test, created by RUN_TEST macro. */
+typedef struct test_t {
+ const char* name;
+ int errors;
+ uintptr_t data; /* Test can store some non-error data here */
+} test_t;
-/* Call via TEST_ASSERT macros */
-static void assert_fail_(const char* cond, const char* file, int line, const char *fmt, ...) {
- printf("%s:%d: Assertion failed: %s", file, line, cond);
+/* Internal, use macros. Print a test message; the error variants below also increment t->errors.
+ All output from test macros goes to stdout, not stderr; error messages are normal for a test.
+*/
+static void test_vlogf_(test_t *t, const char *prefix, const char* expr,
+ const char* file, int line, const char *fmt, va_list ap)
+{
+ printf("%s:%d", file, line);
+ if (prefix && *prefix) printf(": %s", prefix);
+ if (expr && *expr) printf(": %s", expr);
if (fmt && *fmt) {
- va_list ap;
- va_start(ap, fmt);
- printf(" - ");
+ printf(": ");
vprintf(fmt, ap);
- printf("\n");
- fflush(stdout);
- va_end(ap);
}
+ if (t) printf(" [%s]", t->name);
+ printf("\n");
+ fflush(stdout);
+}
+
+static void test_errorf_(test_t *t, const char *prefix, const char* expr,
+ const char* file, int line, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ ++t->errors;
+ test_vlogf_(t, prefix, expr, file, line, fmt, ap);
+ va_end(ap);
+}
+
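+/* Non-error variant used by TEST_LOGF below. Assumed helper: mirrors
+ test_errorf_ but does not increment t->errors. */
+static void test_logf_(test_t *t, const char *prefix, const char* expr,
+ const char* file, int line, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ test_vlogf_(t, prefix, expr, file, line, fmt, ap);
+ va_end(ap);
+}
+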
+/* Call via TEST_ASSERT macros */
+static void assert_fail_(const char* expr, const char* file, int line, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ test_vlogf_(NULL, "assertion failed", expr, file, line, fmt, ap);
+ va_end(ap);
abort();
}
@@ -63,32 +86,42 @@ static void assert_fail_(const char* cond, const char* file, int line, const cha
TEST_ASSERTF((expr), "%s", strerror(err))
-/* A struct to collect the results of a test.
- * Declare and initialize with TEST_START(t) where t will be declared as a test_t
- */
-typedef struct test_t {
- const char* name;
- int errors;
-} test_t;
-
-/* if !expr print the printf-style error and increment t->errors. Use via macros. Returns expr. */
+/* Internal, use macros */
static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
if (!expr) {
+ ++t->errors;
va_list ap;
va_start(ap, fmt);
- printf("%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
- if (fmt && *fmt) {
- printf(" - ");
- vprintf(fmt, ap);
- }
- printf("\n");
- fflush(stderr);
- ++t->errors;
+ test_vlogf_(t, "check failed", sexpr, file, line, fmt, ap);
+ va_end(ap);
}
return expr;
}
-#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+/* Print a message but don't mark the test as having an error */
+#define TEST_LOGF(TEST, ...) \
+ test_logf_((TEST), "info", NULL, __FILE__, __LINE__, __VA_ARGS__)
+
+/* Print an error with printf-style message, increment TEST->errors */
+#define TEST_ERRORF(TEST, ...) \
+ test_errorf_((TEST), "error", NULL, __FILE__, __LINE__, __VA_ARGS__)
+
+/* If EXPR is false, print a printf-style message and record an error for TEST */
+#define TEST_CHECKF(TEST, EXPR, ...) \
+ test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+
+/* If EXPR is false, print EXPR and record an error for TEST */
+#define TEST_CHECK(TEST, EXPR) \
+ test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, "")
+
+static inline bool test_etype_equal_(test_t *t, int want, int got, const char *file, int line) {
+ return test_check_(t, want == got, NULL, file, line, "want %s got %s",
+ pn_event_type_name((pn_event_type_t)want),
+ pn_event_type_name((pn_event_type_t)got));
+}
+
+#define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
+ test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
/* T is name of a test_t variable, EXPR is the test expression (which should update T)
FAILED is incremented if the test has errors
@@ -166,12 +199,14 @@ static int sock_port(sock_t sock) {
return ntohs(port);
}
+/* Combines a sock_t with the int and char* versions of the port, for convenience */
typedef struct test_port_t {
sock_t sock;
int port;
char str[256];
} test_port_t;
+/* Create a test_port_t */
static inline test_port_t test_port(void) {
test_port_t tp = {0};
tp.sock = sock_bind0();
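
For reference, a minimal sketch (illustration only, not part of this commit) of a
test file built on these tools. It assumes RUN_ARGV_TEST, defined earlier in
test_tools.h, declares the test_t variable, runs the expression when the test is
selected on the command line, and increments the failure count on error:

    #include "test_tools.h"

    static void test_arithmetic(test_t *t) {
      TEST_CHECK(t, 2 + 2 == 4);
      TEST_CHECKF(t, 10 % 3 == 1, "unexpected remainder %d", 10 % 3);
    }

    int main(int argc, char **argv) {
      int failed = 0;
      RUN_ARGV_TEST(failed, t, test_arithmetic(&t));
      return failed;
    }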