You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:29 UTC

[22/38] qpid-proton git commit: PROTON-1403: C proactor tests, fixes & additions

PROTON-1403: C proactor tests, fixes & additions

proactor API additions:
- PN_LISTENER_OPEN: event when listener is listening and connects will succeed.
- pn_proactor_grab(): non-blocking version of pn_proactor_wait(), used in tests

src/tests/test_tools.h - simple C test framework
src/tests/proactor.c - initial tests for basic proactor functionality
src/proactor/libuv.c
 - fixed some assertion bugs and memory leaks
 - renaming and simplifying the code
examples/broker.c: exit with non-0 if broker stops because of an error


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b987a6a7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b987a6a7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b987a6a7

Branch: refs/heads/go1
Commit: b987a6a70f30dc593bbea6c98ebae36ec77e4b7d
Parents: 9bd99eb
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Jan 13 16:41:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 16 17:58:20 2017 -0500

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +-
 examples/c/proactor/CMakeLists.txt              |   6 +-
 examples/c/proactor/broker.c                    |   6 +-
 examples/c/proactor/direct.c                    |   5 +-
 examples/c/proactor/send.c                      |   2 +-
 proton-c/CMakeLists.txt                         |   6 -
 .../cpp/include/proton/io/connection_driver.hpp |   1 +
 proton-c/include/proton/event.h                 |  17 +-
 proton-c/include/proton/listener.h              |   2 +-
 proton-c/include/proton/proactor.h              |  34 +-
 proton-c/src/core/connection_driver.c           |   4 +-
 proton-c/src/core/event.c                       |   2 +
 proton-c/src/proactor/libuv.c                   | 518 ++++++++++---------
 proton-c/src/tests/CMakeLists.txt               |   9 +-
 proton-c/src/tests/proactor.c                   | 217 ++++++++
 proton-c/src/tests/test_tools.h                 | 140 +++++
 16 files changed, 698 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b538ffd..294fd03 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -71,7 +71,7 @@ if (CMAKE_BUILD_TYPE MATCHES "Coverage")
   make_directory(coverage_results)
   add_custom_target(coverage
     WORKING_DIRECTORY ./coverage_results
-    COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
+    COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR})
 endif()
 
 if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index 4189cf5..153f35f 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -29,7 +29,7 @@ set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${Proton_LIBRARIES})
 check_function_exists(pn_proactor HAS_PROACTOR)
 cmake_pop_check_state()
 
-if (HAS_PROACTOR)
+if(HAS_PROACTOR)
 
 add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
 
@@ -48,6 +48,6 @@ foreach(name broker send receive direct)
 endforeach()
 
 set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
+add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
 
-endif()
+endif(HAS_PROACTOR)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index ebf4068..2a338e1 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -22,6 +22,7 @@
 #include <proton/connection_driver.h>
 #include <proton/proactor.h>
 #include <proton/engine.h>
+#include <proton/listener.h>
 #include <proton/sasl.h>
 #include <proton/transport.h>
 #include <proton/url.h>
@@ -288,8 +289,11 @@ static void session_unsub(broker_t *b, pn_session_t *ssn) {
   }
 }
 
