You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/27 18:04:54 UTC

[6/6] qpid-proton git commit: PROTON-1413: c proactor fix assertion errors, simplify code

PROTON-1413: c proactor fix assertion errors, simplify code

- expanded & improved tests/proactor.c tests and tests/test_tools.h framework
- drop wakeup/action callbacks
- simpler listening logic using locks for concurrent leader/worker access
- centralize logic for socket processing and error handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2dae68d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2dae68d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2dae68d6

Branch: refs/heads/master
Commit: 2dae68d6a2a98f457ca7691f74d56296431de866
Parents: 105b939
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Feb 27 12:54:15 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Feb 27 12:55:12 2017 -0500

----------------------------------------------------------------------
 proton-c/src/proactor/libuv.c   | 713 +++++++++++++++++------------------
 proton-c/src/tests/proactor.c   | 348 +++++++++--------
 proton-c/src/tests/test_tools.h |  99 +++--
 3 files changed, 584 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 322f353..2fafbb3 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -43,34 +43,25 @@
 #include <string.h>
 
 /*
-  libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe
-  call that we use to make uv_run() return.
+  libuv functions are thread unsafe, we use a"leader-worker-follower" model as follows:
 
-  To provide concurrency proactor uses a "leader-worker-follower" model, threads take
-  turns at the roles:
+  - At most one thread at a time is the "leader". The leader runs the UV loop till there
+  are events to process and then becomes a "worker"n
 
-  - a single "leader" thread uses libuv, it runs the uv_loop the in short bursts to
-  generate work. Once there is work it becomes becomes a "worker" thread, another thread
-  takes over as leader.
+  - Concurrent "worker" threads process events for separate connections or listeners.
+  When they run out of work they become "followers"
 
-  - "workers" handle events for separate connections or listeners concurrently. They do as
-  much work as they can, when none is left they become "followers"
+  - A "follower" is idle, waiting for work. When the leader becomes a worker, one follower
+  takes over as the new leader.
 
-  - "followers" wait for the leader to generate work. One follower becomes the new leader,
-  the others become workers or continue to follow till they can get work.
-
-  Any thread in a pool can take on any role necessary at run-time. All the work generated
-  by an IO wake-up for a single connection can be processed in a single single worker
-  thread to minimize context switching.
+  Any thread that calls pn_proactor_wait() or pn_proactor_get() can take on any of the
+  roles as required at run-time. Monitored sockets (connections or listeners) are passed
+  between threads on thread-safe queues.
 
   Function naming:
-  - on_* - called in leader thread via  uv_run().
-  - leader_* - called in leader thread (either leader_q processing or from an on_ function)
+  - on_*() - libuv callbacks, called in leader thread via  uv_run().
+  - leader_* - only called in leader thread from
   - *_lh - called with the relevant lock held
-
-  LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their
-  UV handles have received a close callback. Freeing resources is initiated by uv_close()
-  of the uv_tcp_t handle, and executed in an on_close() handler when it is safe.
 */
 
 const char *AMQP_PORT = "5672";
