You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/16 23:26:27 UTC
[4/6] qpid-proton git commit: PROTON-1403: C proactor tests,
fixes & additions
PROTON-1403: C proactor tests, fixes & additions
proactor API additions:
- PN_LISTENER_OPEN: event when listener is listening and connects will succeed.
- pn_proactor_grab(): non-blocking version of pn_proactor_wait(), used in tests
src/tests/test_tools.h - simple C test framework
src/tests/proactor.c - initial tests for basic proactor functionality
src/proactor/libuv.c
- fixed some assertion bugs and memory leaks
- renaming and simplifying the code
examples/broker.c: exit with non-0 if broker stops because of an error
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b987a6a7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b987a6a7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b987a6a7
Branch: refs/heads/master
Commit: b987a6a70f30dc593bbea6c98ebae36ec77e4b7d
Parents: 9bd99eb
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jan 13 16:41:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 17:58:20 2017 -0500
----------------------------------------------------------------------
CMakeLists.txt | 2 +-
examples/c/proactor/CMakeLists.txt | 6 +-
examples/c/proactor/broker.c | 6 +-
examples/c/proactor/direct.c | 5 +-
examples/c/proactor/send.c | 2 +-
proton-c/CMakeLists.txt | 6 -
.../cpp/include/proton/io/connection_driver.hpp | 1 +
proton-c/include/proton/event.h | 17 +-
proton-c/include/proton/listener.h | 2 +-
proton-c/include/proton/proactor.h | 34 +-
proton-c/src/core/connection_driver.c | 4 +-
proton-c/src/core/event.c | 2 +
proton-c/src/proactor/libuv.c | 518 ++++++++++---------
proton-c/src/tests/CMakeLists.txt | 9 +-
proton-c/src/tests/proactor.c | 217 ++++++++
proton-c/src/tests/test_tools.h | 140 +++++
16 files changed, 698 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b538ffd..294fd03 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -71,7 +71,7 @@ if (CMAKE_BUILD_TYPE MATCHES "Coverage")
make_directory(coverage_results)
add_custom_target(coverage
WORKING_DIRECTORY ./coverage_results
- COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
+ COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
endif()
if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index 4189cf5..153f35f 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -29,7 +29,7 @@ set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${Proton_LIBRARIES})
check_function_exists(pn_proactor HAS_PROACTOR)
cmake_pop_check_state()
-if (HAS_PROACTOR)
+if(HAS_PROACTOR)
add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
@@ -48,6 +48,6 @@ foreach(name broker send receive direct)
endforeach()
set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
+add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
-endif()
+endif(HAS_PROACTOR)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index ebf4068..2a338e1 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -22,6 +22,7 @@
#include <proton/connection_driver.h>
#include <proton/proactor.h>
#include <proton/engine.h>
+#include <proton/listener.h>
#include <proton/sasl.h>
#include <proton/transport.h>
#include <proton/url.h>
@@ -288,8 +289,11 @@ static void session_unsub(broker_t *b, pn_session_t *ssn) {
}
}
+static int exit_code = 0;
+
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
+ exit_code = 1;
const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
fprintf(stderr, "%s: %s: %s\n", ename,
pn_condition_get_name(cond), pn_condition_get_description(cond));
@@ -483,5 +487,5 @@ int main(int argc, char **argv) {
}
pn_proactor_free(b.proactor);
free(threads);
- return 0;
+ return exit_code;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index 26f1b33..3d0a7d1 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -22,9 +22,10 @@
#include <proton/connection.h>
#include <proton/connection_driver.h>
#include <proton/delivery.h>
-#include <proton/proactor.h>
#include <proton/link.h>
+#include <proton/listener.h>
#include <proton/message.h>
+#include <proton/proactor.h>
#include <proton/sasl.h>
#include <proton/session.h>
#include <proton/transport.h>
@@ -59,7 +60,7 @@ typedef struct app_data_t {
static const int BATCH = 1000; /* Batch size for unlimited receive */
-int exit_code = 0;
+static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 48fcecd..bba5d3e 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -50,7 +50,7 @@ typedef struct app_data_t {
bool finished;
} app_data_t;
-int exit_code = 0;
+static int exit_code = 0;
static void check_condition(pn_event_t *e, pn_condition_t *cond) {
if (pn_condition_is_set(cond)) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 30a77e2..0731b67 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -278,12 +278,6 @@ if (CMAKE_C_COMPILER_ID MATCHES "Clang")
if (ENABLE_WARNING_ERROR)
set (COMPILE_WARNING_FLAGS "-Werror ${COMPILE_WARNING_FLAGS}")
endif (ENABLE_WARNING_ERROR)
-endif()
-
-if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
- if (ENABLE_WARNING_ERROR)
- set (WERROR "-Werror")
- endif (ENABLE_WARNING_ERROR)
# TODO aconway 2016-01-06: we should be able to clean up the code and turn on
# some of these warnings.
set (CXX_WARNING_FLAGS "${COMPILE_WARNING_FLAGS} -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 4a0efe9..759b1fc 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -113,6 +113,7 @@ PN_CPP_CLASS_EXTERN connection_driver {
///
PN_CPP_EXTERN connection_driver(proton::container&);
#if PN_CPP_HAS_RVALUE_REFERENCES
+ /// @copydoc connection_driver()
PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop);
#endif
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4a88368..3cfcc82 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -321,7 +321,8 @@ typedef enum {
PN_CONNECTION_WAKE,
/**
- * Indicates the listener is ready to call pn_listener_accept()
+ * Indicates the listener has an incoming connection, call pn_listener_accept()
+ * to accept it.
* Events of this type point to the @ref pn_listener_t.
*/
PN_LISTENER_ACCEPT,
@@ -350,7 +351,13 @@ typedef enum {
*
* Events of this type point to the @ref pn_proactor_t.
*/
- PN_PROACTOR_INACTIVE
+ PN_PROACTOR_INACTIVE,
+
+ /**
+ * Indicates the listener is listening.
+ * Events of this type point to the @ref pn_listener_t.
+ */
+ PN_LISTENER_OPEN
} pn_event_type_t;
@@ -537,9 +544,9 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event);
PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
/**
- * **Experimental** - A batch of events to handle. Call
- * pn_event_batch_next() in a loop until it returns NULL to handle
- * them.
+ * **Experimental** - A batch of events that must be handled in sequence.
+ * Call pn_event_batch_next() in a loop until it returns NULL to extract
+ * the events.
*/
typedef struct pn_event_batch_t pn_event_batch_t;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 18feca7..2038c06 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -20,7 +20,7 @@
* under the License.
*/
-#include <proton/condition.h>
+#include <proton/import_export.h>
#include <proton/types.h>
#ifdef __cplusplus
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 71a7dda..af3acbc 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -20,10 +20,9 @@
* under the License.
*/
-#include <proton/types.h>
-#include <proton/import_export.h>
-#include <proton/listener.h>
#include <proton/event.h>
+#include <proton/import_export.h>
+#include <proton/types.h>
#ifdef __cplusplus
extern "C" {
@@ -95,25 +94,34 @@ PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listen
const char *host, const char *port, int backlog);
/**
- * Wait for events to handle.
+ * Wait until there is at least one event to handle.
+ * Always returns a non-empty batch of events.
*
- * Handle events in the returned batch by calling
- * pn_event_batch_next() until it returns NULL. You must call
- * pn_proactor_done() when you are finished with the batch.
+ * You must call pn_proactor_done() when you are finished with the batch, you
+ * must not use the batch pointer after calling pn_proactor_done().
*
- * If you call pn_proactor_done() before finishing the batch, the
- * remaining events will be returned again by another call
- * pn_proactor_wait(). This is less efficient, but allows you to
- * handle part of a batch and then hand off the rest to another
- * thread.
+ * Normally it is most efficient to handle the entire batch in one thread, but
+ * you can call pn_proactor_done() on an unfinished batch. The remaining
+ * events will be returned by another call to pn_proactor_wait(), possibly in a
+ * different thread.
+ *
+ * @note You can generate events to force threads to wake up from
+ * pn_proactor_wait() using pn_proactor_interrupt(), pn_proactor_set_timeout()
+ * and pn_connection_wake()
*
* @note Thread-safe: can be called concurrently. Events in a single
* batch must be handled in sequence, but batches returned by separate
- * calls to pn_proactor_wait() can be handled concurrently.
+ * calls can be handled concurrently.
*/
PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
/**
+ * Return a batch of events if one is available immediately, otherwise return NULL. If it
+ * does return an event batch, the rules are the same as for pn_proactor_wait()
+ */
+PNP_EXTERN pn_event_batch_t *pn_proactor_grab(pn_proactor_t *proactor);
+
+/**
* Call when done handling a batch of events.
*
* Must be called exactly once to match each call to
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index 3393e64..0d1db21 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -127,7 +127,9 @@ pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
}
bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
- return pn_collector_peek(pn_connection_collector(d->connection));
+ /* FIXME aconway 2017-02-15: this is ugly */
+ pn_collector_t *c = pn_connection_collector(d->connection);
+ return pn_collector_more(c) || (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
}
bool pn_connection_driver_finished(pn_connection_driver_t *d) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 41ff6d1..e213c8f 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -395,6 +395,8 @@ const char *pn_event_type_name(pn_event_type_t type)
return "PN_PROACTOR_TIMEOUT";
case PN_PROACTOR_INACTIVE:
return "PN_PROACTOR_INACTIVE";
+ case PN_LISTENER_OPEN:
+ return "PN_LISTENER_OPEN";
default:
return "PN_UNKNOWN";
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index d17136a..0ccfcda 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -24,6 +24,7 @@
#include <proton/condition.h>
#include <proton/connection_driver.h>
#include <proton/engine.h>
+#include <proton/listener.h>
#include <proton/message.h>
#include <proton/object.h>
#include <proton/proactor.h>
@@ -40,37 +41,34 @@
#include <string.h>
/*
- libuv loop functions are thread unsafe. The only exception is uv_async_send()
- which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+ libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
+ "wakeup" that can wake the uv_loop from another thread.
- To provide concurrency the proactor uses a "leader-worker-follower" model,
- threads take turns at the roles:
+ To provide concurrency proactor uses a "leader-worker-follower" model, threads take
+ turns at the roles:
- - a single "leader" calls libuv functions and runs the uv_loop in short bursts
- to generate work. When there is work available it gives up leadership and
- becomes a "worker"
+ - a single "leader" thread uses libuv, it runs the uv_loop in short bursts to
+ generate work. Once there is work it becomes a "worker" thread, another thread
+ takes over as leader.
- - "workers" handle events concurrently for distinct connections/listeners
- They do as much work as they can get, when none is left they become "followers"
+ - "workers" handle events for separate connections or listeners concurrently. They do as
+ much work as they can, when none is left they become "followers"
- - "followers" wait for the leader to generate work and become workers.
- When the leader itself becomes a worker, one of the followers takes over.
+ - "followers" wait for the leader to generate work. One follower becomes the new leader,
+ the others become workers or continue to follow till they can get work.
- This model is symmetric: any thread can take on any role based on run-time
- requirements. It also allows the IO and non-IO work associated with an IO
- wake-up to be processed in a single thread with no context switches.
+ Any thread in a pool can take on any role necessary at run-time. All the work generated
+ by an IO wake-up for a single connection can be processed in a single worker
+ thread to minimize context switching.
Function naming:
- - on_* - called in leader thread by uv_run().
+ - on_* - called in leader thread via uv_run().
- leader_* - called in leader thread (either leader_q processing or from an on_ function)
- - worker_* - called in worker thread
- *_lh - called with the relevant lock held
LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
- UV handles have received an on_close(). Freeing resources is always initiated by
- uv_close() of the uv_tcp_t handle, and completed in on_close() handler functions when it
- is safe. The only exception is when an error occurs that prevents a pn_connection_t or
- pn_listener_t from being associated with a uv handle at all.
+ UV handles have received a close callback. Freeing resources is initiated by uv_close()
+ of the uv_tcp_t handle, and executed in an on_close() handler when it is safe.
*/
const char *COND_NAME = "proactor";
@@ -82,12 +80,12 @@ const char *AMQPS_PORT_NAME = "amqps";
PN_HANDLE(PN_PROACTOR)
/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
- Class definitions are for identification as pn_event_t context only.
+ CLASSDEF is for identification when used as a pn_event_t context.
*/
PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-/* A psocket (connection or listener) has the following *mutually exclusive* states. */
+/* A psocket (connection or listener) has the following mutually exclusive states. */
typedef enum {
ON_WORKER, /* On worker_q or in use by user code in worker thread */
ON_LEADER, /* On leader_q or in use the leader loop */
@@ -105,12 +103,11 @@ typedef struct psocket_t {
void (*action)(struct psocket_t*); /* deferred action for leader */
void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
- /* Only used by leader when it owns the psocket */
+ /* Only used by leader thread when it owns the psocket */
uv_tcp_t tcp;
char host[NI_MAXHOST];
char port[NI_MAXSERV];
bool is_conn;
-
} psocket_t;
/* Special value for psocket.next pointer when socket is not on any any list. */
@@ -138,6 +135,7 @@ static inline const char* fixstr(const char* str) {
return str[0] == '\001' ? NULL : str;
}
+/* Holds a psocket and a pn_connection_driver */
typedef struct pconnection_t {
psocket_t psocket;
@@ -150,10 +148,11 @@ typedef struct pconnection_t {
uv_write_t write;
uv_shutdown_t shutdown;
size_t writing; /* size of pending write request, 0 if none pending */
- bool reading; /* true if a read request is pending */
- bool server; /* accept, not connect */
+ bool server; /* accepting not connecting */
} pconnection_t;
+
+/* pn_listener_t with a psocket_t */
struct pn_listener_t {
psocket_t psocket;
@@ -169,6 +168,8 @@ struct pn_listener_t {
/* Only used in leader thread */
size_t connections; /* number of connections waiting to be accepted */
+ int err; /* uv error code, 0 = OK, UV_EOF = closed */
+ const char *what; /* static description string */
};
typedef struct queue { psocket_t *front, *back; } queue;
@@ -198,9 +199,9 @@ struct pn_proactor_t {
bool batch_working; /* batch is being processed in a worker thread */
};
-static bool push_lh(queue *q, psocket_t *ps) {
- if (ps->next != &UNLISTED) /* Don't move if already listed. */
- return false;
+/* Push ps to back of q. Must not be on a different queue */
+static void push_lh(queue *q, psocket_t *ps) {
+ assert(ps->next == &UNLISTED);
ps->next = NULL;
if (!q->front) {
q->front = q->back = ps;
@@ -208,9 +209,9 @@ static bool push_lh(queue *q, psocket_t *ps) {
q->back->next = ps;
q->back = ps;
}
- return true;
}
+/* Pop returns front of q or NULL if empty */
static psocket_t* pop_lh(queue *q) {
psocket_t *ps = q->front;
if (ps) {
@@ -220,29 +221,49 @@ static psocket_t* pop_lh(queue *q) {
return ps;
}
-/* Set state and action and push to relevant queue */
-static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
- /* Illegal if ps is already listed under a different state */
- assert(ps->next == &UNLISTED || ps->state == state);
- ps->state = state;
- if (action && !ps->action) {
- ps->action = action;
+/* Queue an action for the leader thread */
+static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+ uv_mutex_lock(&ps->proactor->lock);
+ ps->action = action;
+ if (ps->next == &UNLISTED) {
+ ps->state = ON_LEADER;
+ push_lh(&ps->proactor->leader_q, ps);
+ }
+ uv_mutex_unlock(&ps->proactor->lock);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
+}
+
+/* Push to the worker thread */
+static void to_worker(psocket_t *ps) {
+ uv_mutex_lock(&ps->proactor->lock);
+ /* If already ON_WORKER do nothing */
+ if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+ ps->state = ON_WORKER;
+ push_lh(&ps->proactor->worker_q, ps);
}
- switch(state) {
- case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break;
- case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break;
- case ON_UV:
- assert(ps->next == &UNLISTED);
- break; /* No queue for UV loop */
+ uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Set state to ON_UV */
+static void to_uv(psocket_t *ps) {
+ uv_mutex_lock(&ps->proactor->lock);
+ if (ps->next == &UNLISTED) {
+ ps->state = ON_UV;
}
+ uv_mutex_unlock(&ps->proactor->lock);
}
-/* Set state and action, push to queue and notify leader. Thread safe. */
-static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
+/* Called in any thread to set a wakeup action */
+static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
uv_mutex_lock(&ps->proactor->lock);
- set_state_lh(ps, state, action);
- uv_async_send(&ps->proactor->async);
+ ps->wakeup = action;
+ /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */
+ if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+ push_lh(&ps->proactor->leader_q, ps);
+ ps->state = ON_LEADER; /* Otherwise notify the leader */
+ }
uv_mutex_unlock(&ps->proactor->lock);
+ uv_async_send(&ps->proactor->async); /* Wake leader */
}
static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -308,7 +329,6 @@ static void on_close_pconnection_final(uv_handle_t *h) {
/* Close event for uv_tcp_t of a psocket_t */
static void on_close_psocket(uv_handle_t *h) {
- /* No assert(ps->state == ON_UV); may be called in other states during shutdown. */
psocket_t *ps = (psocket_t*)h->data;
if (ps->is_conn) {
leader_count(ps->proactor, -1);
@@ -329,29 +349,43 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
}
-static void leader_unwatch(psocket_t *ps);
+static void pconnection_to_worker(pconnection_t *pc);
+static void listener_to_worker(pn_listener_t *l);
-static void leader_error(psocket_t *ps, int err, const char* what) {
- assert(ps->state != ON_WORKER);
- if (ps->is_conn) {
- pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
+int pconnection_error(pconnection_t *pc, int err, const char* what) {
+ if (err) {
+ pn_connection_driver_t *driver = &pc->driver;
pn_connection_driver_bind(driver); /* Bind so errors will be reported */
pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
- what, fixstr(ps->host), fixstr(ps->port),
+ what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
uv_strerror(err));
pn_connection_driver_close(driver);
- } else {
- pn_listener_t *l = as_listener(ps);
- pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
- what, fixstr(ps->host), fixstr(ps->port),
- uv_strerror(err));
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
- l->closing = true;
+ pconnection_to_worker(pc);
+ }
+ return err;
+}
+
+static int listener_error(pn_listener_t *l, int err, const char* what) {
+ if (err) {
+ l->err = err;
+ l->what = what;
+ listener_to_worker(l);
}
- leader_unwatch(ps); /* Worker to handle the error */
+ return err;
}
-/* uv-initialization */
+static int psocket_error(psocket_t *ps, int err, const char* what) {
+ if (err) {
+ if (ps->is_conn) {
+ pconnection_error(as_pconnection(ps), err, "initialization");
+ } else {
+ listener_error(as_listener(ps), err, "initialization");
+ }
+ }
+ return err;
+}
+
+/* psocket uv-initialization */
static int leader_init(psocket_t *ps) {
ps->state = ON_LEADER;
leader_count(ps->proactor, +1);
@@ -365,9 +399,8 @@ static int leader_init(psocket_t *ps) {
pc->timer.data = ps;
}
}
- }
- if (err) {
- leader_error(ps, err, "initialization");
+ } else {
+ psocket_error(ps, err, "initialization");
}
return err;
}
@@ -375,33 +408,27 @@ static int leader_init(psocket_t *ps) {
/* Outgoing connection */
static void on_connect(uv_connect_t *connect, int err) {
pconnection_t *pc = (pconnection_t*)connect->data;
- assert(pc->psocket.state == ON_UV);
if (!err) {
- leader_unwatch(&pc->psocket);
+ pconnection_to_worker(pc);
} else {
- leader_error(&pc->psocket, err, "on connect to");
+ pconnection_error(pc, err, "on connect to");
}
}
/* Incoming connection ready to be accepted */
static void on_connection(uv_stream_t* server, int err) {
- /* Unlike most on_* functions, this one can be called by the leader thrad when the
+ /* Unlike most on_* functions, this one can be called by the leader thread when the
* listener is ON_WORKER, because there's no way to stop libuv from calling
- * on_connection() in leader_unwatch(). Just increase a counter and deal with it in the
- * worker thread.
+ * on_connection(). Just increase a counter and generate events in to_worker.
*/
pn_listener_t *l = (pn_listener_t*) server->data;
- assert(l->psocket.state == ON_UV);
- if (!err) {
- ++l->connections;
- leader_unwatch(&l->psocket);
- } else {
- leader_error(&l->psocket, err, "on connection from");
- }
+ l->err = err;
+ if (!err) ++l->connections;
+ listener_to_worker(l); /* If already ON_WORKER it will stay there */
}
+/* FIXME aconway 2017-02-16: listener events in unwatch*/
static void leader_accept(pn_listener_t * l) {
- assert(l->psocket.state == ON_UV);
assert(l->accepting);
pconnection_t *pc = l->accepting;
l->accepting = NULL;
@@ -410,10 +437,10 @@ static void leader_accept(pn_listener_t * l) {
err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
}
if (!err) {
- leader_unwatch(&pc->psocket);
+ pconnection_to_worker(pc);
} else {
- leader_error(&pc->psocket, err, "accepting from");
- leader_error(&l->psocket, err, "accepting from");
+ pconnection_error(pc, err, "accepting from");
+ listener_error(l, err, "accepting from");
}
}
@@ -440,7 +467,7 @@ static void leader_connect(psocket_t *ps) {
if (!err) {
ps->state = ON_UV;
} else {
- leader_error(ps, err, "connecting to");
+ psocket_error(ps, err, "connecting to");
}
}
@@ -457,32 +484,26 @@ static void leader_listen(psocket_t *ps) {
err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
}
if (!err) {
- set_state(ps, ON_UV, NULL);
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+ listener_to_worker(l); /* Let worker see the OPEN event */
} else {
- leader_error(ps, err, "listening on");
+ listener_error(l, err, "listening on");
}
}
/* Generate tick events and return millis till next tick or 0 if no tick is required */
static pn_millis_t leader_tick(pconnection_t *pc) {
assert(pc->psocket.state != ON_WORKER);
- pn_transport_t *t = pc->driver.transport;
- if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
- uint64_t now = uv_now(pc->timer.loop);
- uint64_t next = pn_transport_tick(t, now);
- return next ? next - now : 0;
- }
- return 0;
+ uint64_t now = uv_now(pc->timer.loop);
+ uint64_t next = pn_transport_tick(pc->driver.transport, now);
+ return next ? next - now : 0;
}
static void on_tick(uv_timer_t *timer) {
- if (!timer->data) return; /* timer closed */
pconnection_t *pc = (pconnection_t*)timer->data;
- assert(pc->psocket.state == ON_UV);
- uv_timer_stop(&pc->timer);
- pn_millis_t next = leader_tick(pc);
+ pn_millis_t next = leader_tick(pc); /* May generate events */
if (pn_connection_driver_has_event(&pc->driver)) {
- leader_unwatch(&pc->psocket);
+ pconnection_to_worker(pc);
} else if (next) {
uv_timer_start(&pc->timer, on_tick, next, 0);
}
@@ -490,31 +511,28 @@ static void on_tick(uv_timer_t *timer) {
static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
- assert(pc->psocket.state == ON_UV);
if (nread >= 0) {
pn_connection_driver_read_done(&pc->driver, nread);
- leader_unwatch(&pc->psocket); /* Handle events */
+ pconnection_to_worker(pc);
} else if (nread == UV_EOF) { /* hangup */
pn_connection_driver_read_close(&pc->driver);
- leader_unwatch(&pc->psocket);
+ pconnection_to_worker(pc);
} else {
- leader_error(&pc->psocket, nread, "on read from");
+ pconnection_error(pc, nread, "on read from");
}
}
static void on_write(uv_write_t* write, int err) {
pconnection_t *pc = (pconnection_t*)write->data;
- assert(pc->psocket.state == ON_UV);
- size_t writing = pc->writing;
- pc->writing = 0; /* This write is done regardless of outcome */
if (err == 0) {
- pn_connection_driver_write_done(&pc->driver, writing);
- leader_unwatch(&pc->psocket);
+ pn_connection_driver_write_done(&pc->driver, pc->writing);
+ pconnection_to_worker(pc);
} else if (err == UV_ECANCELED) {
- leader_unwatch(&pc->psocket); /* cancelled by leader_unwatch, complete the job */
+ pconnection_to_worker(pc);
} else {
- leader_error(&pc->psocket, err, "on write to");
+ pconnection_error(pc, err, "on write to");
}
+ pc->writing = 0;
}
static void on_timeout(uv_timer_t *timer) {
@@ -527,88 +545,111 @@ static void on_timeout(uv_timer_t *timer) {
// Read buffer allocation function for uv, just returns the transports read buffer.
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
pconnection_t *pc = (pconnection_t*)stream->data;
- assert(pc->psocket.state == ON_UV);
pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
*buf = uv_buf_init(rbuf.start, rbuf.size);
}
-/* Monitor a socket in the UV loop */
-static void leader_watch(psocket_t *ps) {
- assert(ps->state != ON_WORKER);
- int err = 0;
- set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. */
-
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection(ps);
- if (pn_connection_driver_finished(&pc->driver)) {
- uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
- return;
+static void pconnection_to_uv(pconnection_t *pc) {
+ to_uv(&pc->psocket); /* Assume we're going to UV unless sent elsewhere */
+ if (pn_connection_driver_finished(&pc->driver)) {
+ if (!uv_is_closing((uv_handle_t*)&pc->psocket)) {
+ uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
}
- pn_millis_t next_tick = leader_tick(pc);
- pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
- pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
- if (pn_connection_driver_has_event(&pc->driver)) {
- /* Ticks and checking buffers have generated events, send back to worker to process */
- set_state(ps, ON_WORKER, NULL);
+ return;
+ }
+ pn_millis_t next_tick = leader_tick(pc);
+ pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+ pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+ if (pn_connection_driver_has_event(&pc->driver)) {
+ to_worker(&pc->psocket); /* Ticks/buffer checks generated events */
+ return;
+ }
+ if (next_tick &&
+ pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), "timer start")) {
+ return;
+ }
+ if (wbuf.size > 0) {
+ uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+ if (pconnection_error(
+ pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write"))
return;
+ pc->writing = wbuf.size;
+ } else if (pn_connection_driver_write_closed(&pc->driver)) {
+ pc->shutdown.data = &pc->psocket;
+ if (pconnection_error(
+ pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write"))
+ return;
+ }
+ if (rbuf.size > 0) {
+ if (pconnection_error(
+ pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read"))
+ return;
+ }
+}
+
+static void listener_to_uv(pn_listener_t *l) {
+ to_uv(&l->psocket); /* Assume we're going to UV unless sent elsewhere */
+ if (l->err) {
+ if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) {
+ uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
}
- if (next_tick) {
- uv_timer_start(&pc->timer, on_tick, next_tick, 0);
- }
- if (wbuf.size > 0 && !pc->writing) {
- pc->writing = wbuf.size;
- uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
- err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
- } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
- pc->shutdown.data = ps;
- err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+ } else {
+ if (l->accepting) {
+ leader_accept(l);
}
- if (rbuf.size > 0 && !pc->reading) {
- pc->reading = true;
- err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+ if (l->connections) {
+ listener_to_worker(l);
}
+ }
+}
+
+/* Monitor a psocket_t in the UV loop */
+static void psocket_to_uv(psocket_t *ps) {
+ if (ps->is_conn) {
+ pconnection_to_uv(as_pconnection(ps));
} else {
- pn_listener_t *l = as_listener(ps);
- if (l->closing && pn_collector_peek(l->collector)) {
- uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
- return;
- } else {
- if (l->accepting) {
- leader_accept(l);
- }
- if (l->connections) {
- leader_unwatch(ps);
- }
- }
+ listener_to_uv(as_listener(ps));
}
- if (err) {
- leader_error(ps, err, "re-watching");
+}
+
+/* Detach a connection from IO and put it on the worker queue */
+static void pconnection_to_worker(pconnection_t *pc) {
+ /* Can't go to worker if a write is outstanding or the batch is empty */
+ if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) {
+ uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+ uv_timer_stop(&pc->timer);
}
+ to_worker(&pc->psocket);
}
-/* Detach a socket from IO and put it on the worker queue */
-static void leader_unwatch(psocket_t *ps) {
- assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */
- if (ps->is_conn) {
- pconnection_t *pc = as_pconnection(ps);
- if (!pn_connection_driver_has_event(&pc->driver)) {
- /* Don't return an empty event batch, re-attach to UV loop */
- leader_watch(ps);
- return;
- } else {
- if (pc->writing) {
- uv_cancel((uv_req_t*)&pc->write);
- }
- if (pc->reading) {
- pc->reading = false;
- uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
- }
- if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
- uv_timer_stop(&pc->timer);
- }
+/* TODO aconway 2017-02-16: simplify collector API*/
+/* True if pn_collector_more(c) is true, or if pn_collector_peek(c) is non-NULL
+   and is not the same event as pn_collector_prev(c).
+   NOTE(review): presumably this means "the collector holds an event that has
+   not been handed out yet" - confirm against the pn_collector_* API. */
+static bool collector_has_next(pn_collector_t *c) {
+ return pn_collector_more(c) ||
+ (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
+}
+
+/* Can't really detach a listener, as on_connection can always be called.
+ Generate events here safely.
+*/
+static void listener_to_worker(pn_listener_t *l) {
+ if (collector_has_next(l->collector)) { /* Already have events */
+ to_worker(&l->psocket);
+ } else if (l->err) {
+ if (l->err != UV_EOF) {
+ pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+ l->what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+ uv_strerror(l->err));
}
+ l->err = 0;
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+ to_worker(&l->psocket);
+ } else if (l->connections) { /* Generate accept events one at a time */
+ --l->connections;
+ pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+ to_worker(&l->psocket);
+ } else {
+ listener_to_uv(l);
}
- set_state(ps, ON_WORKER, NULL);
}
/* Set the event in the proactor's batch */
@@ -618,7 +659,7 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
return &p->batch;
}
-/* Return the next event batch or 0 if no events are ready */
+/* Return the next event batch or 0 if no events are available in the worker_q */
static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
if (!p->batch_working) { /* Can generate proactor events */
if (p->inactive) {
@@ -637,33 +678,14 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
assert(ps->state == ON_WORKER);
if (ps->is_conn) {
- pconnection_t *pc = as_pconnection(ps);
- return &pc->driver.batch;
+ return &as_pconnection(ps)->driver.batch;
} else { /* Listener */
- pn_listener_t *l = as_listener(ps);
- /* Generate accept events one at a time */
- if (l->connections && !pn_collector_peek(l->collector)) {
- --l->connections;
- pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
- }
- return &l->batch;
+ return &as_listener(ps)->batch;
}
- set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */
}
return 0;
}
-/* Called in any thread to set a wakeup action */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
- uv_mutex_lock(&ps->proactor->lock);
- if (action && !ps->wakeup) {
- ps->wakeup = action;
- }
- set_state_lh(ps, ON_LEADER, NULL);
- uv_async_send(&ps->proactor->async); /* Wake leader */
- uv_mutex_unlock(&ps->proactor->lock);
-}
-
pn_listener_t *pn_event_listener(pn_event_t *e) {
return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
}
@@ -689,26 +711,52 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
assert(pc->psocket.state == ON_WORKER);
if (pn_connection_driver_has_event(&pc->driver)) {
/* Process all events before going back to leader */
- set_state(&pc->psocket, ON_WORKER, NULL);
+ pconnection_to_worker(pc);
} else {
- set_state(&pc->psocket, ON_LEADER, leader_watch);
+ to_leader(&pc->psocket, psocket_to_uv);
}
return;
}
pn_listener_t *l = batch_listener(batch);
if (l) {
assert(l->psocket.state == ON_WORKER);
- set_state(&l->psocket, ON_LEADER, leader_watch);
+ to_leader(&l->psocket, psocket_to_uv);
return;
}
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
uv_mutex_lock(&p->lock);
p->batch_working = false;
- uv_async_send(&p->async); /* Wake leader */
uv_mutex_unlock(&p->lock);
return;
}
+ uv_async_send(&p->async); /* Wake leader */
+}
+
+/* Process the leader_q, in the leader thread.
+   Called with p->lock held: the lock is released around each wakeup/action
+   callback and re-acquired afterwards, so it is held again on return. */
+static void leader_process_lh(pn_proactor_t *p) {
+ /* Apply a pending pn_proactor_set_timeout() request: a zero timeout stops the timer */
+ if (p->timeout_request) {
+ p->timeout_request = false;
+ if (p->timeout) {
+ uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+ } else {
+ uv_timer_stop(&p->timer);
+ }
+ }
+ for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+ assert(ps->state == ON_LEADER);
+ /* A wakeup takes priority: if one is set, the action is not run in this pass */
+ if (ps->wakeup) {
+ uv_mutex_unlock(&p->lock);
+ ps->wakeup(ps);
+ ps->wakeup = NULL;
+ uv_mutex_lock(&p->lock);
+ } else if (ps->action) {
+ uv_mutex_unlock(&p->lock);
+ ps->action(ps);
+ ps->action = NULL;
+ uv_mutex_lock(&p->lock);
+ }
+ }
}
/* Run follower/leader loop till we can return an event and be a worker */
@@ -724,28 +772,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
/* Lead till there is work to do. */
p->has_leader = true;
while (batch == NULL) {
- if (p->timeout_request) {
- p->timeout_request = false;
- if (p->timeout) {
- uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
- } else {
- uv_timer_stop(&p->timer);
- }
- }
- for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
- assert(ps->state == ON_LEADER);
- if (ps->wakeup) {
- uv_mutex_unlock(&p->lock);
- ps->wakeup(ps);
- ps->wakeup = NULL;
- uv_mutex_lock(&p->lock);
- } else if (ps->action) {
- uv_mutex_unlock(&p->lock);
- ps->action(ps);
- ps->action = NULL;
- uv_mutex_lock(&p->lock);
- }
- }
+ leader_process_lh(p);
batch = get_batch_lh(p);
if (batch == NULL) {
uv_mutex_unlock(&p->lock);
@@ -753,7 +780,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
uv_mutex_lock(&p->lock);
}
}
- /* Signal the next leader and return to work */
+ /* Signal the next leader and go to work */
p->has_leader = false;
uv_cond_signal(&p->cond);
}
@@ -761,19 +788,36 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
return batch;
}
+/* Non-blocking counterpart of pn_proactor_wait(): return an event batch if one
+   can be obtained without waiting, otherwise NULL.
+   If nothing is queued and no other thread is leading, this thread leads for a
+   single UV_RUN_NOWAIT pass of the UV loop to try to generate work. */
+pn_event_batch_t *pn_proactor_grab(struct pn_proactor_t* p) {
+  uv_mutex_lock(&p->lock);
+  pn_event_batch_t *batch = get_batch_lh(p);
+  if (batch == NULL && !p->has_leader) {
+    /* If there is no leader, try a non-waiting lead to generate some work */
+    p->has_leader = true;
+    leader_process_lh(p);
+    uv_mutex_unlock(&p->lock);  /* the lock is not held while the UV loop runs */
+    uv_run(&p->loop, UV_RUN_NOWAIT);
+    uv_mutex_lock(&p->lock);
+    batch = get_batch_lh(p);
+    p->has_leader = false;
+    /* Signal the next leader, exactly as pn_proactor_wait() does when it gives
+       up leadership. Without this, a thread that called pn_proactor_wait()
+       while we were leading would presumably stay blocked on p->cond
+       (the follower wait is outside this hunk - confirm). */
+    uv_cond_signal(&p->cond);
+  }
+  uv_mutex_unlock(&p->lock);
+  return batch;
+}
+
void pn_proactor_interrupt(pn_proactor_t *p) {
uv_mutex_lock(&p->lock);
++p->interrupt;
- uv_async_send(&p->async); /* Interrupt the UV loop */
uv_mutex_unlock(&p->lock);
+ uv_async_send(&p->async); /* Interrupt the UV loop */
}
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
uv_mutex_lock(&p->lock);
p->timeout = t;
p->timeout_request = true;
- uv_async_send(&p->async); /* Interrupt the UV loop */
uv_mutex_unlock(&p->lock);
+ uv_async_send(&p->async); /* Interrupt the UV loop */
}
int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -781,7 +825,7 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
if (!pc) {
return PN_OUT_OF_MEMORY;
}
- set_state(&pc->psocket, ON_LEADER, leader_connect);
+ to_leader(&pc->psocket, leader_connect);
return 0;
}
@@ -789,7 +833,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con
{
psocket_init(&l->psocket, p, false, host, port);
l->backlog = backlog;
- set_state(&l->psocket, ON_LEADER, leader_listen);
+ to_leader(&l->psocket, leader_listen);
return 0;
}
@@ -803,7 +847,7 @@ void leader_wake_connection(psocket_t *ps) {
pconnection_t *pc = as_pconnection(ps);
pn_connection_t *c = pc->driver.connection;
pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
- leader_unwatch(ps);
+ pconnection_to_worker(pc);
}
void pn_connection_wake(pn_connection_t* c) {
@@ -850,11 +894,17 @@ void pn_proactor_free(pn_proactor_t *p) {
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
assert(l->psocket.state == ON_WORKER);
+ pn_event_t *prev = pn_collector_prev(l->collector);
+ if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
+ l->err = UV_EOF;
+ }
return pn_collector_next(l->collector);
}
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
- return pn_collector_next(batch_proactor(batch)->collector);
+ pn_proactor_t *p = batch_proactor(batch);
+ assert(p->batch_working);
+ return pn_collector_next(p->collector);
}
static void pn_listener_free(pn_listener_t *l) {
@@ -867,7 +917,7 @@ static void pn_listener_free(pn_listener_t *l) {
}
}
-pn_listener_t *pn_listener() {
+pn_listener_t *pn_listener(void) {
pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
if (l) {
l->batch.next_event = listener_batch_next;
@@ -885,8 +935,8 @@ pn_listener_t *pn_listener() {
+/* Leader action that starts closing a listener: records UV_EOF as the
+   listener's error and passes it to listener_to_uv(), which closes the
+   handle when l->err is set. */
void leader_listener_close(psocket_t *ps) {
- assert(ps->state = ON_LEADER);
+ assert(ps->state == ON_LEADER); /* compare, not assign: '=' overwrote the state instead of checking it */
pn_listener_t *l = (pn_listener_t*)ps;
- l->closing = true;
- leader_watch(ps);
+ l->err = UV_EOF;
+ listener_to_uv(l);
}
void pn_listener_close(pn_listener_t* l) {
@@ -895,12 +945,10 @@ void pn_listener_close(pn_listener_t* l) {
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
- assert(l->psocket.state == ON_WORKER);
return l ? l->psocket.proactor : NULL;
}
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
- assert(l->psocket.state == ON_WORKER);
return l->condition;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 59c7665..70b30a0 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -20,15 +20,15 @@
add_definitions(${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS})
if (ENABLE_VALGRIND AND VALGRIND_EXE)
- set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=1 --quiet
+ set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=42 --quiet
--leak-check=full --trace-children=yes)
endif ()
-macro (pn_add_c_test test file)
- add_executable (${test} ${file})
+macro (pn_add_c_test test)
+ add_executable (${test} ${ARGN})
target_link_libraries (${test} qpid-proton)
if (BUILD_WITH_CXX)
- set_source_files_properties (${file} PROPERTIES LANGUAGE CXX)
+ set_source_files_properties (${ARGN} PROPERTIES LANGUAGE CXX)
endif (BUILD_WITH_CXX)
if (CMAKE_SYSTEM_NAME STREQUAL Windows)
add_test (NAME ${test}
@@ -49,3 +49,4 @@ pn_add_c_test (c-reactor-tests reactor.c)
pn_add_c_test (c-event-tests event.c)
pn_add_c_test (c-data-tests data.c)
pn_add_c_test (c-condition-tests condition.c)
+pn_add_c_test (c-proactor-tests proactor.c)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
new file mode 100644
index 0000000..ae5b1f6
--- /dev/null
+++ b/proton-c/src/tests/proactor.c
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_tools.h"
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <stdlib.h>
+#include <string.h>
+
+static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
+
+static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+
+/* Wait in pn_proactor_wait() for the next event batch and return the type of
+   its first event. Only that first event is inspected; the batch is then
+   handed back with pn_proactor_done(). */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+ pn_event_batch_t *events = pn_proactor_wait(proactor);
+ pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
+ pn_proactor_done(proactor, events);
+ return etype;
+}
+
+/* Get events until one of type `etype` or a PN_PROACTOR_TIMEOUT arrives, and
+   return the type that ended the wait. Events of any other type are discarded.
+   Callers compare the result with `etype` to tell success from a timeout. */
+static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) {
+ while (true) {
+ pn_event_type_t t = wait_next(proactor);
+ if (t == etype || t == PN_PROACTOR_TIMEOUT) {
+ return t;
+ }
+ }
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return.
+   On a mismatch each check reports the name of the event type actually received. */
+static void test_interrupt_timeout(test_t *t) {
+ pn_proactor_t *p = pn_proactor();
+ pn_proactor_interrupt(p);
+ pn_event_type_t etype = wait_next(p);
+ TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype));
+ pn_proactor_set_timeout(p, 1); /* very short timeout */
+ etype = wait_next(p);
+ TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype));
+ pn_proactor_free(p);
+}
+
+/* Test handler return value */
+typedef enum {
+ H_CONTINUE, /**< handler wants more events */
+ H_FINISHED, /**< handler completed without error */
+ H_FAILED /**< handler hit an error and cannot continue */
+} handler_state_t;
+
+/* A test handler: called with the test and one event, returns the handler's new state */
+typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*);
+
+/* Proactor and handler that take part in a test */
+typedef struct proactor_test_t {
+ test_t *t; /* Test that records check failures; passed to the handler */
+ test_handler_fn handler; /* Called for each event while state is H_CONTINUE */
+ pn_proactor_t *proactor; /* Created by proactor_test_init() if NULL */
+ handler_state_t state; /* Result of last handler call */
+} proactor_test_t;
+
+
+/* Initialize an array of `n` proactor_test_t: create a proactor for each entry
+   that does not have one yet, set the hang-protection timeout and reset the
+   state to H_CONTINUE. Entries that already have a proactor keep it, so this
+   may be called more than once on the same array. */
+static void proactor_test_init(proactor_test_t *pts, size_t n) {
+ for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+ if (!pt->proactor) pt->proactor = pn_proactor();
+ pn_proactor_set_timeout(pt->proactor, timeout);
+ pt->state = H_CONTINUE;
+ }
+}
+
+/* Iterate over an array of proactors, draining or handling events with the non-blocking
+ pn_proactor_grab. Continue till all handlers return H_FINISHED (and return 0) or one
+ returns H_FAILED (and return non-0)
+
+ Once an entry has left H_CONTINUE its handler is no longer called; further
+ events for that proactor are still fetched and discarded.
+*/
+int proactor_test_run(proactor_test_t *pts, size_t n) {
+ /* Make sure pts are initialized */
+ proactor_test_init(pts, n);
+ size_t finished = 0;
+ do {
+ finished = 0; /* recounted on every pass over the array */
+ for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+ pn_event_batch_t *events = pn_proactor_grab(pt->proactor);
+ if (events) {
+ pn_event_t *e;
+ while ((e = pn_event_batch_next(events))) {
+ if (pt->state == H_CONTINUE) {
+ pt->state = pt->handler(pt->t, e);
+ }
+ }
+ pn_proactor_done(pt->proactor, events);
+ }
+ switch (pt->state) {
+ case H_CONTINUE: break;
+ case H_FINISHED: ++finished; break;
+ case H_FAILED: return 1;
+ }
+ }
+ } while (finished < n);
+ return 0;
+}
+
+
+/* Simple test of client connect to a listening server.
+   Server-side handler: accepts each incoming connection, opens it, and
+   finishes when the remote end closes. Any event not listed is a failure. */
+handler_state_t listen_connect_server(test_t *t, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ /* Ignore these events */
+ case PN_LISTENER_OPEN:
+ case PN_CONNECTION_LOCAL_OPEN:
+ case PN_CONNECTION_REMOTE_OPEN:
+ case PN_CONNECTION_BOUND:
+ return H_CONTINUE;
+
+ /* Act on these events */
+ case PN_LISTENER_ACCEPT:
+ pn_listener_accept(pn_event_listener(e), pn_connection());
+ return H_CONTINUE;
+ case PN_CONNECTION_INIT:
+ pn_connection_open(pn_event_connection(e));
+ return H_CONTINUE;
+ case PN_CONNECTION_REMOTE_CLOSE:
+ return H_FINISHED;
+
+ default:
+ TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+ return H_FAILED;
+ break; /* not reached: follows a return */
+ }
+}
+
+/* Client-side handler: opens the connection on PN_CONNECTION_INIT, then closes
+   it and finishes as soon as the remote open arrives. Any event not listed is
+   a failure. */
+handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
+ switch (pn_event_type(e)) {
+ /* Ignore these events */
+ case PN_CONNECTION_LOCAL_OPEN:
+ case PN_CONNECTION_BOUND:
+ return H_CONTINUE;
+
+ /* Act on these events */
+ case PN_CONNECTION_INIT:
+ pn_connection_open(pn_event_connection(e));
+ return H_CONTINUE;
+ case PN_CONNECTION_REMOTE_OPEN:
+ pn_connection_close(pn_event_connection(e));
+ return H_FINISHED;
+
+ /* Unexpected events */
+ default:
+ TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+ return H_FAILED;
+ break; /* not reached: follows a return */
+ }
+}
+
+/* Simplest client/server interaction: listen on a picked port, wait for
+   PN_LISTENER_OPEN, then connect a client and run both handlers to completion.
+   The result of proactor_test_run() is not checked here; a failing handler
+   has already recorded its error through TEST_CHECK. */
+static void test_listen_connect(test_t *t) {
+ proactor_test_t pts[] = { { t, listen_connect_client }, { t, listen_connect_server } };
+ proactor_test_t *client = &pts[0], *server = &pts[1];
+ proactor_test_init(pts, 2);
+
+ int port = pick_port();
+ char port_str[16];
+ snprintf(port_str, sizeof(port_str), "%d", port);
+ pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4);
+ /* Only connect once the listener reports it is open */
+ pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN);
+ if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+ pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str);
+ proactor_test_run(pts, 2);
+ }
+ pn_proactor_free(client->proactor);
+ pn_proactor_free(server->proactor);
+}
+
+/* Test error handling: connecting and listening with an unresolvable
+   host/port must end in PN_TRANSPORT_CLOSED / PN_LISTENER_CLOSE respectively,
+   each with an error condition set. */
+static void test_listen_connect_error(test_t *t) {
+ pn_proactor_t *p = pn_proactor();
+ pn_proactor_set_timeout(p, timeout); /* In case of hang */
+ pn_connection_t *c = pn_connection();
+ pn_proactor_connect(p, c, "nosuchost", "nosuchport");
+ pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
+ TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
+ TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
+
+ pn_listener_t *l = pn_listener();
+ pn_proactor_listen(p, l, "nosuchost", "nosuchport", 1);
+ etype = wait_for(p, PN_LISTENER_CLOSE);
+ TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
+ TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
+
+ pn_proactor_free(p);
+}
+
+/* Run every proactor test. The exit status is the number of tests that
+   recorded errors, so 0 means success. */
+int main(int argc, char **argv) {
+  (void)argc;  /* the command line is not used */
+  (void)argv;
+  int failed = 0;
+  RUN_TEST(failed, t, test_interrupt_timeout(&t));
+  RUN_TEST(failed, t, test_listen_connect(&t));
+  RUN_TEST(failed, t, test_listen_connect_error(&t));
+  return failed;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
new file mode 100644
index 0000000..047ab42
--- /dev/null
+++ b/proton-c/src/tests/test_tools.h
@@ -0,0 +1,140 @@
+#ifndef TESTS_TEST_TOOLS_H
+#define TESTS_TEST_TOOLS_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Call via ASSERT macro. Prints "file:line: Assertion failed: cond" to stdout
+   and aborts the process. */
+static void assert_fail_(const char* cond, const char* file, int line) {
+ printf("%s:%d: Assertion failed: %s\n", file, line, cond);
+ abort();
+}
+
+/* Unconditional assert (does not depend on NDEBUG) for tests. */
+#define ASSERT(expr) \
+ ((expr) ? (void)0 : assert_fail_(#expr, __FILE__, __LINE__))
+
+/* Call via macro ASSERT_PERROR. Calls perror() first so the current errno
+   message is printed before the assertion message, then aborts. */
+static void assert_perror_fail_(const char* cond, const char* file, int line) {
+ perror(cond);
+ printf("%s:%d: Assertion failed (error above): %s\n", file, line, cond);
+ abort();
+}
+
+/* Like ASSERT but also calls perror() to print the current errno error. */
+#define ASSERT_PERROR(expr) \
+ ((expr) ? (void)0 : assert_perror_fail_(#expr, __FILE__, __LINE__))
+
+
+/* A struct to collect the results of a test.
+ * Declared and initialized by RUN_TEST(failed, t, expr), where t is the name
+ * given to the test_t variable.
+ */
+typedef struct test_t {
+ const char* name; /* Printed in check-failure messages */
+ int errors; /* Incremented by each failed TEST_CHECK */
+} test_t;
+
+/* If !expr, print "file:line:[test name] check failed: (sexpr)" to stderr,
+   followed by " - " and the printf-style message when fmt is non-empty, and
+   increment t->errors. Does nothing when expr is true. Use via TEST_CHECK.
+   Returns expr, so a check can be used directly as an if-condition. */
+static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
+  if (!expr) {
+    fprintf(stderr, "%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
+    if (fmt && *fmt) {
+      va_list ap;
+      va_start(ap, fmt);
+      fprintf(stderr, " - ");
+      vfprintf(stderr, fmt, ap);
+      va_end(ap);  /* every va_start must be matched by va_end before returning */
+    }
+    fprintf(stderr, "\n");
+    fflush(stderr);
+    ++t->errors;
+  }
+  return expr;
+}
+
+/* Check EXPR in test TEST; the trailing arguments are a printf-style message
+   (pass "" for none - at least one argument is required). */
+#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+
+/* T is name of a test_t variable, EXPR is the test expression (which should update T)
+ FAILED is incremented if the test has errors.
+ T is declared inside the macro's own block, before EXPR is evaluated, so
+ EXPR can refer to it (e.g. pass &T) and the same name can be reused by
+ consecutive RUN_TEST calls.
+*/
+#define RUN_TEST(FAILED, T, EXPR) do { \
+ printf("TEST: %s\n", #EXPR); \
+ fflush(stdout); \
+ test_t T = { #EXPR, 0 }; \
+ (EXPR); \
+ if (T.errors) { \
+ printf("FAIL: %s (%d errors)\n", #EXPR, T.errors); \
+ ++(FAILED); \
+ } \
+ } while(0)
+
+#if defined(WIN32)
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+/* Windows socket handle type and its close function. */
+typedef SOCKET sock_t;
+static inline void sock_close(sock_t sock) { closesocket(sock); }
+
+#else
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <time.h>
+#include <unistd.h>
+
+/* Return non-zero if a TCP socket cannot be bound to `port` on INADDR_ANY.
+   Any bind() failure is reported as "in use"; the probe socket is always
+   closed again before returning. Aborts via ASSERT_PERROR if the socket
+   itself cannot be created. */
+static int port_in_use(int port) {
+ /* Attempt to bind a dummy socket to test if the port is in use. */
+ int dummy_socket = socket(AF_INET, SOCK_STREAM, 0);
+ ASSERT_PERROR(dummy_socket >= 0);
+ struct sockaddr_in addr = {0};
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = INADDR_ANY;
+ addr.sin_port = htons(port);
+ int ret = bind(dummy_socket, (struct sockaddr *) &addr, sizeof(addr));
+ close(dummy_socket);
+ return ret < 0;
+}
+
+/* Try to pick an unused port by picking random ports till we find one
+   that is not in use. This is not foolproof as some other process may
+   grab it before the caller binds or connects.
+
+   Returns a port in the range 10000..19999. Aborts via ASSERT if no free
+   port is found within MAX_TRIES attempts.
+*/
+static int pick_port(void) {
+  static const int MAX_TRIES = 10;
+  static int seeded = 0;
+  if (!seeded) {
+    /* Seed only once: reseeding with time(NULL) on every call would make all
+       calls within the same second try the same sequence of ports. */
+    srand((unsigned)time(NULL));
+    seeded = 1;
+  }
+  for (int i = 0; i < MAX_TRIES; ++i) {
+    /* Pick a random port. Avoid the standard OS ephemeral port range used by
+       bind(0) - ports can be allocated and re-allocated very rapidly there.
+    */
+    int port = (rand()%10000) + 10000;
+    if (!port_in_use(port)) {
+      return port;  /* success on any attempt, including the last one */
+    }
+  }
+  ASSERT(0 && "cannot pick a port");
+  return -1;  /* not reached: ASSERT aborts */
+}
+
+#endif
+
+#endif // TESTS_TEST_TOOLS_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org