+static int exit_code = 0;
+
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {
+    exit_code = 1;
     const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN";
     fprintf(stderr, "%s: %s: %s\n", ename,
             pn_condition_get_name(cond), pn_condition_get_description(cond));
@@ -483,5 +487,5 @@ int main(int argc, char **argv) {
   }
   pn_proactor_free(b.proactor);
   free(threads);
-  return 0;
+  return exit_code;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
index 26f1b33..3d0a7d1 100644
--- a/examples/c/proactor/direct.c
+++ b/examples/c/proactor/direct.c
@@ -22,9 +22,10 @@
 #include <proton/connection.h>
 #include <proton/connection_driver.h>
 #include <proton/delivery.h>
-#include <proton/proactor.h>
 #include <proton/link.h>
+#include <proton/listener.h>
 #include <proton/message.h>
+#include <proton/proactor.h>
 #include <proton/sasl.h>
 #include <proton/session.h>
 #include <proton/transport.h>
@@ -59,7 +60,7 @@ typedef struct app_data_t {
 
 static const int BATCH = 1000; /* Batch size for unlimited receive */
 
-int exit_code = 0;
+static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/send.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c
index 48fcecd..bba5d3e 100644
--- a/examples/c/proactor/send.c
+++ b/examples/c/proactor/send.c
@@ -50,7 +50,7 @@ typedef struct app_data_t {
   bool finished;
 } app_data_t;
 
-int exit_code = 0;
+static int exit_code = 0;
 
 static void check_condition(pn_event_t *e, pn_condition_t *cond) {
   if (pn_condition_is_set(cond)) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 30a77e2..0731b67 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -278,12 +278,6 @@ if (CMAKE_C_COMPILER_ID MATCHES "Clang")
   if (ENABLE_WARNING_ERROR)
     set (COMPILE_WARNING_FLAGS "-Werror ${COMPILE_WARNING_FLAGS}")
   endif (ENABLE_WARNING_ERROR)
-endif()
-
-if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
-  if (ENABLE_WARNING_ERROR)
-    set (WERROR "-Werror")
-  endif (ENABLE_WARNING_ERROR)
   # TODO aconway 2016-01-06: we should be able to clean up the code and turn on
   # some of these warnings.
   set (CXX_WARNING_FLAGS "${COMPILE_WARNING_FLAGS} -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 4a0efe9..759b1fc 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -113,6 +113,7 @@ PN_CPP_CLASS_EXTERN connection_driver {
     ///
     PN_CPP_EXTERN connection_driver(proton::container&);
 #if PN_CPP_HAS_RVALUE_REFERENCES
+    /// @copydoc connection_driver()
     PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop);
 #endif
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index 4a88368..3cfcc82 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -321,7 +321,8 @@ typedef enum {
   PN_CONNECTION_WAKE,
 
   /**
-   * Indicates the listener is ready to call pn_listener_accept() 
+   * Indicates the listener has an incoming connection, call pn_listener_accept()
+   * to accept it.
    * Events of this type point to the @ref pn_listener_t.
    */
   PN_LISTENER_ACCEPT,
@@ -350,7 +351,13 @@ typedef enum {
    *
    * Events of this type point to the @ref pn_proactor_t.
    */
-  PN_PROACTOR_INACTIVE
+  PN_PROACTOR_INACTIVE,
+
+  /**
+   * Indicates the listener is listening.
+   * Events of this type point to the @ref pn_listener_t.
+   */
+  PN_LISTENER_OPEN
 
 } pn_event_type_t;
 
@@ -537,9 +544,9 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event);
 PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event);
 
 /**
- * **Experimental** - A batch of events to handle. Call
- * pn_event_batch_next() in a loop until it returns NULL to handle
- * them.
+ * **Experimental** - A batch of events that must be handled in sequence.
+ * Call pn_event_batch_next() in a loop until it returns NULL to extract
+ * the events.
  */
 typedef struct pn_event_batch_t pn_event_batch_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 18feca7..2038c06 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -20,7 +20,7 @@
  * under the License.
  */
 
-#include <proton/condition.h>
+#include <proton/import_export.h>
 #include <proton/types.h>
 
 #ifdef __cplusplus

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 71a7dda..af3acbc 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -20,10 +20,9 @@
  * under the License.
  */
 
-#include <proton/types.h>
-#include <proton/import_export.h>
-#include <proton/listener.h>
 #include <proton/event.h>
+#include <proton/import_export.h>
+#include <proton/types.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -95,25 +94,34 @@ PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listen
                        const char *host, const char *port, int backlog);
 
 /**
- * Wait for events to handle.
+ * Wait until there is at least one event to handle.
+ * Always returns a non-empty batch of events.
  *
- * Handle events in the returned batch by calling
- * pn_event_batch_next() until it returns NULL. You must call
- * pn_proactor_done() when you are finished with the batch.
+ * You must call pn_proactor_done() when you are finished with the batch, you
+ * must not use the batch pointer after calling pn_proactor_done().
  *
- * If you call pn_proactor_done() before finishing the batch, the
- * remaining events will be returned again by another call
- * pn_proactor_wait().  This is less efficient, but allows you to
- * handle part of a batch and then hand off the rest to another
- * thread.
+ * Normally it is most efficient to handle the entire batch in one thread, but
+ * you can call pn_proactor_done() on an unfinished batch. The remaining
+ * events will be returned by another call to pn_proactor_wait(), possibly in a
+ * different thread.
+ *
+ * @note You can generate events to force threads to wake up from
+ * pn_proactor_wait() using pn_proactor_interrupt(), pn_proactor_set_timeout()
+ * and pn_connection_wake()
  *
  * @note Thread-safe: can be called concurrently. Events in a single
  * batch must be handled in sequence, but batches returned by separate
- * calls to pn_proactor_wait() can be handled concurrently.
+ * calls can be handled concurrently.
  */
 PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
 
 /**
+ * Return a batch of events if one is available immediately, otherwise return NULL.  If it
+ * does return an event batch, the rules are the same as for pn_proactor_wait()
+ */
+PNP_EXTERN pn_event_batch_t *pn_proactor_grab(pn_proactor_t *proactor);
+
+/**
  * Call when done handling a batch of events.
  *
  * Must be called exactly once to match each call to

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
index 3393e64..0d1db21 100644
--- a/proton-c/src/core/connection_driver.c
+++ b/proton-c/src/core/connection_driver.c
@@ -127,7 +127,9 @@ pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) {
 }
 
 bool pn_connection_driver_has_event(pn_connection_driver_t *d) {
-  return pn_collector_peek(pn_connection_collector(d->connection));
+  /* FIXME aconway 2017-02-15: this is ugly */
+  pn_collector_t *c = pn_connection_collector(d->connection);
+  return pn_collector_more(c) || (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
 }
 
 bool pn_connection_driver_finished(pn_connection_driver_t *d) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index 41ff6d1..e213c8f 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -395,6 +395,8 @@ const char *pn_event_type_name(pn_event_type_t type)
     return "PN_PROACTOR_TIMEOUT";
    case PN_PROACTOR_INACTIVE:
     return "PN_PROACTOR_INACTIVE";
+   case PN_LISTENER_OPEN:
+    return "PN_LISTENER_OPEN";
    default:
     return "PN_UNKNOWN";
   }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index d17136a..0ccfcda 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -24,6 +24,7 @@
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
 #include <proton/engine.h>
+#include <proton/listener.h>
 #include <proton/message.h>
 #include <proton/object.h>
 #include <proton/proactor.h>
@@ -40,37 +41,34 @@
 #include <string.h>
 
 /*
-  libuv loop functions are thread unsafe. The only exception is uv_async_send()
-  which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+  libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
+  "wakeup" that can wake the uv_loop from another thread.
 
-  To provide concurrency the proactor uses a "leader-worker-follower" model,
-  threads take turns at the roles:
+  To provide concurrency proactor uses a "leader-worker-follower" model, threads take
+  turns at the roles:
 
-  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
-    to generate work. When there is work available it gives up leadership and
-    becomes a "worker"
+  - a single "leader" thread uses libuv, it runs the uv_loop in short bursts to
+  generate work. Once there is work it becomes a "worker" thread, and another thread
+  takes over as leader.
 
-  - "workers" handle events concurrently for distinct connections/listeners
-    They do as much work as they can get, when none is left they become "followers"
+  - "workers" handle events for separate connections or listeners concurrently. They do as
+  much work as they can, when none is left they become "followers"
 
-  - "followers" wait for the leader to generate work and become workers.
-    When the leader itself becomes a worker, one of the followers takes over.
+  - "followers" wait for the leader to generate work. One follower becomes the new leader,
+  the others become workers or continue to follow till they can get work.
 
-  This model is symmetric: any thread can take on any role based on run-time
-  requirements. It also allows the IO and non-IO work associated with an IO
-  wake-up to be processed in a single thread with no context switches.
+  Any thread in a pool can take on any role necessary at run-time. All the work generated
+  by an IO wake-up for a single connection can be processed in a single worker
+  thread to minimize context switching.
 
   Function naming:
-  - on_* - called in leader thread by uv_run().
+  - on_* - called in leader thread via uv_run().
   - leader_* - called in leader thread (either leader_q processing or from an on_ function)
-  - worker_* - called in worker thread
   - *_lh - called with the relevant lock held
 
   LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
-  UV handles have received an on_close(). Freeing resources is always initiated by
-  uv_close() of the uv_tcp_t handle, and completed in on_close() handler functions when it
-  is safe. The only exception is when an error occurs that prevents a pn_connection_t or
-  pn_listener_t from being associated with a uv handle at all.
+  UV handles have received a close callback. Freeing resources is initiated by uv_close()
+  of the uv_tcp_t handle, and executed in an on_close() handler when it is safe.
 */
 
 const char *COND_NAME = "proactor";