@@ -86,13 +77,6 @@ PN_HANDLE(PN_PROACTOR)
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
-/* A psocket (connection or listener) has the following mutually exclusive states. */
-typedef enum {
-  ON_WORKER,               /* On worker_q or in use by user code in worker thread  */
-  ON_LEADER,               /* On leader_q or in use the leader loop  */
-  ON_UV                    /* Scheduled for a UV event, or in use by leader thread in on_ handler*/
-} psocket_state_t;
-
 /* common to connection and listener */
 typedef struct psocket_t {
   /* Immutable */
@@ -103,14 +87,15 @@ typedef struct psocket_t {
 
   /* Protected by proactor.lock */
   struct psocket_t* next;
-  psocket_state_t state;
+  bool working;                      /* Owned by a worker thread */
   void (*action)(struct psocket_t*); /* deferred action for leader */
-  void (*wakeup)(struct psocket_t*); /* wakeup action for leader */
 
   /* Only used by leader thread when it owns the psocket */
   uv_tcp_t tcp;
 } psocket_t;
 
+typedef struct queue { psocket_t *front, *back; } queue;
+
 /* Special value for psocket.next pointer when socket is not on any any list. */
 psocket_t UNLISTED;
 
@@ -118,8 +103,8 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch
   ps->proactor = p;
   ps->next = &UNLISTED;
   ps->is_conn = is_conn;
-  ps->tcp.data = ps;
-  ps->state = ON_WORKER;
+  ps->tcp.data = NULL;          /* Set in leader_init */
+  ps->working = true;
 
   /* For platforms that don't know about "amqp" and "amqps" service names. */
   if (port && strcmp(port, AMQP_PORT_NAME) == 0)
@@ -136,7 +121,7 @@ static inline const char* fixstr(const char* str) {
   return str[0] == '\001' ? NULL : str;
 }
 
-/* Holds a psocket and a pn_connection_driver  */
+/* a connection socket  */
 typedef struct pconnection_t {
   psocket_t psocket;
 
@@ -149,31 +134,33 @@ typedef struct pconnection_t {
   uv_write_t write;
   uv_shutdown_t shutdown;
   size_t writing;               /* size of pending write request, 0 if none pending */
-  bool server;                  /* accepting not connecting */
+
+  /* Locked for thread-safe access */
+  uv_mutex_t lock;
+  bool wake;                    /* pn_connection_wake() was called */
 } pconnection_t;
 
 
-/* pn_listener_t with a psocket_t  */
+/* a listener socket */
 struct pn_listener_t {
   psocket_t psocket;
 
   /* Only used by owner thread */
-  pconnection_t *accepting;     /* set in worker, used in UV loop for accept */
-  pn_condition_t *condition;
-  pn_collector_t *collector;
   pn_event_batch_t batch;
   pn_record_t *attachments;
   void *context;
   size_t backlog;
 
-  /* Only used in leader thread */
-  size_t connections;           /* number of connections waiting to be accepted  */
-  int err;                      /* uv error code, 0 = OK, UV_EOF = closed */
-  const char *what;             /* static description string */
+  /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
+   * detach a listener from the UV loop to prevent concurrent access.
+   */
+  uv_mutex_t lock;
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  queue          accept;        /* pconnection_t for uv_accept() */
+  bool closed;
 };
 
-typedef struct queue { psocket_t *front, *back; } queue;
-
 struct pn_proactor_t {
   /* Leader thread  */
   uv_cond_t cond;
@@ -199,19 +186,15 @@ struct pn_proactor_t {
   bool batch_working;          /* batch is being processed in a worker thread */
 };
 
-/* Push ps to back of q. Must not be on a different queue */
-static bool push_lh(queue *q, psocket_t *ps) {
-  if (ps->next == &UNLISTED) {
-    ps->next = NULL;
-    if (!q->front) {
-      q->front = q->back = ps;
-    } else {
-      q->back->next = ps;
-      q->back =  ps;
-    }
-    return true;
+static void push_lh(queue *q, psocket_t *ps) {
+  assert(ps->next == &UNLISTED);
+  ps->next = NULL;
+  if (!q->front) {
+    q->front = q->back = ps;
+  } else {
+    q->back->next = ps;
+    q->back =  ps;
   }
-  return false;
 }
 
 /* Pop returns front of q or NULL if empty */
@@ -229,51 +212,26 @@ static inline void notify(pn_proactor_t* p) {
   uv_async_send(&p->async);
 }
 
-static void to_leader_lh(psocket_t *ps) {
-  if (push_lh(&ps->proactor->leader_q, ps)) {
-    ps->state = ON_LEADER;
-  }
-}
-
-/* Queue an action for the leader thread */
-static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  ps->action = action;
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-  notify(ps->proactor);
-}
-
-/* Push to the worker thread */
-static void to_worker(psocket_t *ps) {
+/* Notify that this socket needs attention from the leader at the next opportunity */
+static void psocket_notify(psocket_t *ps) {
   uv_mutex_lock(&ps->proactor->lock);
-  if (push_lh(&ps->proactor->worker_q, ps)) {
-      ps->state = ON_WORKER;
-  }
-  notify(ps->proactor);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set state to ON_UV */
-static void to_uv(psocket_t *ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (ps->next == &UNLISTED) {
-    ps->state = ON_UV;
+  /* Only queue if not working and not already queued */
+  if (!ps->working && ps->next == &UNLISTED) {
+    push_lh(&ps->proactor->leader_q, ps);
+    notify(ps->proactor);
   }
   uv_mutex_unlock(&ps->proactor->lock);
 }
 
-/* Called in any thread to set a wakeup action */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
+/* Notify the leader of a newly-created socket */
+static void psocket_start(psocket_t *ps) {
   uv_mutex_lock(&ps->proactor->lock);
-  ps->wakeup = action;
-  /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */
-  if (ps->next == &UNLISTED && ps->state != ON_WORKER) {
+  if (ps->next == &UNLISTED) {  /* No-op if already queued */
+    ps->working = false;
     push_lh(&ps->proactor->leader_q, ps);
-    ps->state = ON_LEADER;      /* Otherwise notify the leader */
+    notify(ps->proactor);
+    uv_mutex_unlock(&ps->proactor->lock);
   }
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
 }
 
 static inline pconnection_t *as_pconnection(psocket_t* ps) {
@@ -318,6 +276,14 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
   return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
 }
 
+static inline psocket_t *batch_psocket(pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) return &pc->psocket;
+  pn_listener_t *l = batch_listener(batch);
+  if (l) return &l->psocket;
+  return NULL;
+}
+
 static void leader_count(pn_proactor_t *p, int change) {
   uv_mutex_lock(&p->lock);
   p->count += change;
@@ -340,6 +306,12 @@ static void on_close_pconnection_final(uv_handle_t *h) {
   pconnection_free((pconnection_t*)h->data);
 }
 
+static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
+  if (!uv_is_closing(h)) {
+    uv_close(h, cb);
+  }
+}
+
 /* Close event for uv_tcp_t of a psocket_t */
 static void on_close_psocket(uv_handle_t *h) {
   psocket_t *ps = (psocket_t*)h->data;
@@ -348,7 +320,7 @@ static void on_close_psocket(uv_handle_t *h) {
     pconnection_t *pc = as_pconnection(ps);
     uv_timer_stop(&pc->timer);
     /* Delay the free till the timer handle is also closed */
-    uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
+    uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
   } else {
     pn_listener_free(as_listener(ps));
   }
@@ -362,48 +334,49 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
   return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
-static void pconnection_to_worker(pconnection_t *pc);
-static void listener_to_worker(pn_listener_t *l);
-
-int pconnection_error(pconnection_t *pc, int err, const char* what) {
-  if (err) {
-    pn_connection_driver_t *driver = &pc->driver;
-    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+static void pconnection_error(pconnection_t *pc, int err, const char* what) {
+  assert(err);
+  pn_connection_driver_t *driver = &pc->driver;
+  pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+  if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
     pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s",
                                 what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
                                 uv_strerror(err));
-    pn_connection_driver_close(driver);
-    pconnection_to_worker(pc);
   }
-  return err;
+  pn_connection_driver_close(driver);
 }
 
-static int listener_error(pn_listener_t *l, int err, const char* what) {
-  if (err) {
-    l->err = err;
-    l->what = what;
-    listener_to_worker(l);
+static void listener_error(pn_listener_t *l, int err, const char* what) {
+  assert(err);
+  uv_mutex_lock(&l->lock);
+  if (!pn_condition_is_set(l->condition)) {
+    pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s",
+                        what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+                        uv_strerror(err));
   }
-  return err;
+  uv_mutex_unlock(&l->lock);
+  pn_listener_close(l);
 }
 
-static int psocket_error(psocket_t *ps, int err, const char* what) {
-  if (err) {
-    if (ps->is_conn) {
-      pconnection_error(as_pconnection(ps), err, "initialization");
-    } else {
-      listener_error(as_listener(ps), err, "initialization");
-    }
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+  if (ps->is_conn) {
+    pconnection_error(as_pconnection(ps), err, "initialization");
+  } else {
+    listener_error(as_listener(ps), err, "initialization");
   }
-  return err;
 }
 
+/* FIXME aconway 2017-02-25: split socket/queue */
+
 /* psocket uv-initialization */
 static int leader_init(psocket_t *ps) {
-  ps->state = ON_LEADER;
+  ps->working = false;
+  ps->tcp.data = ps;
   leader_count(ps->proactor, +1);
   int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
-  if (!err) {
+  if (err) {
+    psocket_error(ps, err, "initialization");
+  } else {
     pconnection_t *pc = as_pconnection(ps);
     if (pc) {
       pc->connect.data = ps;
@@ -412,50 +385,63 @@ static int leader_init(psocket_t *ps) {
         pc->timer.data = ps;
       }
     }
-  } else {
-    psocket_error(ps, err, "initialization");
   }
   return err;
 }
 
+/* Check if a pconnection has work for a worker thread. Called by owning thread. */
+static bool pconnection_needs_work(pconnection_t *pc) {
+  if (!pc->writing) {           /* Can't detach for work while write is pending */
+    /* Check for wake requests */
+    uv_mutex_lock(&pc->lock);
+    bool wake = pc->wake;
+    pc->wake = false;
+    uv_mutex_unlock(&pc->lock);
+    if (wake) {
+      pn_connection_t *c = pc->driver.connection;
+      pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+    }
+    return pn_connection_driver_has_event(&pc->driver);
+  }
+  return false;
+}
+
+/* Detach a connection from the UV loop so it can be used safely by a worker */
+void pconnection_detach(pconnection_t *pc) {
+  uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+  uv_timer_stop(&pc->timer);
+  psocket_notify(&pc->psocket);
+}
+
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
-  if (!pconnection_error(pc, err, "on connect to")) {
-    pconnection_to_worker(pc);
-  }
+  assert(!pc->psocket.working);
+  if (err) pconnection_error(pc, err, "on connect to");
+  pconnection_detach(pc);       /* FIXME aconway 2017-02-25: detach AFTER error or vv */
 }
 
 /* Incoming connection ready to be accepted */
 static void on_connection(uv_stream_t* server, int err) {
-  /* Unlike most on_* functions, this one can be called by the leader thread when the
-   * listener is ON_WORKER, because there's no way to stop libuv from calling
-   * on_connection().  Just increase a counter and generate events in to_worker.
+  /* Unlike most on_* functions, this can be called by the leader thread when the listener
+   * is ON_WORKER or ON_LEADER, because there's no way to stop libuv from calling
+   * on_connection(). Update the state of the listener and queue it for leader attention.
    */
   pn_listener_t *l = (pn_listener_t*) server->data;
-  l->err = err;
-  if (!err) ++l->connections;
-  listener_to_worker(l);        /* If already ON_WORKER it will stay there */
-}
-
-static void leader_accept(pn_listener_t * l) {
-  assert(l->accepting);
-  pconnection_t *pc = l->accepting;
-  l->accepting = NULL;
-  int err = leader_init(&pc->psocket);
-  if (!err) {
-    err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
-  }
-  if (!err) {
-    pconnection_to_worker(pc);
+  if (err) {
+    listener_error(l, err, "on incoming connection");
   } else {
-    pconnection_error(pc, err, "accepting from");
-    listener_error(l, err, "accepting from");
+    uv_mutex_lock(&l->lock);
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+    uv_mutex_unlock(&l->lock);
+    psocket_notify(&l->psocket);
   }
 }
 
+// #error FIXME REVIW UPWARDS FROM HERE ^^^^
+
+/* Common address resolution for leader_listen and leader_connect */
 static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
-  assert(ps->state == ON_LEADER);
   int err = leader_init(ps);
   struct addrinfo hints = { 0 };
   if (server) hints.ai_flags = AI_PASSIVE;
@@ -465,27 +451,23 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
   return err;
 }
 
-static void leader_connect(psocket_t *ps) {
-  assert(ps->state == ON_LEADER);
-  pconnection_t *pc = as_pconnection(ps);
+static void leader_connect(pconnection_t *pc) {
   uv_getaddrinfo_t info;
-  int err = leader_resolve(ps, &info, false);
+  int err = leader_resolve(&pc->psocket, &info, false);
   if (!err) {
     err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
     uv_freeaddrinfo(info.addrinfo);
   }
-  if (!err) {
-    ps->state = ON_UV;
-  } else {
+  if (err) {
     pconnection_error(pc, err, "connecting to");
+  } else {
+    pn_connection_open(pc->driver.connection);
   }
 }
 
-static void leader_listen(psocket_t *ps) {
-  assert(ps->state == ON_LEADER);
-  pn_listener_t *l = as_listener(ps);
+static void leader_listen(pn_listener_t *l) {
   uv_getaddrinfo_t info;
-  int err = leader_resolve(ps, &info, true);
+  int err = leader_resolve(&l->psocket, &info, true);
   if (!err) {
     err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
     uv_freeaddrinfo(info.addrinfo);
@@ -493,17 +475,53 @@ static void leader_listen(psocket_t *ps) {
   if (!err) {
     err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
   }
-  if (!err) {
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
-    listener_to_worker(l);      /* Let worker see the OPEN event */
+  uv_mutex_lock(&l->lock);
+  /* Always put an OPEN event for symmetry, even if we immediately close with err */
+  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+  uv_mutex_unlock(&l->lock);
+  if (err) listener_error(l, err, "listening on");
+}
+
+static bool listener_needs_work(pn_listener_t *l) {
+  uv_mutex_lock(&l->lock);
+  bool needs_work = pn_collector_peek(l->collector);
+  uv_mutex_unlock(&l->lock);
+  return needs_work;
+}
+
+static bool listener_finished_lh(pn_listener_t *l) {
+  return l->closed && !pn_collector_peek(l->collector) && !l->accept.front;
+}
+
+static bool leader_process_listener(pn_listener_t * l) {
+  if (l->psocket.tcp.data == NULL) {
+    leader_listen(l);
   } else {
-    listener_error(l, err, "listening on");
+    uv_mutex_lock(&l->lock);
+    pconnection_t *pc;
+    while (!listener_finished_lh(l) && (pc = (pconnection_t*)pop_lh(&l->accept))) {
+      uv_mutex_unlock(&l->lock);
+      int err = leader_init(&pc->psocket);
+      if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+      if (err) {
+        listener_error(l, err, "accepting from");
+        psocket_notify(&l->psocket);
+        pconnection_error(pc, err, "accepting from");
+      }
+      psocket_start(&pc->psocket);
+      uv_mutex_lock(&l->lock);
+    }
+    if (listener_finished_lh(l)) {
+      uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
+    }
+    uv_mutex_unlock(&l->lock);
   }
+  return listener_needs_work(l);
 }
 
 /* Generate tick events and return millis till next tick or 0 if no tick is required */
 static pn_millis_t leader_tick(pconnection_t *pc) {
-  assert(pc->psocket.state != ON_WORKER);
+  assert(!pc->psocket.working);
   uint64_t now = uv_now(pc->timer.loop);
   uint64_t next = pn_transport_tick(pc->driver.transport, now);
   return next ? next - now : 0;
@@ -511,38 +529,36 @@ static pn_millis_t leader_tick(pconnection_t *pc) {
 
 static void on_tick(uv_timer_t *timer) {
   pconnection_t *pc = (pconnection_t*)timer->data;
-  pn_millis_t next = leader_tick(pc); /* May generate events */
-  if (pn_connection_driver_has_event(&pc->driver)) {
-    pconnection_to_worker(pc);
-  } else if (next) {
-    uv_timer_start(&pc->timer, on_tick, next, 0);
-  }
+  assert(!pc->psocket.working);
+  leader_tick(pc);
+  pconnection_detach(pc);
+  /* FIXME aconway 2017-02-25: optimize - don't detach if no work. Need to check for finished? */
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
+  assert(!pc->psocket.working);
   if (nread >= 0) {
     pn_connection_driver_read_done(&pc->driver, nread);
-    pconnection_to_worker(pc);
   } else if (nread == UV_EOF) { /* hangup */
     pn_connection_driver_read_close(&pc->driver);
-    pconnection_to_worker(pc);
   } else {
     pconnection_error(pc, nread, "on read from");
   }
+  pconnection_detach(pc);
 }
 
 static void on_write(uv_write_t* write, int err) {
   pconnection_t *pc = (pconnection_t*)write->data;
-  if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, pc->writing);
-    pconnection_to_worker(pc);
-  } else if (err == UV_ECANCELED) {
-    pconnection_to_worker(pc);
-  } else {
+  assert(!pc->psocket.working);
+  size_t size = pc->writing;
+  pc->writing = 0;
+  if (err) {
     pconnection_error(pc, err, "on write to");
+  } else {
+    pn_connection_driver_write_done(&pc->driver, size);
   }
-  pc->writing = 0;
+  pconnection_detach(pc);
 }
 
 static void on_timeout(uv_timer_t *timer) {
@@ -552,110 +568,13 @@ static void on_timeout(uv_timer_t *timer) {
   uv_mutex_unlock(&p->lock);
 }
 
-// Read buffer allocation function for uv, just returns the transports read buffer.
+/* Read buffer allocation function for uv, just returns the transports read buffer. */
 static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
   pconnection_t *pc = (pconnection_t*)stream->data;
   pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
   *buf = uv_buf_init(rbuf.start, rbuf.size);
 }
 
-static void pconnection_to_uv(pconnection_t *pc) {
-  to_uv(&pc->psocket);          /* Assume we're going to UV unless sent elsewhere */
-  if (pn_connection_driver_finished(&pc->driver)) {
-    if (!uv_is_closing((uv_handle_t*)&pc->psocket.tcp)) {
-      uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
-    }
-    return;
-  }
-  pn_millis_t next_tick = leader_tick(pc);
-  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-  if (pn_connection_driver_has_event(&pc->driver)) {
-    to_worker(&pc->psocket);  /* Ticks/buffer checks generated events */
-    return;
-  }
-  if (next_tick &&
-      pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), "timer start")) {
-    return;
-  }
-  if (wbuf.size > 0) {
-    uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-    if (pconnection_error(
-          pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write"))
-      return;
-    pc->writing = wbuf.size;
-  } else if (pn_connection_driver_write_closed(&pc->driver)) {
-    pc->shutdown.data = &pc->psocket;
-    if (pconnection_error(
-          pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write"))
-      return;
-  }
-  if (rbuf.size > 0) {
-    if (pconnection_error(
-          pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read"))
-        return;
-  }
-}
-
-static void listener_to_uv(pn_listener_t *l) {
-  to_uv(&l->psocket);           /* Assume we're going to UV unless sent elsewhere */
-  if (l->err) {
-    if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) {
-      uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
-    }
-  } else {
-    if (l->accepting) {
-      leader_accept(l);
-    }
-    if (l->connections) {
-      listener_to_worker(l);
-    }
-  }
-}
-
-/* Monitor a psocket_t in the UV loop */
-static void psocket_to_uv(psocket_t *ps) {
-  if (ps->is_conn) {
-    pconnection_to_uv(as_pconnection(ps));
-  } else {
-    listener_to_uv(as_listener(ps));
-  }
-}
-
-/* Detach a connection from IO and put it on the worker queue */
-static void pconnection_to_worker(pconnection_t *pc) {
-  /* Can't go to worker if a write is outstanding or the batch is empty */
-  if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) {
-    uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-    uv_timer_stop(&pc->timer);
-  }
-  to_worker(&pc->psocket);
-}
-
-/* Can't really detach a listener, as on_connection can always be called.
-   Generate events here safely.
-*/
-static void listener_to_worker(pn_listener_t *l) {
-  if (pn_collector_peek(l->collector)) { /* Already have events */
-    to_worker(&l->psocket);
-  } else if (l->err) {
-    if (l->err != UV_EOF) {
-      pn_condition_format(l->condition, uv_err_name(l->err), "%s %s:%s: %s",
-                          l->what, fixstr(l->psocket.host), fixstr(l->psocket.port),
-                          uv_strerror(l->err));
-    }
-    l->err = 0;
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-    to_worker(&l->psocket);
-  } else if (l->connections) {    /* Generate accept events one at a time */
-    --l->connections;
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-    to_worker(&l->psocket);
-  } else {
-    listener_to_uv(l);
-  }
-}
-
 /* Set the event in the proactor's batch  */
 static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
   pn_collector_put(p->collector, pn_proactor__class(), p, t);
@@ -663,36 +582,27 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
   return &p->batch;
 }
 
-void leader_wake_connection(psocket_t *ps) {
-  assert(ps->state == ON_LEADER);
-  pconnection_t *pc = as_pconnection(ps);
-  pn_connection_t *c = pc->driver.connection;
-  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-  pconnection_to_worker(pc);
-}
-
 static void on_stopping(uv_handle_t* h, void* v) {
   /* Close all the TCP handles. on_close_psocket will close any other handles if needed */
-  if (h->type == UV_TCP && !uv_is_closing(h)) {
-    uv_close(h, on_close_psocket);
+  if (h->type == UV_TCP) {
+    uv_safe_close(h, on_close_psocket);
   }
 }
 
 static pn_event_t *log_event(void* p, pn_event_t *e) {
   if (e) {
-    pn_logf("[%p]:(%s)\n", (void*)p, pn_event_type_name(pn_event_type(e)));
+    pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
   }
   return e;
 }
 
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
-  assert(l->psocket.state == ON_WORKER);
-  pn_event_t *prev = pn_collector_prev(l->collector);
-  if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) {
-    l->err = UV_EOF;
-  }
-  return log_event(l, pn_collector_next(l->collector));
+  assert(l->psocket.working);
+  uv_mutex_lock(&l->lock);
+  pn_event_t *e = pn_collector_next(l->collector);
+  uv_mutex_unlock(&l->lock);
+  return log_event(l, e);
 }
 
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
@@ -702,25 +612,18 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
 }
 
 static void pn_listener_free(pn_listener_t *l) {
-  /* No  assert(l->psocket.state == ON_WORKER);  can be called during shutdown */
+  /* No  assert(l->psocket.working);  can be called during shutdown */
   if (l) {
     if (l->collector) pn_collector_free(l->collector);
     if (l->condition) pn_condition_free(l->condition);
     if (l->attachments) pn_free(l->attachments);
+    assert(!l->accept.front);
     free(l);
   }
 }
 
-void leader_listener_close(psocket_t *ps) {
-  assert(ps->state = ON_LEADER);
-  pn_listener_t *l = (pn_listener_t*)ps;
-  l->err = UV_EOF;
-  listener_to_uv(l);
-}
-
-/* Return the next event batch or 0 if no events are available in the worker_q */
+/* Return the next event batch or NULL if no events are available */
 static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
-  /* FIXME aconway 2017-02-21: generate these in parallel? */
   if (!p->batch_working) {       /* Can generate proactor events */
     if (p->inactive) {
       p->inactive = false;
@@ -736,49 +639,99 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
     }
   }
   for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
-    assert(ps->state == ON_WORKER);
+    assert(ps->working);
     if (ps->is_conn) {
       return &as_pconnection(ps)->driver.batch;
     } else {                    /* Listener */
       return &as_listener(ps)->batch;
     }
   }
-  return 0;
+  return NULL;
+}
+
+/* Process a pconnection, return true if it has work */
+static bool leader_process_pconnection(pconnection_t *pc) {
+  if (pc->psocket.tcp.data == NULL) {
+    leader_connect(pc);
+  }
+  pn_millis_t next_tick = leader_tick(pc);
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+  if (pconnection_needs_work(pc)) {
+    return true;                /* Don't wait on IO while there is pending work */
+  }
+  if (pn_connection_driver_finished(&pc->driver)) {
+    uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
+    return false;
+  }
+  /* Issue async IO requests */
+  int err = 0;
+  const char *what = NULL;
+
+  if (!err && next_tick) {
+    what = "connection timer start";
+    err = uv_timer_start(&pc->timer, on_tick, next_tick, 0);
+  }
+  if (!err && !pc->writing) {
+    what = "write";
+    if (wbuf.size > 0) {
+      uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+      err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+      if (!err) {
+        pc->writing = wbuf.size;
+      }
+    } else if (pn_connection_driver_write_closed(&pc->driver)) {
+      uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+    }
+  }
+  if (!err && rbuf.size > 0) {
+    what = "read";
+    err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+  }
+  if (err) {
+    pconnection_detach(pc);
+    pconnection_error(pc, err, what);
+    return true;
+  }
+  return false;
 }
 
 /* Process the leader_q and the UV loop, in the leader thread */
 static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
+  pn_event_batch_t *batch = NULL;
+  for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+    assert(!ps->working);
 
-  if (p->timeout_request) {
-    p->timeout_request = false;
-    if (p->timeout) {
-      uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
-    } else {
-      uv_timer_stop(&p->timer);
+    uv_mutex_unlock(&p->lock);  /* Unlock to process each item, may add more items to leader_q */
+    bool needs_work = ps->is_conn ?
+      leader_process_pconnection(as_pconnection(ps)) :
+      leader_process_listener(as_listener(ps));
+    uv_mutex_lock(&p->lock);
+
+    if (needs_work && !ps->working && ps->next == &UNLISTED) {
+      ps->working = true;
+      push_lh(&p->worker_q, ps);
     }
   }
-  for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-    if (ps->action) {
-      uv_mutex_unlock(&p->lock);
-      ps->action(ps);
-      ps->action = NULL;
-      uv_mutex_lock(&p->lock);
-    } else if (ps->wakeup) {
-      uv_mutex_unlock(&p->lock);
-      ps->wakeup(ps);
-      ps->wakeup = NULL;
-      uv_mutex_lock(&p->lock);
+  batch = get_batch_lh(p);      /* Check for work */
+  if (!batch) { /* No work, run the UV loop */
+    /* Set timeout timer before uv_run */
+    if (p->timeout_request) {
+      p->timeout_request = false;
+      uv_timer_stop(&p->timer);
+      if (p->timeout) {
+        uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+      }
     }
-    pn_event_batch_t *batch = get_batch_lh(p);
-    if (batch) return batch;
+    uv_mutex_unlock(&p->lock);  /* Unlock to run UV loop */
+    uv_run(&p->loop, mode);
+    uv_mutex_lock(&p->lock);
+    batch = get_batch_lh(p);
   }
-  uv_mutex_unlock(&p->lock);
-  uv_run(&p->loop, mode);
-  uv_mutex_lock(&p->lock);
-  return get_batch_lh(p);
+  return batch;
 }
 
-/*  ==== public API ==== */
+/**** public API ****/
 
 pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
   uv_mutex_lock(&p->lock);
@@ -788,7 +741,7 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
     p->has_leader = true;
     batch = leader_lead_lh(p, UV_RUN_NOWAIT);
     p->has_leader = false;
-    uv_cond_signal(&p->cond);   /* Notify next leader */
+    uv_cond_broadcast(&p->cond);   /* Signal followers for possible work */
   }
   uv_mutex_unlock(&p->lock);
   return batch;
@@ -802,44 +755,32 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
     batch = get_batch_lh(p);
   }
   if (!batch) {                 /* Become leader */
-    assert(!p->has_leader);     /* Implied by loop condition */
     p->has_leader = true;
     do {
       batch = leader_lead_lh(p, UV_RUN_ONCE);
     } while (!batch);
     p->has_leader = false;
-    uv_cond_signal(&p->cond);
+    uv_cond_broadcast(&p->cond); /* Signal a followers. One takes over, many can work. */
   }
   uv_mutex_unlock(&p->lock);
   return batch;
 }
 
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
-  pconnection_t *pc = batch_pconnection(batch);
-  if (pc) {
-    assert(pc->psocket.state == ON_WORKER);
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Process all events before going back to leader */
-      pconnection_to_worker(pc);
-    } else {
-      to_leader(&pc->psocket, psocket_to_uv);
-    }
-    return;
-  }
-  pn_listener_t *l = batch_listener(batch);
-  if (l) {
-    assert(l->psocket.state == ON_WORKER);
-    to_leader(&l->psocket, psocket_to_uv);
-    return;
+  uv_mutex_lock(&p->lock);
+  psocket_t *ps = batch_psocket(batch); /* FIXME aconway 2017-02-26: replace with switch? */
+  if (ps) {
+    assert(ps->working);
+    assert(ps->next == &UNLISTED);
+    ps->working = false;
+    push_lh(&p->leader_q, ps);
   }
-  pn_proactor_t *bp = batch_proactor(batch);
+  pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */
   if (bp == p) {
-    uv_mutex_lock(&p->lock);
     p->batch_working = false;
-    notify(p);
-    uv_mutex_unlock(&p->lock);
-    return;
   }
+  uv_mutex_unlock(&p->lock);
+  notify(p);
 }
 
 pn_listener_t *pn_event_listener(pn_event_t *e) {
@@ -864,16 +805,16 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
 void pn_proactor_interrupt(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   ++p->interrupt;
-  notify(p);
   uv_mutex_unlock(&p->lock);
+  notify(p);
 }
 
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   p->timeout_request = true;
-  notify(p);
   uv_mutex_unlock(&p->lock);
+  notify(p);
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
@@ -881,28 +822,19 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host,
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  to_leader(&pc->psocket, leader_connect);
+  psocket_start(&pc->psocket);
   return 0;
 }
 
 int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
 {
+  assert(!l->closed);
   psocket_init(&l->psocket, p, false, host, port);
   l->backlog = backlog;
-  to_leader(&l->psocket, leader_listen);
+  psocket_start(&l->psocket);
   return 0;
 }
 
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection(c);
-  return pc ? pc->psocket.proactor : NULL;
-}
-
-void pn_connection_wake(pn_connection_t* c) {
-  /* May be called from any thread */
-  wakeup(&get_pconnection(c)->psocket, leader_wake_connection);
-}
-
 pn_proactor_t *pn_proactor() {
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
   p->collector = pn_collector();
@@ -919,8 +851,8 @@ pn_proactor_t *pn_proactor() {
 
 void pn_proactor_free(pn_proactor_t *p) {
   uv_timer_stop(&p->timer);
-  uv_close((uv_handle_t*)&p->timer, NULL);
-  uv_close((uv_handle_t*)&p->async, NULL);
+  uv_safe_close((uv_handle_t*)&p->timer, NULL);
+  uv_safe_close((uv_handle_t*)&p->async, NULL);
   uv_walk(&p->loop, on_stopping, NULL); /* Close all TCP handles */
   while (uv_loop_alive(&p->loop)) {
     uv_run(&p->loop, UV_RUN_ONCE);       /* Run till all handles closed */
@@ -929,9 +861,28 @@ void pn_proactor_free(pn_proactor_t *p) {
   uv_mutex_destroy(&p->lock);
   uv_cond_destroy(&p->cond);
   pn_collector_free(p->collector);
+  /* FIXME aconway 2017-02-25: restore */
+  /* assert(!p->worker_q.front); */
+  /* assert(!p->leader_q.front); */
   free(p);
 }
 
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+  pconnection_t *pc = get_pconnection(c);
+  return pc ? pc->psocket.proactor : NULL;
+}
+
+void pn_connection_wake(pn_connection_t* c) {
+  /* May be called from any thread */
+  pconnection_t *pc = get_pconnection(c);
+  if (pc) {
+    uv_mutex_lock(&pc->lock);
+    pc->wake = true;
+    uv_mutex_unlock(&pc->lock);
+    psocket_notify(&pc->psocket);
+  }
+}
+
 pn_listener_t *pn_listener(void) {
   pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
   if (l) {
@@ -948,8 +899,14 @@ pn_listener_t *pn_listener(void) {
 }
 
 void pn_listener_close(pn_listener_t* l) {
-  /* This can be called from any thread, not just the owner of l */
-  wakeup(&l->psocket, leader_listener_close);
+  /* May be called from any thread */
+  uv_mutex_lock(&l->lock);
+  if (!l->closed) {
+    l->closed = true;
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+  }
+  uv_mutex_unlock(&l->lock);
+  psocket_notify(&l->psocket);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -973,13 +930,15 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 }
 
 int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
-  assert(l->psocket.state == ON_WORKER);
-  if (l->accepting) {
-    return PN_STATE_ERR;        /* Only one at a time */
-  }
-  l->accepting = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
-  if (!l->accepting) {
-    return UV_ENOMEM;
+  assert(l->psocket.working);
+  assert(!l->closed);
+  uv_mutex_lock(&l->lock);
+  pconnection_t *pc = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  if (!pc) {
+    return PN_OUT_OF_MEMORY;
   }
+  push_lh(&l->accept, &pc->psocket);
+  uv_mutex_unlock(&l->lock);
+  psocket_notify(&l->psocket);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 6595a0b..a0ddcda 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -28,245 +28,259 @@
 #include <stdlib.h>
 #include <string.h>
 
-static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */
+static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
 
 static const char *localhost = "127.0.0.1"; /* host for connect/listen */
 
-struct test_events {
-  pn_proactor_t *proactor;
-  pn_event_batch_t *events;
-};
-
-/* Wait for the next single event, return its type */
-static pn_event_type_t wait_next(pn_proactor_t *proactor) {
-  pn_event_batch_t *events = pn_proactor_wait(proactor);
-  pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
-  pn_proactor_done(proactor, events);
-  return etype;
-}
-
-/* Get events until an event of `type` or a PN_TRANSPORT_CLOSED/PN_PROACTOR_TIMEOUT */
-static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) {
-  while (true) {
-    pn_event_type_t t = wait_next(proactor);
-    if (t == etype || t == PN_PROACTOR_TIMEOUT) {
-      return t;
-    }
-  }
-}
-
-/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
-static void test_interrupt_timeout(test_t *t) {
-  pn_proactor_t *p = pn_proactor();
-  pn_proactor_interrupt(p);
-  pn_event_type_t etype = wait_next(p);
-  TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype));
-  pn_proactor_set_timeout(p, 1); /* very short timeout */
-  etype = wait_next(p);
-  TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype));
-  pn_proactor_free(p);
-}
-
-/* Test handler return value  */
-typedef enum {
-  H_CONTINUE,                   /**@<< handler wants more events */
-  H_FINISHED,                   /**@<< handler completed without error */
-  H_FAILED                      /**@<< handler hit an error and cannot continue */
-} handler_state_t;
-
-typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*);
+typedef int (*test_handler_fn)(test_t *, pn_event_t*);
 
 /* Proactor and handler that take part in a test */
 typedef struct proactor_test_t {
-  test_t *t;
   test_handler_fn handler;
+  test_t *t;
   pn_proactor_t *proactor;
-  handler_state_t state;                    /* Result of last handler call */
 } proactor_test_t;
 
 
 /* Initialize an array of proactor_test_t */
-static void proactor_test_init(proactor_test_t *pts, size_t n) {
+static void proactor_test_init(proactor_test_t *pts, size_t n, test_t *t) {
   for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+    if (!pt->t) pt->t = t;
     if (!pt->proactor) pt->proactor = pn_proactor();
     pn_proactor_set_timeout(pt->proactor, timeout);
-    pt->state = H_CONTINUE;
   }
 }
 
-/* Iterate over an array of proactors, draining or handling events with the non-blocking
-   pn_proactor_get.  Continue till all handlers return H_FINISHED (and return 0) or one
-   returns H_FAILED  (and return non-0)
-*/
+#define PROACTOR_TEST_INIT(A, T) proactor_test_init(A, sizeof(A)/sizeof(*A), (T))
+
+static void proactor_test_free(proactor_test_t *pts, size_t n) {
+  for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
+    pn_proactor_free(pt->proactor);
+  }
+}
+
+#define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
+
+/* Run an array of proactors till a handler returns non-0 */
 static int proactor_test_run(proactor_test_t *pts, size_t n) {
-  /* Make sure pts are initialized */
-  proactor_test_init(pts, n);
-  size_t finished = 0;
-  do {
-    finished = 0;
+  int ret = 0;
+  while (!ret) {
     for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
       pn_event_batch_t *events = pn_proactor_get(pt->proactor);
       if (events) {
-          pn_event_t *e;
-          while ((e = pn_event_batch_next(events))) {
-            if (pt->state == H_CONTINUE) {
-              pt->state = pt->handler(pt->t, e);
-            }
+          pn_event_t *e = pn_event_batch_next(events);
+          TEST_CHECKF(pts->t, e, "empty batch");
+          while (e && !ret) {
+            if (!(ret = pt->handler(pt->t, e)))
+              e = pn_event_batch_next(events);
           }
           pn_proactor_done(pt->proactor, events);
       }
-      switch (pt->state) {
-       case H_CONTINUE: break;
-       case H_FINISHED: ++finished; break;
-       case H_FAILED: return 1;
-      }
     }
-  } while (finished < n);
-  return 0;
+  }
+  return ret;
 }
 
+#define PROACTOR_TEST_RUN(A) proactor_test_run((A), sizeof(A)/sizeof(*A))
+
+/* Wait for the next single event, return its type */
+static pn_event_type_t wait_next(pn_proactor_t *proactor) {
+  pn_event_batch_t *events = pn_proactor_wait(proactor);
+  pn_event_type_t etype = pn_event_type(pn_event_batch_next(events));
+  pn_proactor_done(proactor, events);
+  return etype;
+}
+
+/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */
+static void test_interrupt_timeout(test_t *t) {
+  pn_proactor_t *p = pn_proactor();
+  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+  pn_proactor_interrupt(p);
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INTERRUPT, wait_next(p));
+  TEST_CHECK(t, pn_proactor_get(p) == NULL); /* idle */
+  pn_proactor_set_timeout(p, 1); /* very short timeout */
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_TIMEOUT, wait_next(p));
+  pn_proactor_free(p);
+}
 
-/* Handler for test_listen_connect, does both sides of the connection */
-static handler_state_t listen_connect_handler(test_t *t, pn_event_t *e) {
+/* Common handler for simple client/server interactions,  */
+static int common_handler(test_t *t, pn_event_t *e) {
   pn_connection_t *c = pn_event_connection(e);
   pn_listener_t *l = pn_event_listener(e);
 
   switch (pn_event_type(e)) {
-    /* Act on these events */
-   case PN_LISTENER_ACCEPT: {
-    pn_connection_t *accepted = pn_connection();
-    pn_connection_open(accepted);
-    pn_listener_accept(l, accepted); /* Listener takes ownership of accepted */
-    return H_CONTINUE;
-   }
+
+    /* Stop on these events */
+   case PN_LISTENER_OPEN:
+   case PN_PROACTOR_TIMEOUT:
+   case PN_TRANSPORT_CLOSED:
+   case PN_PROACTOR_INACTIVE:
+   case PN_LISTENER_CLOSE:
+    return pn_event_type(e);
+
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(l, pn_connection());
+    return 0;
 
    case PN_CONNECTION_REMOTE_OPEN:
-    if (pn_connection_state(c) | PN_LOCAL_ACTIVE) { /* Client is fully open - the test is done */
-      pn_connection_close(c);
-    }  else {                   /* Server returns the open */
-      pn_connection_open(c);
-    }
-    return H_CONTINUE;
+    pn_connection_open(c);      /* Return the open (no-op if already open) */
+    return 0;
 
    case PN_CONNECTION_REMOTE_CLOSE:
-    if (pn_connection_state(c) | PN_LOCAL_ACTIVE) {
-      pn_connection_close(c);    /* Return the close */
-    }
-    return H_CONTINUE;
-
-   case PN_TRANSPORT_CLOSED:
-    return H_FINISHED;
+    pn_connection_close(c);     /* Return the close */
+    return 0;
+
+    /* Ignored these events */
+   case PN_CONNECTION_INIT:
+   case PN_CONNECTION_BOUND:
+   case PN_CONNECTION_LOCAL_OPEN:
+   case PN_CONNECTION_LOCAL_CLOSE:
+   case PN_TRANSPORT:
+   case PN_TRANSPORT_ERROR:
+   case PN_TRANSPORT_HEAD_CLOSED:
+   case PN_TRANSPORT_TAIL_CLOSED:
+    return 0;
 
    default:
-    return H_CONTINUE;
-    break;
+    TEST_ERRORF(t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
+    return 0;                   /* Fail the test but keep going */
   }
 }
 
-/* Test bad-address error handling for listen and connect */
-static void test_early_error(test_t *t) {
-  pn_proactor_t *p = pn_proactor();
-  pn_proactor_set_timeout(p, timeout); /* In case of hang */
-  pn_connection_t *c = pn_connection();
-  pn_proactor_connect(p, c, localhost, "1"); /* Bad port */
-  pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED);
-  TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype));
-  TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), "");
-
-  pn_listener_t *l = pn_listener();
-  pn_proactor_listen(p, l, localhost, "1", 1); /* Bad port */
-  etype = wait_for(p, PN_LISTENER_CLOSE);
-  TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype));
-  TEST_CHECK(t, pn_condition_is_set(pn_listener_condition(l)), "");
-
-  pn_proactor_free(p);
+/* close a connection when it is remote open */
+static int open_close_handler(test_t *t, pn_event_t *e) {
+  switch (pn_event_type(e)) {
+   case PN_CONNECTION_REMOTE_OPEN:
+    pn_connection_close(pn_event_connection(e));
+    return 0;          /* common_handler will finish on TRANSPORT_CLOSED */
+   default:
+    return common_handler(t, e);
+  }
 }
 
-/* Simplest client/server interaction with 2 proactors */
-static void test_listen_connect(test_t *t) {
-  proactor_test_t pts[] =  { { t, listen_connect_handler }, { t, listen_connect_handler } };
-  proactor_test_init(pts, 2);
+/* Simple client/server connection with 2 proactors */
+static void test_client_server(test_t *t) {
+  proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
+  PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
-  test_port_t port = test_port();          /* Hold a port */
-
+  test_port_t port = test_port();
   pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
-  pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
-  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
-    sock_close(port.sock);
-    pn_proactor_connect(client, pn_connection(), localhost, port.str);
-    proactor_test_run(pts, 2);
-  }
-  pn_proactor_free(client);
-  pn_proactor_free(server);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_proactor_connect(client, pn_connection(), localhost, port.str);
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  sock_close(port.sock);
+  PROACTOR_TEST_FREE(pts);
 }
 
-static handler_state_t connection_wakeup_handler(test_t *t, pn_event_t *e) {
-  pn_connection_t *c = pn_event_connection(e);
+/* Return on connection open, close and return on wake */
+static int open_wake_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
-
    case PN_CONNECTION_REMOTE_OPEN:
-    if (pn_connection_state(c) | PN_LOCAL_UNINIT) {
-      pn_connection_open(c);    /* Server returns the open */
-    }
-    return H_FINISHED;          /* Finish when open at both ends */
-
+    return pn_event_type(e);
+   case PN_CONNECTION_WAKE:
+    pn_connection_close(pn_event_connection(e));
+    return pn_event_type(e);
    default:
-    /* Otherwise same as listen_connect_handler */
-    return listen_connect_handler(t, e);
+    return common_handler(t, e);
   }
 }
 
 /* Test waking up a connection that is idle */
-static void test_connection_wakeup(test_t *t) {
-  proactor_test_t pts[] =  { { t, connection_wakeup_handler }, { t, connection_wakeup_handler } };
-  proactor_test_init(pts, 2);
+static void test_connection_wake(test_t *t) {
+  proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
+  PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port();          /* Hold a port */
   pn_proactor_listen(server, pn_listener(), localhost, port.str, 4);
-  pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
-  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
-    sock_close(port.sock);
-    pn_connection_t *c = pn_connection();
-    pn_proactor_connect(client, c, localhost, port.str);
-    proactor_test_run(pts, 2);                          /* Will finish when client is connected */
-    TEST_CHECK(t, NULL == pn_proactor_get(client), ""); /* Should be idle */
-    pn_connection_wake(c);
-    etype = wait_next(client);
-    /* FIXME aconway 2017-02-21: TEST_EVENT_TYPE */
-    TEST_CHECK(t, PN_CONNECTION_WAKE == etype, pn_event_type_name(etype));
-  }
-  pn_proactor_free(client);
-  pn_proactor_free(server);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect(client, c, localhost, port.str);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
+  pn_connection_wake(c);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  sock_close(port.sock);
+  PROACTOR_TEST_FREE(pts);
 }
 
 /* Test that INACTIVE event is generated when last connections/listeners closes. */
 static void test_inactive(test_t *t) {
-  proactor_test_t pts[] =  { { t, listen_connect_handler }, { t, listen_connect_handler }};
-  proactor_test_init(pts, 2);
+  proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
+  PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port();          /* Hold a port */
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, localhost, port.str,  4);
-  pn_event_type_t etype = wait_for(server, PN_LISTENER_OPEN);
-  if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) {
-    sock_close(port.sock);
-    pn_proactor_connect(client, pn_connection(), localhost, port.str);
-    proactor_test_run(pts, 2);
-    etype = wait_for(client, PN_PROACTOR_INACTIVE);
-    pn_listener_close(l);
-    etype = wait_for(server, PN_PROACTOR_INACTIVE);
-  }
-  pn_proactor_free(client);
-  pn_proactor_free(server);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect(client, c, localhost, port.str);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  pn_connection_wake(c);
+  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  /* server won't be INACTIVE until listener is closed */
+  TEST_CHECK(t, pn_proactor_get(server) == NULL);
+  pn_listener_close(l);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  sock_close(port.sock);
+  PROACTOR_TEST_FREE(pts);
+}
+
+#define TEST_CHECK_ERROR(T, WANT, COND) do {                            \
+    TEST_CHECKF((T), pn_condition_is_set(COND), "expecting error");     \
+    const char* description = pn_condition_get_description(COND);       \
+    if (!strstr(description, (WANT))) {                                 \
+      TEST_ERRORF((T), "bad error, expected '%s' in '%s'", (WANT), description); \
+    }                                                                   \
+  } while(0)
+
+/* Tests for error handling */
+static void test_errors(test_t *t) {
+  proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t port = test_port();          /* Hold a port */
+
+  /* Invalid connect/listen parameters */
+  pn_connection_t *c = pn_connection();
+  pn_proactor_connect(client, c, localhost, "xxx");
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  pn_listener_t *l = pn_listener();
+  pn_proactor_listen(server, l, localhost, "xxx", 1);
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  /* Connect with no listener */
+  c = pn_connection();
+  pn_proactor_connect(client, c, localhost, port.str);
+  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))));
+  TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
+  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+
+  sock_close(port.sock);
+  PROACTOR_TEST_FREE(pts);
 }
 