@@ -82,12 +80,12 @@ const char *AMQPS_PORT_NAME = "amqps";
 PN_HANDLE(PN_PROACTOR)
 
 /* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
-   Class definitions are for identification as pn_event_t context only.
+   CLASSDEF is for identification when used as a pn_event_t context.
 */
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
-/* A psocket (connection or listener) has the following *mutually exclusive* states. */
+/* A psocket (connection or listener) has the following mutually exclusive states. */
 typedef enum {
   ON_WORKER,               /* On worker_q or in use by user code in worker thread  */
   ON_LEADER,               /* On leader_q or in use the leader loop  */
@@ -105,12 +103,11 @@ typedef struct psocket_t {
   void (*action)(struct psocket_t*); /* deferred action for leader */
   void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
 
-  /* Only used by leader when it owns the psocket */
+  /* Only used by leader thread when it owns the psocket */
   uv_tcp_t tcp;
   char host[NI_MAXHOST];
   char port[NI_MAXSERV];
   bool is_conn;
-
 } psocket_t;
 
 /* Special value for psocket.next pointer when socket is not on any list. */
@@ -138,6 +135,7 @@ static inline const char* fixstr(const char* str) {
   return str[0] == '\001' ? NULL : str;
 }
 
+/* Holds a psocket and a pn_connection_driver  */
 typedef struct pconnection_t {
   psocket_t psocket;
 
@@ -150,10 +148,11 @@ typedef struct pconnection_t {
   uv_write_t write;
   uv_shutdown_t shutdown;
   size_t writing;               /* size of pending write request, 0 if none pending */
-  bool reading;                 /* true if a read request is pending */
-  bool server;                  /* accept, not connect */
+  bool server;                  /* accepting not connecting */
 } pconnection_t;
 
+
+/* pn_listener_t with a psocket_t  */
 struct pn_listener_t {
   psocket_t psocket;
 
@@ -169,6 +168,8 @@ struct pn_listener_t {
 
   /* Only used in leader thread */
   size_t connections;           /* number of connections waiting to be accepted  */
+  int err;                      /* uv error code, 0 = OK, UV_EOF = closed */
+  const char *what;             /* static description string */
 };
 
 typedef struct queue { psocket_t *front, *back; } queue;
@@ -198,9 +199,9 @@ struct pn_proactor_t {
   bool batch_working;          /* batch is being processed in a worker thread */
 };
 
-static bool push_lh(queue *q, psocket_t *ps) {
-  if (ps->next != &UNLISTED)  /* Don't move if already listed. */
-    return false;
+/* Push ps to back of q. Must not be on a different queue */
+static void push_lh(queue *q, psocket_t *ps) {
+  assert(ps->next == &UNLISTED);
   ps->next = NULL;
   if (!q->front) {
     q->front = q->back = ps;
@@ -208,9 +209,9 @@ static bool push_lh(queue *q, psocket_t *ps) {
     q->back->next = ps;
     q->back =  ps;
   }
-  return true;
 }
 
+/* Pop returns front of q or NULL if empty */
 static psocket_t* pop_lh(queue *q) {
   psocket_t *ps = q->front;
   if (ps) {
@@ -220,29 +221,49 @@ static psocket_t* pop_lh(queue *q) {
   return ps;
 }
 
-/* Set state and action and push to relevant queue */
-static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
-  /* Illegal if ps is already listed under a different state */
-  assert(ps->next == &UNLISTED || ps->state == state);
-  ps->state = state;
-  if (action && !ps->action) {
-    ps->action = action;
+/* Queue an action for the leader thread */
+static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  ps->action = action;
+  if (ps->next == &UNLISTED) {
+    ps->state = ON_LEADER;
+    push_lh(&ps->proactor->leader_q, ps);
+  }
+  uv_mutex_unlock(&ps->proactor->lock);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+}
+
+/* Push to the worker thread */
+static void to_worker(psocket_t *ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  /* If already ON_WORKER do nothing */
+  if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+    ps->state = ON_WORKER;
+    push_lh(&ps->proactor->worker_q, ps);
   }
-  switch(state) {
-   case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break;
-   case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break;
-   case ON_UV:
-    assert(ps->next == &UNLISTED);
-    break;           /* No queue for UV loop */
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Set state to ON_UV */
+static void to_uv(psocket_t *ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (ps->next == &UNLISTED) {
+    ps->state = ON_UV;
   }
+  uv_mutex_unlock(&ps->proactor->lock);
 }
 
-/* Set state and action, push to queue and notify leader. Thread safe. */
-static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) {
+/* Called in any thread to set a wakeup action */
+static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
   uv_mutex_lock(&ps->proactor->lock);
-  set_state_lh(ps, state, action);
-  uv_async_send(&ps->proactor->async);
+  ps->wakeup = action;
+  /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */
+  if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+    push_lh(&ps->proactor->leader_q, ps);
+    ps->state = ON_LEADER;      /* Otherwise notify the leader */
+  }
   uv_mutex_unlock(&ps->proactor->lock);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
 }
 
 static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -308,7 +329,6 @@ static void on_close_pconnection_final(uv_handle_t *h) {
 
 /* Close event for uv_tcp_t of a psocket_t */
 static void on_close_psocket(uv_handle_t *h) {
-  /* No assert(ps->state == ON_UV); may be called in other states during shutdown. */
   psocket_t *ps = (psocket_t*)h->data;
   if (ps->is_conn) {
     leader_count(ps->proactor, -1);
@@ -329,29 +349,43 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
   return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
-static void leader_unwatch(psocket_t *ps);
+static void pconnection_to_worker(pconnection_t *pc);
+static void listener_to_worker(pn_listener_t *l);
 
-static void leader_error(psocket_t *ps, int err, const char* what) {
-  assert(ps->state != ON_WORKER);
-  if (ps->is_conn) {
-    pn_connection_driver_t *driver = &as_pconnection(ps)->driver;
+int pconnection_error(pconnection_t *pc, int err, const char* what) {
+  if (err) {
+    pn_connection_driver_t *driver = &pc->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
     pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
-                                what, fixstr(ps->host), fixstr(ps->port),
+                                what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
                                 uv_strerror(err));
     pn_connection_driver_close(driver);
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
-                        what, fixstr(ps->host), fixstr(ps->port),
-                        uv_strerror(err));
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-    l->closing = true;
+    pconnection_to_worker(pc);
+  }
+  return err;
+}
+
+static int listener_error(pn_listener_t *l, int err, const char* what) {
+  if (err) {
+    l->err = err;
+    l->what = what;
+    listener_to_worker(l);
   }
-  leader_unwatch(ps);               /* Worker to handle the error */
+  return err;
 }
 
-/* uv-initialization */
+static int psocket_error(psocket_t *ps, int err, const char* what) {
+  if (err) {
+    if (ps->is_conn) {
+      pconnection_error(as_pconnection(ps), err, "initialization");
+    } else {
+      listener_error(as_listener(ps), err, "initialization");
+    }
+  }
+  return err;
+}
+
+/* psocket uv-initialization */
 static int leader_init(psocket_t *ps) {
   ps->state = ON_LEADER;
   leader_count(ps->proactor, +1);
@@ -365,9 +399,8 @@ static int leader_init(psocket_t *ps) {
         pc->timer.data = ps;
       }
     }
-  }
-  if (err) {
-    leader_error(ps, err, "initialization");
+  } else {
+    psocket_error(ps, err, "initialization");
   }
   return err;
 }
@@ -375,33 +408,27 @@ static int leader_init(psocket_t *ps) {
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
-  assert(pc->psocket.state == ON_UV);
   if (!err) {
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, err, "on connect to");
+    pconnection_error(pc, err, "on connect to");
   }
 }
 
 /* Incoming connection ready to be accepted */
 static void on_connection(uv_stream_t* server, int err) {
-  /* Unlike most on_* functions, this one can be called by the leader thrad when the
+  /* Unlike most on_* functions, this one can be called by the leader thread when the
    * listener is ON_WORKER, because there's no way to stop libuv from calling
-   * on_connection() in leader_unwatch().  Just increase a counter and deal with it in the
-   * worker thread.
+   * on_connection().  Just increase a counter and generate events in to_worker.
    */
   pn_listener_t *l = (pn_listener_t*) server->data;
-  assert(l->psocket.state == ON_UV);
-  if (!err) {
-    ++l->connections;
-    leader_unwatch(&l->psocket);
-  } else {
-    leader_error(&l->psocket, err, "on connection from");
-  }
+  l->err = err;
+  if (!err) ++l->connections;
+  listener_to_worker(l);        /* If already ON_WORKER it will stay there */
 }
 
+/* FIXME aconway 2017-02-16:  listener events in unwatch*/
 static void leader_accept(pn_listener_t * l) {
-  assert(l->psocket.state == ON_UV);
   assert(l->accepting);
   pconnection_t *pc = l->accepting;
   l->accepting = NULL;
@@ -410,10 +437,10 @@ static void leader_accept(pn_listener_t * l) {
     err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
   }
   if (!err) {
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, err, "accepting from");
-    leader_error(&l->psocket, err, "accepting from");
+    pconnection_error(pc, err, "accepting from");
+    listener_error(l, err, "accepting from");
   }
 }
 
@@ -440,7 +467,7 @@ static void leader_connect(psocket_t *ps) {
   if (!err) {
     ps->state = ON_UV;
   } else {
-    leader_error(ps, err, "connecting to");
+    psocket_error(ps, err, "connecting to");
   }
 }
 
@@ -457,32 +484,26 @@ static void leader_listen(psocket_t *ps) {
     err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
   }
   if (!err) {
-    set_state(ps, ON_UV, NULL);
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+    listener_to_worker(l);      /* Let worker see the OPEN event */
   } else {
-    leader_error(ps, err, "listening on");
+    listener_error(l, err, "listening on");
   }
 }
 
 /* Generate tick events and return millis till next tick or 0 if no tick is required */
 static pn_millis_t leader_tick(pconnection_t *pc) {
   assert(pc->psocket.state != ON_WORKER);
-  pn_transport_t *t = pc->driver.transport;
-  if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
-    uint64_t now = uv_now(pc->timer.loop);
-    uint64_t next = pn_transport_tick(t, now);
-    return next ? next - now : 0;
-  }
-  return 0;
+  uint64_t now = uv_now(pc->timer.loop);
+  uint64_t next = pn_transport_tick(pc->driver.transport, now);
+  return next ? next - now : 0;
 }
 
 static void on_tick(uv_timer_t *timer) {
-  if (!timer->data) return;     /* timer closed */
   pconnection_t *pc = (pconnection_t*)timer->data;
-  assert(pc->psocket.state == ON_UV);
-  uv_timer_stop(&pc->timer);
-  pn_millis_t next = leader_tick(pc);
+  pn_millis_t next = leader_tick(pc); /* May generate events */
   if (pn_connection_driver_has_event(&pc->driver)) {
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else if (next) {
     uv_timer_start(&pc->timer, on_tick, next, 0);
   }
@@ -490,31 +511,28 @@ static void on_tick(uv_timer_t *timer) {
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
-  assert(pc->psocket.state == ON_UV);
   if (nread >= 0) {
     pn_connection_driver_read_done(&pc->driver, nread);
-    leader_unwatch(&pc->psocket); /* Handle events */
+    pconnection_to_worker(pc);
   } else if (nread == UV_EOF) { /* hangup */
     pn_connection_driver_read_close(&pc->driver);
-    leader_unwatch(&pc->psocket);
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, nread, "on read from");
+    pconnection_error(pc, nread, "on read from");
   }
 }
 
 static void on_write(uv_write_t* write, int err) {
   pconnection_t *pc = (pconnection_t*)write->data;
-  assert(pc->psocket.state == ON_UV);
-  size_t writing = pc->writing;
-  pc->writing = 0;              /* This write is done regardless of outcome */
   if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, writing);
-    leader_unwatch(&pc->psocket);
+    pn_connection_driver_write_done(&pc->driver, pc->writing);
+    pconnection_to_worker(pc);
   } else if (err == UV_ECANCELED) {
-    leader_unwatch(&pc->psocket);    /* cancelled by leader_unwatch, complete the job */
+    pconnection_to_worker(pc);
   } else {
-    leader_error(&pc->psocket, err, "on write to");
+    pconnection_error(pc, err, "on write to");
   }
+  pc->writing = 0;
 }
 
 static void on_timeout(uv_timer_t *timer) {
@@ -527,88 +545,111 @@ static void on_timeout(uv_timer_t *timer) {
 // Read buffer allocation function for uv, just returns the transports read buffer.
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
-  assert(pc->psocket.state == ON_UV);
   pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
   *buf = uv_buf_init(rbuf.start, rbuf.size);
 }
 
-/* Monitor a socket in the UV loop */
-static void leader_watch(psocket_t *ps) {
-  assert(ps->state != ON_WORKER);
-  int err = 0;
-  set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. */
-
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection(ps);
-    if (pn_connection_driver_finished(&pc->driver)) {
-      uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
-      return;
+static void pconnection_to_uv(pconnection_t *pc) {
+  to_uv(&pc->psocket);          /* Assume we're going to UV unless sent elsewhere */
+  if (pn_connection_driver_finished(&pc->driver)) {
+    if (!uv_is_closing((uv_handle_t*)&pc->psocket)) {
+      uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
     }
-    pn_millis_t next_tick = leader_tick(pc);
-    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Ticks and checking buffers have generated events, send back to worker to process */
-      set_state(ps, ON_WORKER, NULL);
+    return;
+  }
+  pn_millis_t next_tick = leader_tick(pc);
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+  if (pn_connection_driver_has_event(&pc->driver)) {
+    to_worker(&pc->psocket);  /* Ticks/buffer checks generated events */
+    return;
+  }
+  if (next_tick &&
+      pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), "timer start")) {
+    return;
+  }
+  if (wbuf.size > 0) {
+    uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+    if (pconnection_error(
+          pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write"))
       return;
+    pc->writing = wbuf.size;
+  } else if (pn_connection_driver_write_closed(&pc->driver)) {
+    pc->shutdown.data = &pc->psocket;
+    if (pconnection_error(
+          pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write"))
+      return;
+  }
+  if (rbuf.size > 0) {
+    if (pconnection_error(
+          pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read"))
+        return;
+  }
+}
+
+static void listener_to_uv(pn_listener_t *l) {
+  to_uv(&l->psocket);           /* Assume we're going to UV unless sent elsewhere */
+  if (l->err) {
+    if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) {
+      uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
     }
-    if (next_tick) {
-      uv_timer_start(&pc->timer, on_tick, next_tick, 0);
-    }
-    if (wbuf.size > 0 && !pc->writing) {
-      pc->writing = wbuf.size;
-      uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-      err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-    } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
-      pc->shutdown.data = ps;
-      err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+  } else {
+    if (l->accepting) {
+      leader_accept(l);
     }
-    if (rbuf.size > 0 && !pc->reading) {
-      pc->reading = true;
-      err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+    if (l->connections) {
+      listener_to_worker(l);
     }
+  }
+}
+
+/* Monitor a psocket_t in the UV loop */
+static void psocket_to_uv(psocket_t *ps) {
+  if (ps->is_conn) {
+    pconnection_to_uv(as_pconnection(ps));
   } else {
-    pn_listener_t *l = as_listener(ps);
-    if (l->closing && pn_collector_peek(l->collector)) {
-      uv_close((uv_handle_t*)&ps->tcp, on_close_psocket);
-      return;
-    } else {
-      if (l->accepting) {
-        leader_accept(l);
-      }
-      if (l->connections) {
-        leader_unwatch(ps);
-      }
-    }
+    listener_to_uv(as_listener(ps));
   }
-  if (err) {
-    leader_error(ps, err, "re-watching");
+}
+
+/* Detach a connection from IO and put it on the worker queue */
+static void pconnection_to_worker(pconnection_t *pc) {
+  /* Stop IO (read/timer) only if no write is outstanding and events are ready */
+  if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) {
+    uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+    uv_timer_stop(&pc->timer);
   }
+  to_worker(&pc->psocket);
 }
 
-/* Detach a socket from IO and put it on the worker queue */
-static void leader_unwatch(psocket_t *ps) {
-  assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection(ps);
-    if (!pn_connection_driver_has_event(&pc->driver)) {
-      /* Don't return an empty event batch, re-attach to UV loop */
-      leader_watch(ps);
-      return;
-    } else {
-      if (pc->writing) {
-        uv_cancel((uv_req_t*)&pc->write);
-      }
-      if (pc->reading) {
-        pc->reading = false;
-        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-      }
-      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-        uv_timer_stop(&pc->timer);
-      }
+/* TODO aconway 2017-02-16: simplify collector API*/
+static bool collector_has_next(pn_collector_t *c) {
+  return pn_collector_more(c) ||
+    (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c));
+}
+
+/* Can't really detach a listener, as on_connection can always be called.
+   Generate events here safely.
+*/
+static void listener_to_worker(pn_listener_t *l) {
+  if (collector_has_next(l->collector)) { /* Already have events */
+    to_worker(&l->psocket);
+  } else if (l->err) {
+    if (l->err != UV_EOF) {
+      pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+                          l->what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+                          uv_strerror(l->err));
     }
+    l->err = 0;
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+    to_worker(&l->psocket);
+  } else if (l->connections) {    /* Generate accept events one at a time */
+    --l->connections;
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+    to_worker(&l->psocket);
+  } else {
+    listener_to_uv(l);
   }
-  set_state(ps, ON_WORKER, NULL);
 }
 
 /* Set the event in the proactor's batch  */
@@ -618,7 +659,7 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
   return &p->batch;
 }
 
-/* Return the next event batch or 0 if no events are ready */
+/* Return the next event batch or 0 if no events are available in the worker_q */
 static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
   if (!p->batch_working) {       /* Can generate proactor events */
     if (p->inactive) {
@@ -637,33 +678,14 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
     assert(ps->state == ON_WORKER);
     if (ps->is_conn) {
-      pconnection_t *pc = as_pconnection(ps);
-      return &pc->driver.batch;
+      return &as_pconnection(ps)->driver.batch;
     } else {                    /* Listener */
-      pn_listener_t *l = as_listener(ps);
-      /* Generate accept events one at a time */
-      if (l->connections && !pn_collector_peek(l->collector)) {
-        --l->connections;
-        pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-      }
-      return &l->batch;
+      return &as_listener(ps)->batch;
     }
-    set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */
   }
   return 0;
 }
 
-/* Called in any thread to set a wakeup action */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (action && !ps->wakeup) {
-    ps->wakeup = action;
-  }
-  set_state_lh(ps, ON_LEADER, NULL);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
 pn_listener_t *pn_event_listener(pn_event_t *e) {
   return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
 }
@@ -689,26 +711,52 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     assert(pc->psocket.state == ON_WORKER);
     if (pn_connection_driver_has_event(&pc->driver)) {
       /* Process all events before going back to leader */
-      set_state(&pc->psocket, ON_WORKER, NULL);
+      pconnection_to_worker(pc);
     } else {
-      set_state(&pc->psocket, ON_LEADER, leader_watch);
+      to_leader(&pc->psocket, psocket_to_uv);
     }
     return;
   }
   pn_listener_t *l = batch_listener(batch);
   if (l) {
     assert(l->psocket.state == ON_WORKER);
-    set_state(&l->psocket, ON_LEADER, leader_watch);
+    to_leader(&l->psocket, psocket_to_uv);
     return;
   }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     uv_mutex_lock(&p->lock);
     p->batch_working = false;
-    uv_async_send(&p->async); /* Wake leader */
     uv_mutex_unlock(&p->lock);
     return;
   }
+  uv_async_send(&p->async); /* Wake leader */
+}
+
+/* Process the leader_q, in the leader thread */
+static void leader_process_lh(pn_proactor_t *p) {
+  if (p->timeout_request) {
+    p->timeout_request = false;
+    if (p->timeout) {
+      uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+    } else {
+      uv_timer_stop(&p->timer);
+    }
+  }
+  for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+    assert(ps->state == ON_LEADER);
+    if (ps->wakeup) {
+      uv_mutex_unlock(&p->lock);
+      ps->wakeup(ps);
+      ps->wakeup = NULL;
+      uv_mutex_lock(&p->lock);
+    } else if (ps->action) {
+      uv_mutex_unlock(&p->lock);
+      ps->action(ps);
+      ps->action = NULL;
+      uv_mutex_lock(&p->lock);
+    }
+  }
 }
 
 /* Run follower/leader loop till we can return an event and be a worker */
@@ -724,28 +772,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
     /* Lead till there is work to do. */
     p->has_leader = true;
     while (batch == NULL) {
-      if (p->timeout_request) {
-        p->timeout_request = false;
-        if (p->timeout) {
-          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
-        } else {
-          uv_timer_stop(&p->timer);
-        }
-      }
-      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-        assert(ps->state == ON_LEADER);
-        if (ps->wakeup) {
-          uv_mutex_unlock(&p->lock);
-          ps->wakeup(ps);
-          ps->wakeup = NULL;
-          uv_mutex_lock(&p->lock);
-        } else if (ps->action) {
-          uv_mutex_unlock(&p->lock);
-          ps->action(ps);
-          ps->action = NULL;
-          uv_mutex_lock(&p->lock);
-        }
-      }
+      leader_process_lh(p);
       batch = get_batch_lh(p);
       if (batch == NULL) {
         uv_mutex_unlock(&p->lock);
@@ -753,7 +780,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
         uv_mutex_lock(&p->lock);
       }
     }
-    /* Signal the next leader and return to work */
+    /* Signal the next leader and go to work */
     p->has_leader = false;
     uv_cond_signal(&p->cond);
   }
@@ -761,19 +788,36 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return batch;
 }
 