+
 int main(int argc, char **argv) {
   int failed = 0;
   RUN_ARGV_TEST(failed, t, test_inactive(&t));
   RUN_ARGV_TEST(failed, t, test_interrupt_timeout(&t));
-  RUN_ARGV_TEST(failed, t, test_early_error(&t));
-  RUN_ARGV_TEST(failed, t, test_listen_connect(&t));
-  RUN_ARGV_TEST(failed, t, test_connection_wakeup(&t));
+  RUN_ARGV_TEST(failed, t, test_errors(&t));
+  RUN_ARGV_TEST(failed, t, test_client_server(&t));
+  RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
   return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2dae68d6/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index 7006334..97dac3f 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -21,6 +21,7 @@
  */
 
 #include <proton/type_compat.h>
+#include <proton/event.h>
 
 #include <errno.h>
 #include <stdarg.h>
@@ -28,24 +29,46 @@
 #include <stdlib.h>
 #include <string.h>
 
-/*
-  All output from test marcros goes to stdout not stderr, error messages are normal for a test.
-  Some errno handling functions are thread-unsafe
-  */
-
+/* A struct to collect the results of a test, created by RUN_TEST macro. */
+typedef struct test_t {
+  const char* name;
+  int errors;
+  uintptr_t data;               /* Test can store some non-error data here */
+} test_t;
 
-/* Call via TEST_ASSERT macros */
-static void assert_fail_(const char* cond, const char* file, int line, const char *fmt, ...) {
-  printf("%s:%d: Assertion failed: %s", file, line, cond);
+/* Internal, use macros. Print error message and increase the t->errors count.
+   All output from test marcros goes to stdout not stderr, error messages are normal for a test.
+*/
+static void test_vlogf_(test_t *t, const char *prefix, const char* expr,
+                        const char* file, int line, const char *fmt, va_list ap)
+{
+  printf("%s:%d", file, line);
+  if (prefix && *prefix) printf(": %s", prefix);
+  if (expr && *expr) printf(": %s", expr);
   if (fmt && *fmt) {
-    va_list ap;
-    va_start(ap, fmt);
-    printf(" - ");
+    printf(": ");
     vprintf(fmt, ap);
-    printf("\n");
-    fflush(stdout);
-    va_end(ap);
   }
+  if (t) printf(" [%s]", t->name);
+  printf("\n");
+  fflush(stdout);
+}
+
+static void test_errorf_(test_t *t, const char *prefix, const char* expr,
+                         const char* file, int line, const char *fmt, ...) {
+  va_list ap;
+  va_start(ap, fmt);
+  ++t->errors;
+  test_vlogf_(t, prefix, expr, file, line, fmt, ap);
+  va_end(ap);
+}
+
+/* Call via TEST_ASSERT macros */
+static void assert_fail_(const char* expr, const char* file, int line, const char *fmt, ...) {
+  va_list ap;
+  va_start(ap, fmt);
+  test_vlogf_(NULL, "assertion failed", expr, file, line, fmt, ap);
+  va_end(ap);
   abort();
 }
 
@@ -63,32 +86,42 @@ static void assert_fail_(const char* cond, const char* file, int line, const cha
   TEST_ASSERTF((expr), "%s", strerror(err))
 
 
-/* A struct to collect the results of a test.
- * Declare and initialize with TEST_START(t) where t will be declared as a test_t
- */
-typedef struct test_t {
-  const char* name;
-  int errors;
-} test_t;
-
-/* if !expr print the printf-style error and increment t->errors. Use via macros. Returns expr. */
+/* Internal, use macros */
 static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
   if (!expr) {
+    ++t->errors;
     va_list ap;
     va_start(ap, fmt);
-    printf("%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr);
-    if (fmt && *fmt) {
-      printf(" - ");
-      vprintf(fmt, ap);
-    }
-    printf("\n");
-    fflush(stderr);
-    ++t->errors;
+    test_vlogf_(t, "check failed", sexpr, file, line, fmt, ap);
+    va_end(ap);
   }
   return expr;
 }
 
-#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+/* Print a message but don't mark the test as having an error */
+#define TEST_LOGF(TEST, ...) \
+  test_logf_((TEST), "info", NULL, __FILE__, __LINE__, __VA_ARGS__)
+
+/* Print an error with printf-style message, increment TEST->errors */
+#define TEST_ERRORF(TEST, ...) \
+  test_errorf_((TEST), "error", NULL, __FILE__, __LINE__, __VA_ARGS__)
+
+/* If EXPR is false, print and record an error for t  */
+#define TEST_CHECKF(TEST, EXPR, ...) \
+  test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__)
+
+/* If EXPR is false, print and record an error for t including EXPR  */
+#define TEST_CHECK(TEST, EXPR) \
+  test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, "")
+
+static inline bool test_etype_equal_(test_t *t, int want, int got, const char *file, int line) {
+  return test_check_(t, want == got, NULL, file, line, "want %s got %s",
+                     pn_event_type_name((pn_event_type_t)want),
+                     pn_event_type_name((pn_event_type_t)got));
+}
+
+#define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
+  test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
 
 /* T is name of a test_t variable, EXPR is the test expression (which should update T)
    FAILED is incremented if the test has errors
@@ -166,12 +199,14 @@ static int sock_port(sock_t sock) {
   return ntohs(port);
 }
 
+/* Combines includes a sock_t with the int and char* versions of the port for convenience */
 typedef struct test_port_t {
   sock_t sock;
   int port;
   char str[256];
 } test_port_t;
 
+/* Create a test_port_t  */
 static inline test_port_t test_port(void) {
   test_port_t tp = {0};
   tp.sock = sock_bind0();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org