+pn_event_batch_t *pn_proactor_grab(struct pn_proactor_t* p) {
+  uv_mutex_lock(&p->lock);
+  pn_event_batch_t *batch = get_batch_lh(p);
+  if (batch == NULL && !p->has_leader) {
+    /* If there is no leader, try a non-waiting lead to generate some work */
+    p->has_leader = true;
+    leader_process_lh(p);
+    uv_mutex_unlock(&p->lock);
+    uv_run(&p->loop, UV_RUN_NOWAIT);
+    uv_mutex_lock(&p->lock);
+    batch = get_batch_lh(p);
+    p->has_leader = false;
+  }
+  uv_mutex_unlock(&p->lock);
+  return batch;
+}
+
 void pn_proactor_interrupt(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   ++p->interrupt;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
   uv_mutex_unlock(&p->lock);
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
 }
 
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   p->timeout_request = true;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
   uv_mutex_unlock(&p->lock);
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -781,7 +825,7 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  set_state(&pc->psocket, ON_LEADER, leader_connect);
+  to_leader(&pc->psocket, leader_connect);
   return 0;
 }
 
@@ -789,7 +833,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con
 {
   psocket_init(&l->psocket, p, false, host, port);
   l->backlog = backlog;
-  set_state(&l->psocket, ON_LEADER, leader_listen);
+  to_leader(&l->psocket, leader_listen);
   return 0;
 }
 
@@ -803,7 +847,7 @@ void leader_wake_connection(psocket_t *ps) {
   pconnection_t *pc = as_pconnection(ps);
   pn_connection_t *c = pc->driver.connection;
   pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-  leader_unwatch(ps);
+  pconnection_to_worker(pc);
 }
 
 void pn_connection_wake(pn_connection_t* c) {
@@ -850,11 +894,17 @@ void pn_proactor_free(pn_proactor_t *p) {
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
   assert(l->psocket.state == ON_WORKER);
+  pn_event_t *prev = pn_collector_prev(l->collector);
+  if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
+    l->err = UV_EOF;
+  }
   return pn_collector_next(l->collector);
 }
 
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
-  return pn_collector_next(batch_proactor(batch)->collector);
+  pn_proactor_t *p = batch_proactor(batch);
+  assert(p->batch_working);
+  return pn_collector_next(p->collector);
 }
 
 static void pn_listener_free(pn_listener_t *l) {
@@ -867,7 +917,7 @@ static void pn_listener_free(pn_listener_t *l) {
   }
 }
 
-pn_listener_t *pn_listener() {
+pn_listener_t *pn_listener(void) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
     l->batch.next_event = listener_batch_next;
@@ -885,8 +935,8 @@ pn_listener_t *pn_listener() {
 void leader_listener_close(psocket_t *ps) {
   assert(ps->state = ON_LEADER);
   pn_listener_t *l = (pn_listener_t*)ps;
-  l->closing = true;
-  leader_watch(ps);
+  l->err = UV_EOF;
+  listener_to_uv(l);
 }
 
 void pn_listener_close(pn_listener_t* l) {
@@ -895,12 +945,10 @@ void pn_listener_close(pn_listener_t* l) {
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  assert(l->psocket.state == ON_WORKER);
   return l ? l->psocket.proactor : NULL;
 }
 
 pn_condition_t* pn_listener_condition(pn_listener_t* l) {
-  assert(l->psocket.state == ON_WORKER);
   return l->condition;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt
index 59c7665..70b30a0 100644
--- a/proton-c/src/tests/CMakeLists.txt
+++ b/proton-c/src/tests/CMakeLists.txt
@@ -20,15 +20,15 @@
 add_definitions(${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS})
 
 if (ENABLE_VALGRIND AND VALGRIND_EXE)
-  set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=1 --quiet
+  set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=42 --quiet
                    --leak-check=full --trace-children=yes)
 endif ()
 
-macro (pn_add_c_test test file)
-  add_executable (${test} ${file})
+macro (pn_add_c_test test)
+  add_executable (${test} ${ARGN})
   target_link_libraries (${test} qpid-proton)
   if (BUILD_WITH_CXX)
-    set_source_files_properties (${file} PROPERTIES LANGUAGE CXX)
+    set_source_files_properties (${ARGN} PROPERTIES LANGUAGE CXX)
   endif (BUILD_WITH_CXX)
   if (CMAKE_SYSTEM_NAME STREQUAL Windows)
     add_test (NAME ${test}
@@ -49,3 +49,4 @@ pn_add_c_test (c-reactor-tests reactor.c)
 pn_add_c_test (c-event-tests event.c)
 pn_add_c_test (c-data-tests data.c)
 pn_add_c_test (c-condition-tests condition.c)
+pn_add_c_test (c-proactor-tests proactor.c)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
new file mode 100644
index 0000000..ae5b1f6
--- /dev/null
+++ b/proton-c/src/tests/proactor.c
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_tools.h"
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <stdlib.h>
+#include <string.h>
+
+static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
+
+static const char *localhost = "127.0.0.1"; /* host for connect/listen */
+
+/* Wait for the next single event, return its type */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+  pn_event_batch_t *events = pn_proactor_wait(proactor);
+  pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
+  pn_proactor_done(proactor, events);
+  return etype;
+}
+
+/* Get events until an event of `etype` or a PN_PROACTOR_TIMEOUT, return its type */
+static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) {
+  while (true) {
+    pn_event_type_t t = wait_next(proactor);
+    if (t == etype || t == PN_PROACTOR_TIMEOUT) {
+      return t;
+    }
+  }
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
+static void test_interrupt_timeout(test_t *t) {
+  pn_proactor_t *p = pn_proactor();
+  pn_proactor_interrupt(p);
+  pn_event_type_t etype = wait_next(p);
+  TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype));
+  pn_proactor_set_timeout(p, 1); /* very short timeout */
+  etype = wait_next(p);
+  TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype));
+  pn_proactor_free(p);
+}
+
+/* Test handler return value  */
+typedef enum {
+  H_CONTINUE,                   /**< handler wants more events */
+  H_FINISHED,                   /**< handler completed without error */
+  H_FAILED                      /**< handler hit an error and cannot continue */
+} handler_state_t;
+
+typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*);
+
+/* Proactor and handler that take part in a test */
+typedef struct proactor_test_t {
+  test_t *t;
+  test_handler_fn handler;
+  pn_proactor_t *proactor;
+  handler_state_t state;                    /* Result of last handler call */
+} proactor_test_t;
+
+
+/* Initialize an array of proactor_test_t */
+static void proactor_test_init(proactor_test_t *pts, size_t n) {
+  for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+    if (!pt->proactor) pt->proactor = pn_proactor();
+    pn_proactor_set_timeout(pt->proactor, timeout);
+    pt->state = H_CONTINUE;
+  }
+}
+
+/* Iterate over an array of proactors, draining or handling events with the non-blocking
+   pn_proactor_grab.  Continue till all handlers return H_FINISHED (and return 0) or one
+   returns H_FAILED  (and return non-0)
+*/
+int proactor_test_run(proactor_test_t *pts, size_t n) {
+  /* Make sure pts are initialized */
+  proactor_test_init(pts, n);
+  size_t finished = 0;
+  do {
+    finished = 0;
+    for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+      pn_event_batch_t *events = pn_proactor_grab(pt->proactor);
+      if (events) {
+          pn_event_t *e;
+          while ((e = pn_event_batch_next(events))) {
+            if (pt->state == H_CONTINUE) {
+              pt->state = pt->handler(pt->t, e);
+            }
+          }
+          pn_proactor_done(pt->proactor, events);
+      }
+      switch (pt->state) {
+       case H_CONTINUE: break;
+       case H_FINISHED: ++finished; break;
+       case H_FAILED: return 1;
+      }
+    }
+  } while (finished < n);
+  return 0;
+}
+
+
+/* Simple test of client connect to a listening server */
+handler_state_t listen_connect_server(test_t *t, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+    /* Ignore these events */
+   case PN_LISTENER_OPEN:
+   case PN_CONNECTION_LOCAL_OPEN:
+   case PN_CONNECTION_REMOTE_OPEN:
+   case PN_CONNECTION_BOUND:
+    return H_CONTINUE;
+
+    /* Act on these events */
+   case PN_LISTENER_ACCEPT:
+     pn_listener_accept(pn_event_listener(e), pn_connection());
+     return H_CONTINUE;
+   case PN_CONNECTION_INIT:
+    pn_connection_open(pn_event_connection(e));
+    return H_CONTINUE;
+   case PN_CONNECTION_REMOTE_CLOSE:
+    return H_FINISHED;
+
+   default:
+    TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+    return H_FAILED;
+    break;
+  }
+}
+
+handler_state_t listen_connect_client(test_t *t, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+    /* Ignore these events */
+   case PN_CONNECTION_LOCAL_OPEN:
+   case PN_CONNECTION_BOUND:
+    return H_CONTINUE;
+
+    /* Act on these events */
+   case PN_CONNECTION_INIT:
+    pn_connection_open(pn_event_connection(e));
+    return H_CONTINUE;
+   case PN_CONNECTION_REMOTE_OPEN:
+    pn_connection_close(pn_event_connection(e));
+    return H_FINISHED;
+
+    /* Unexpected events */
+   default:
+    TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+    return H_FAILED;
+    break;
+  }
+}
+
+/* Simplest client/server interaction */
+static void test_listen_connect(test_t *t) {
+  proactor_test_t pts[] =  { { t, listen_connect_client }, { t, listen_connect_server } };
+  proactor_test_t *client = &pts[0], *server = &pts[1];
+  proactor_test_init(pts, 2);
+
+  int port = pick_port();
+  char port_str[16];
+  snprintf(port_str, sizeof(port_str), "%d", port);
+  pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4);
+  pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN);
+  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
+    pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str);
+    proactor_test_run(pts, 2);
+  }
+  pn_proactor_free(client->proactor);
+  pn_proactor_free(server->proactor);
+}
+
+/* Test error handling */
+static void test_listen_connect_error(test_t *t) {
+  pn_proactor_t *p = pn_proactor();
+  pn_proactor_set_timeout(p, timeout); /* In case of hang */
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect(p, c, "nosuchost", "nosuchport");
+  pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
+  TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
+  TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
+
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(p, l, "nosuchost", "nosuchport", 1);
+  etype = wait_for(p, PN_LISTENER_CLOSE);
+  TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
+  TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
+
+  pn_proactor_free(p);
+}
+
+int main(int argc, char** argv) {
+  int failed = 0;
+  RUN_TEST(failed, t, test_interrupt_timeout(&t));
+  RUN_TEST(failed, t, test_listen_connect(&t));
+  RUN_TEST(failed, t, test_listen_connect_error(&t));
+  return failed;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
new file mode 100644
index 0000000..047ab42
--- /dev/null
+++ b/proton-c/src/tests/test_tools.h
@@ -0,0 +1,140 @@
+#ifndef TESTS_TEST_TOOLS_H
+#define TESTS_TEST_TOOLS_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Call via ASSERT macro. */
+static void assert_fail_(const char* cond, const char* file, int line) {
+  printf("%s:%d: Assertion failed: %s\n", file, line, cond);
+  abort();
+}
+
+/* Unconditional assert (does not depend on NDEBUG) for tests. */
+#define ASSERT(expr)                                            \
+  ((expr) ?  (void)0 : assert_fail_(#expr, __FILE__, __LINE__))
+
+/* Call via macro ASSERT_PERROR */
+static void assert_perror_fail_(const char* cond, const char* file, int line) {
+  perror(cond);
+  printf("%s:%d: Assertion failed (error above): %s\n", file, line, cond);
+  abort();
+}
+
+/* Like ASSERT but also calls perror() to print the current errno error. */
+#define ASSERT_PERROR(expr)                                             \
+  ((expr) ?  (void)0 : assert_perror_fail_(#expr, __FILE__, __LINE__))
+
+
+/* A struct to collect the results of a test.
+ * Declare and initialize with TEST_START(t) where t will be declared as a test_t
+ */
+typedef struct test_t {
+  const char* name;
+  int errors;
+} test_t;
+
+/* if !expr print the printf-style error and increment t->errors. Use via macros. Returns expr. */
+static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
+  if (!expr) {
+    va_list ap;
+    va_start(ap, fmt);
+    fprintf(stderr, "%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
+    if (fmt && *fmt) {
+      fprintf(stderr, " - ");
+      vfprintf(stderr, fmt, ap);
+    }
+    fprintf(stderr, "\n");
+    fflush(stderr);
+    ++t->errors;
+  }
+  return expr;
+}
+
+#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+
+/* T is name of a test_t variable, EXPR is the test expression (which should update T)
+   FAILED is incremented if the test has errors
+*/
+#define RUN_TEST(FAILED, T, EXPR) do {                          \
+    printf("TEST: %s\n", #EXPR);                                \
+    fflush(stdout);                                             \
+    test_t T = { #EXPR, 0 };                                    \
+    (EXPR);                                                     \
+    if (T.errors) {                                             \
+      printf("FAIL: %s (%d errors)\n", #EXPR, T.errors);        \
+      ++(FAILED);                                               \
+    }                                                           \
+  } while(0)
+
+#if defined(WIN32)
+
+#include <winsock2.h>
+#include <ws2tcpip.h>
+typedef SOCKET sock_t;
+static inline void sock_close(sock_t sock) { closesocket(sock); }
+
+#else
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <time.h>
+#include <unistd.h>
+
+static int port_in_use(int port) {
+  /* Attempt to bind a dummy socket to test if the port is in use. */
+  int dummy_socket = socket(AF_INET, SOCK_STREAM, 0);
+  ASSERT_PERROR(dummy_socket >= 0);
+  struct sockaddr_in addr = {0};
+  addr.sin_family = AF_INET;
+  addr.sin_addr.s_addr = INADDR_ANY;
+  addr.sin_port = htons(port);
+  int ret = bind(dummy_socket, (struct sockaddr *) &addr, sizeof(addr));
+  close(dummy_socket);
+  return ret < 0;
+}
+
+/* Try to pick an unused port by picking random ports till we find one
+   that is not in use. This is not foolproof as some other process may
+   grab it before the caller binds or connects.
+*/
+static int pick_port(void) {
+  srand(time(NULL));
+  static int MAX_TRIES = 10;
+  int port = -1;
+  int i = 0;
+  do {
+    /* Pick a random port. Avoid the standard OS ephemeral port range used by
+       bind(0) - ports can be allocated and re-allocated very rapidly there.
+    */
+    port =  (rand()%10000) + 10000;
+  } while (i++ < MAX_TRIES && port_in_use(port));
+  ASSERT(i < MAX_TRIES && "cannot pick a port");
+  return port;
+}
+
+#endif
+
+#endif // TESTS_TEST_TOOLS_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org